Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3da06729 authored by John Lai's avatar John Lai Committed by Automerger Merge Worker
Browse files

Merge changes Ida766e8e,I1ebf72e4,I8f4c1a6f,I1f251968,I3b831b5c into main am: 2ea01f75

parents a67f2a71 2ea01f75
Loading
Loading
Loading
Loading
+82 −1
Original line number Diff line number Diff line
@@ -68,6 +68,30 @@ class BluetoothCallbacks:
        """
        pass

    def on_pin_request(self, remote_device, cod, min_16_digit):
        """When there is a pin request to display the event to client.

        Args:
            remote_device:
                Remote device that is being paired.
            cod:
                Class of device as described in HCI spec.
            min_16_digit:
                True if the pin is 16 digit, False otherwise.
        """
        pass

    def on_pin_display(self, remote_device, pincode):
        """When there is a auto-gen pin to display the event to client.

        Args:
            remote_device:
                Remote device that is being paired.
            pincode:
                PIN code to display.
        """
        pass

    def on_bond_state_changed(self, status, address, state):
        """Bonding/Pairing state has changed for a device.

@@ -155,6 +179,15 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
                    <arg type="u" name="variant" direction="in" />
                    <arg type="u" name="passkey" direction="in" />
                </method>
                <method name="OnPinRequest">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="u" name="cod" direction="in" />
                    <arg type="b" name="min_16_digit" direction="in" />
                </method>
                <method name="OnPinDisplay">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="s" name="pincode" direction="in" />
                </method>
                <method name="OnBondStateChanged">
                    <arg type="u" name="status" direction="in" />
                    <arg type="s" name="address" direction="in" />
@@ -192,12 +225,32 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
            """Handle pairing/bonding request to agent."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.debug('OnSspRequest parse error: {}'.format(remote_device_dbus))
                logging.error('OnSspRequest parse error: {}'.format(remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_ssp_request(remote_device, class_of_device, variant, passkey)

        def OnPinRequest(self, remote_device_dbus, cod, min_16_digit):
            """Handle PIN request callback."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.error('OnPinRequest parse error: {}'.format(remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_pin_request(remote_device, cod, min_16_digit)

        def OnPinDisplay(self, remote_device_dbus, pincode):
            """Handle PIN display callback."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.error('OnPinDisplay parse error: {}'.format(remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_pin_display(remote_device, pincode)

        def OnBondStateChanged(self, status, address, state):
            """Handle bond state changed callbacks."""
            for observer in self.observers.values():
@@ -685,6 +738,15 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
        remote_device = self._make_dbus_device(address, name)
        return bool(self.proxy().RemoveBond(remote_device))

    @utils.glib_call(None)
    def get_bonded_devices(self):
        """Get all bonded devices.

        Returns:
            List of device addresses; None on DBus error.
        """
        return self.proxy().GetBondedDevices()

    @utils.glib_call(False)
    def forget_device(self, address):
        """Forgets device from local cache and removes bonding.
@@ -716,6 +778,25 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):

        return True

    @utils.glib_call(False)
    def set_pin(self, address, accept, pin_code):
        """Set pin on bonding device.

        Args:
            address: Device address to reply.
            accept: True to accept the pin request, False to reject the pin request.
            pin_code: PIN code to reply. The PIN code is a list of up to 16
                      integers.
        """
        if address not in self.known_devices:
            logging.debug('[%s] Unknown device in set_pin', address)
            return False

        device = self.known_devices[address]
        remote_device = self._make_dbus_device(address, device['name'])

        return self.proxy().SetPin(remote_device, accept, pin_code)

    @utils.glib_call(False)
    def set_pairing_confirmation(self, address, accept):
        """Confirm that a pairing should be completed on a bonding device."""
+37 −2
Original line number Diff line number Diff line
@@ -147,15 +147,22 @@ class Transport(enum.IntEnum):
    AUTO = 0
    BREDR = 1
    LE = 2
    DUAL = 3


class SspVariant(enum.IntEnum):
    """Bluetooth SSP variant type."""
class PairingVariant(enum.IntEnum):
    """Bluetooth pairing variant type."""
    # SSP variants.
    PASSKEY_CONFIRMATION = 0
    PASSKEY_ENTRY = 1
    CONSENT = 2
    PASSKEY_NOTIFICATION = 3

    # Legacy pairing variants.
    PIN_ENTRY = 4
    PIN_16_DIGITS_ENTRY = 5
    PIN_NOTIFICATION = 6


class BleAddressType(enum.IntEnum):
    BLE_ADDR_PUBLIC = 0x00
@@ -177,3 +184,31 @@ class CompanyIdentifiers(enum.IntEnum):
    Bluetooth SIG official document: https://www.bluetooth.com/specifications/assigned-numbers/
    """
    GOOGLE = 0x00E0


class AdvertisingDataType(enum.IntEnum):
    FLAGS = 0x01
    INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
    COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
    INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
    COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
    INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
    COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
    SHORTENED_LOCAL_NAME = 0x08
    COMPLETE_LOCAL_NAME = 0x09
    TX_POWER_LEVEL = 0x0A
    CLASS_OF_DEVICE = 0x0D
    SLAVE_CONNECTION_INTERVAL_RANGE = 0x12
    LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
    LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
    SERVICE_DATA_16_BIT_UUID = 0x16
    PUBLIC_TARGET_ADDRESS = 0x17
    RANDOM_TARGET_ADDRESS = 0x18
    APPEARANCE = 0x19
    ADVERTISING_INTERVAL = 0x1A
    LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
    SERVICE_DATA_32_BIT_UUID = 0x20
    SERVICE_DATA_128_BIT_UUID = 0x21
    URI = 0x24
    LE_SUPPORTED_FEATURES = 0x27
    MANUFACTURER_SPECIFIC_DATA = 0xFF
+42 −0
Original line number Diff line number Diff line
@@ -13,6 +13,8 @@
# limitations under the License.
"""All floss utils functions."""

from typing import List, Optional

import functools
import logging
import threading
@@ -452,3 +454,43 @@ def create_observer_name(observer):
# when we are able to use it.
async def anext(ait):
    return await ait.__anext__()


def parse_advertiging_data(adv_data: List[int]) -> host_pb2.DataTypes:
    index = 0
    data = host_pb2.DataTypes()

    # advertising data packet is repeated by many advertising data and each one with the following format:
    # | Length (0) | Type (1) | Data Payload (2~N-1)
    # Take an advertising data packet [2, 1, 6, 5, 3, 0, 24, 1, 24] as an example,
    # The first 3 numbers 2, 1, 6:
    # 2 is the data length is 2 including the data type,
    # 1 is the data type is Flags (0x01),
    # 6 is the data payload.
    while index < len(adv_data):
        # Extract data length.
        data_length = adv_data[index]
        index = index + 1

        if data_length <= 0:
            break

        # Extract data type.
        data_type = adv_data[index]
        index = index + 1

        # Extract data payload.
        if data_type == floss_enums.AdvertisingDataType.COMPLETE_LOCAL_NAME:
            data.complete_local_name = parse_complete_local_name(adv_data[index:index + data_length - 1])
            logging.info('complete_local_name: %s', data.complete_local_name)
        else:
            logging.debug('Unsupported advertising data type to parse: %s', data_type)

        index = index + data_length - 1

    logging.info('Parsed data: %s', data)
    return data


def parse_complete_local_name(data: List[int]) -> Optional[str]:
    return ''.join(chr(char) for char in data) if data else None
+15 −4
Original line number Diff line number Diff line
@@ -54,10 +54,6 @@ class Bluetooth(object):
        # self state
        self.is_clean = False

        # GRPC server state
        self.pairing_events: asyncio.Queue = None
        self.pairing_answers = None

        # DBUS clients
        self.manager_client = manager_client.FlossManagerClient(self.bus)
        self.adapter_client = adapter_client.FlossAdapterClient(self.bus, self.DEFAULT_ADAPTER)
@@ -181,6 +177,9 @@ class Bluetooth(object):
    def get_address(self):
        return self.adapter_client.get_address()

    def get_remote_type(self):
        return self.adapter_client.get_remote_property('Type')

    def is_connected(self, address):
        return self.adapter_client.is_connected(address)

@@ -196,6 +195,18 @@ class Bluetooth(object):
    def create_bond(self, address, transport):
        return self.adapter_client.create_bond(address, transport)

    def remove_bond(self, address):
        return self.adapter_client.remove_bond(address)

    def get_bonded_devices(self):
        return self.adapter_client.get_bonded_devices()

    def forget_device(self, address):
        return self.adapter_client.forget_device(address)

    def set_pin(self, address, accept, pin_code):
        return self.adapter_client.set_pin(address, accept, pin_code)

    def set_pairing_confirmation(self, address, accept):
        return self.adapter_client.set_pairing_confirmation(address, accept)

+18 −1
Original line number Diff line number Diff line
@@ -51,6 +51,16 @@ class HostService(host_grpc_aio.HostServicer):

    async def FactoryReset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
        self.waited_connections.clear()

        devices = self.bluetooth.get_bonded_devices()
        if devices is None:
            logging.error('Failed to call get_bonded_devices.')
        else:
            for device in devices:
                address = device['address']
                logging.info('Forget device %s', address)
                self.bluetooth.forget_device(address)

        asyncio.create_task(self.server.stop(None))
        return empty_pb2.Empty()

@@ -124,7 +134,7 @@ class HostService(host_grpc_aio.HostServicer):
                if address != self.task['address']:
                    return

                if variant == floss_enums.SspVariant.CONSENT:
                if variant in (floss_enums.PairingVariant.CONSENT, floss_enums.PairingVariant.PASSKEY_CONFIRMATION):
                    self.client.set_pairing_confirmation(address,
                                                         True,
                                                         method_callback=self.on_set_pairing_confirmation)
@@ -176,6 +186,9 @@ class HostService(host_grpc_aio.HostServicer):

                    if not success:
                        raise RuntimeError(f'Failed to connect to the {address}. Reason: {reason}')

                    if self.bluetooth.is_bonded(address) and self.bluetooth.is_connected(address):
                        self.bluetooth.connect_device(address)
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)

@@ -461,6 +474,9 @@ class HostService(host_grpc_aio.HostServicer):
                elif scan_result['addr_type'] == floss_enums.BleAddressType.BLE_ADDR_RANDOM_ID:
                    response.random_static_identity = address

                data = utils.parse_advertiging_data(scan_result['adv_data'])
                response.data.CopyFrom(data)

                # TODO: b/289480188 - Support more data if needed.
                mode = host_pb2.NOT_DISCOVERABLE
                if scan_result['flags'] & (1 << 0):
@@ -470,6 +486,7 @@ class HostService(host_grpc_aio.HostServicer):
                else:
                    mode = host_pb2.NOT_DISCOVERABLE
                response.data.le_discoverability_mode = mode

                yield response
        finally:
            if scanner_id is not None:
Loading