Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ee9d97e9 authored by JohnLai's avatar JohnLai
Browse files

Floss: Fork Floss D-Bus Clients from autotest_lib

Copied the following clients and utils from autotest_lib:
* adapter_client
* advertising_client
* scanner_client
* manager_client
* floss_enums
* observer_base

Changes made afterwards:
*Move all utils functions into one utils file.
*Remove qa_legacy_proxy calls from adapter_client.
*Lint the forked code as Google Python style.

Bug: 289480188
Test: mma packages/modules/Bluetooth
Tag: #floss
Change-Id: I3d32b3361b59255412bef9787a5258f73a6d28a9
parent 9230d25a
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+767 −0

File added.

Preview size limit exceeded, changes collapsed.

+836 −0

File added.

Preview size limit exceeded, changes collapsed.

+157 −0
Original line number Diff line number Diff line
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to hold the Floss enums."""

import enum


class BtTransport(enum.IntEnum):
    """Bluetooth transport type."""
    AUTO = 0
    BR_EDR = 1
    LE = 2


class GattWriteRequestStatus(enum.IntEnum):
    """Gatt write request status."""
    SUCCESS = 0
    FAIL = 1
    BUSY = 2


class GattWriteType(enum.IntEnum):
    """GATT write type."""
    INVALID = 0
    WRITE_NO_RSP = 1
    WRITE = 2
    WRITE_PREPARE = 3


class LePhy(enum.IntEnum):
    """Bluetooth LE physical type."""
    INVALID = 0
    PHY1M = 1
    PHY2M = 2
    PHY_CODED = 3


class GattStatus(enum.IntEnum):
    """Bluetooth GATT return status."""
    SUCCESS = 0x00
    INVALID_HANDLE = 0x01
    READ_NOT_PERMIT = 0x02
    WRITE_NOT_PERMIT = 0x03
    INVALID_PDU = 0x04
    INSUF_AUTHENTICATION = 0x05
    REQ_NOT_SUPPORTED = 0x06
    INVALID_OFFSET = 0x07
    INSUF_AUTHORIZATION = 0x08
    PREPARE_Q_FULL = 0x09
    NOT_FOUND = 0x0A
    NOT_LONG = 0x0B
    INSUF_KEY_SIZE = 0x0C
    INVALID_ATTRLEN = 0x0D
    ERR_UNLIKELY = 0x0E
    INSUF_ENCRYPTION = 0x0F
    UNSUPPORT_GRP_TYPE = 0x10
    INSUF_RESOURCE = 0x11
    DATABASE_OUT_OF_SYNC = 0x12
    VALUE_NOT_ALLOWED = 0x13
    ILLEGAL_PARAMETER = 0x87
    TOO_SHORT = 0x7F
    NO_RESOURCES = 0x80
    INTERNAL_ERROR = 0x81
    WRONG_STATE = 0x82
    DB_FULL = 0x83
    BUSY = 0x84
    ERROR = 0x85
    CMD_STARTED = 0x86
    PENDING = 0x88
    AUTH_FAIL = 0x89
    MORE = 0x8A
    INVALID_CFG = 0x8B
    SERVICE_STARTED = 0x8C
    ENCRYPTED_NO_MITM = 0x8D
    NOT_ENCRYPTED = 0x8E
    CONGESTED = 0x8F
    DUP_REG = 0x90
    ALREADY_OPEN = 0x91
    CANCEL = 0x92


class BtStatus(enum.IntEnum):
    """Bluetooth return status."""
    SUCCESS = 0
    FAIL = 1
    NOT_READY = 2
    NO_MEMORY = 3
    BUSY = 4
    DONE = 5
    UNSUPPORTED = 6
    INVALID_PARAM = 7
    UNHANDLED = 8
    AUTH_FAILURE = 9
    REMOTE_DEVICE_DOWN = 10
    AUTH_REJECTED = 11
    JNI_ENVIRONMENT_ERROR = 12
    JNI_THREAD_ATTACH_ERROR = 13
    WAKE_LOCK_ERROR = 14


class SocketType(enum.IntEnum):
    """Socket types."""
    GT_SOCK_ANY = 0
    GT_SOCK_STREAM = 1
    GT_SOCK_DGRAM = 2
    GT_SOCK_RAW = 3
    GT_SOCK_RDM = 4
    GT_SOCK_SEQPACKET = 5
    GT_SOCK_DCCP = 6
    GT_SOCK_PACKET = 10


class SuspendMode(enum.IntEnum):
    """Bluetooth suspend mode."""
    NORMAL = 0
    SUSPENDING = 1
    SUSPENDED = 2
    RESUMING = 3


class ScanType(enum.IntEnum):
    """Bluetooth scan type."""
    ACTIVE = 0
    PASSIVE = 1


class BondState(enum.IntEnum):
    """Bluetooth bonding state."""
    NOT_BONDED = 0
    BONDING = 1
    BONDED = 2


class Transport(enum.IntEnum):
    """Bluetooth transport type."""
    AUTO = 0
    BREDR = 1
    LE = 2


class SspVariant(enum.IntEnum):
    """Bluetooth SSP variant type."""
    PASSKEY_CONFIRMATION = 0
    PASSKEY_ENTRY = 1
    CONSENT = 2
    PASSKEY_NOTIFICATION = 3
 No newline at end of file
+202 −0
Original line number Diff line number Diff line
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client class to access the Floss manager interface."""

from floss.pandora.floss import observer_base
from floss.pandora.floss import utils


class ManagerCallbacks:
    """Callbacks for the Manager Interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_callback.
    """

    def on_hci_device_changed(self, hci, present):
        """Hci device presence is updated.

        Args:
            hci: Hci interface number.
            present: Whether this hci interface is appearing or disappearing.
        """
        pass

    def on_hci_enabled_changed(self, hci, enabled):
        """Hci device is being enabled or disabled.

        Args:
            hci: Hci interface number.
            enabled: Whether this hci interface is being enabled or disabled.
        """
        pass


class FlossManagerClient(ManagerCallbacks):
    """Handles method calls to and callbacks from the Manager interface."""

    MGR_SERVICE = 'org.chromium.bluetooth.Manager'
    MGR_INTERFACE = 'org.chromium.bluetooth.Manager'
    MGR_OBJECT = '/org/chromium/bluetooth/Manager'

    # Exported callback interface and objects
    CB_EXPORTED_INTF = 'org.chromium.bluetooth.ManagerCallback'
    CB_EXPORTED_OBJ_NAME = 'test_manager_client'

    class AdaptersNotParseable(Exception):
        """An entry in the result of GetAvailableAdapters was not parseable."""
        pass

    class ExportedManagerCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.ManagerCallback">
                <method name="OnHciDeviceChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="present" direction="in" />
                </method>
                <method name="OnHciEnabledChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="enabled" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Construct exported callbacks object."""
            observer_base.ObserverBase.__init__(self)

        def OnHciDeviceChanged(self, hci, present):
            """Handle device presence callbacks."""
            for observer in self.observers.values():
                observer.on_hci_device_changed(hci, present)

        def OnHciEnabledChanged(self, hci, enabled):
            """Handle device enabled callbacks."""
            for observer in self.observers.values():
                observer.on_hci_enabled_changed(hci, enabled)

    def __init__(self, bus):
        """Construct the client.

        Args:
            bus: DBus bus over which we'll establish connections.
        """
        self.bus = bus

        # We don't register callbacks by default. The client owner must call
        # register_callbacks to do so.
        self.callbacks = None

        # Initialize hci devices and their power states
        self.adapters = {}

    def __del__(self):
        """Destructor."""
        del self.callbacks

    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether manager proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to manager interface for method calls."""
        return self.bus.get(self.MGR_SERVICE, self.MGR_OBJECT)[self.MGR_INTERFACE]

    @utils.glib_call(False)
    def register_callbacks(self):
        """Registers manager callbacks for this client if one doesn't already exist."""
        # Callbacks already registered
        if self.callbacks:
            return True

        # Create and publish callbacks
        self.callbacks = self.ExportedManagerCallbacks()
        self.callbacks.add_observer('manager_client', self)
        objpath = utils.generate_dbus_cb_objpath(self.CB_EXPORTED_OBJ_NAME)
        self.bus.register_object(objpath, self.callbacks, None)

        # Register published callbacks with manager daemon
        self.proxy().RegisterCallback(objpath)

        return True

    @utils.glib_callback()
    def on_hci_device_changed(self, hci, present):
        """Handle device presence change."""
        if present:
            self.adapters[hci] = self.adapters.get(hci, False)
        elif hci in self.adapters:
            del self.adapters[hci]

    @utils.glib_callback()
    def on_hci_enabled_changed(self, hci, enabled):
        """Handle device enabled change."""
        self.adapters[hci] = enabled

    def get_default_adapter(self):
        """Get the default adapter in use by the manager."""
        # TODO(abps): The default adapter is hci0 until we support multiple
        #             adapters.
        return 0

    def has_default_adapter(self):
        """Checks whether the default adapter exists on this system."""
        return self.get_default_adapter() in self.adapters

    @utils.glib_call()
    def start(self, hci):
        """Start a specific adapter."""
        self.proxy().Start(hci)

    @utils.glib_call()
    def stop(self, hci):
        """Stop a specific adapter."""
        self.proxy().Stop(hci)

    @utils.glib_call(False)
    def get_adapter_enabled(self, hci):
        """Checks whether a specific adapter is enabled (i.e. started)."""
        return bool(self.proxy().GetAdapterEnabled(hci))

    @utils.glib_call(False)
    def get_floss_enabled(self):
        """Gets whether Floss is enabled."""
        return bool(self.proxy().GetFlossEnabled())

    @utils.glib_call()
    def set_floss_enabled(self, enabled):
        self.proxy().SetFlossEnabled(enabled)

    @utils.glib_call([])
    def get_available_adapters(self):
        """Gets a list of currently available adapters and if they are enabled."""
        all_adapters = []
        dbus_result = self.proxy().GetAvailableAdapters()

        for d in dbus_result:
            if 'hci_interface' in d and 'enabled' in d:
                all_adapters.append((int(d['hci_interface']), bool(d['enabled'])))
            else:
                raise FlossManagerClient.AdaptersNotParseable(f'Could not parse: {d}')

        # This function call overwrites any existing cached values of
        # self.adapters that we may have gotten from observers.
        self.adapters = {}
        for (hci, enabled) in all_adapters:
            self.adapters[hci] = enabled

        return all_adapters
Loading