Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 553d61b7 authored by JohnLai's avatar JohnLai
Browse files

Floss: Implements host of floss bluetooth test server

Implements host and security.OnPairing of floss bluetooth test server.

Bug: 289480188
Test: mma packages/modules/Bluetooth && pts-bot GAP
Tag: #floss
Change-Id: Iddb56f21bd9b773bfc6870b7dbf99e73a5cd4f5a
parent 394b5b8f
Loading
Loading
Loading
Loading
+24 −0
Original line number Diff line number Diff line
@@ -501,6 +501,30 @@ class FlossAdvertisingClient(BluetoothAdvertisingCallbacks):
        self.proxy().UnregisterAdvertiserCallback(self.callback_id)
        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothAdvertisingCallbacks):
            self.callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothAdvertisingCallbacks):
            self.callbacks.remove_observer(name, observer)

    @utils.glib_call(None)
    def start_advertising_set(self, parameters, advertise_data, scan_response, periodic_parameters, periodic_data,
                              duration, max_ext_adv_events):
+23 −1
Original line number Diff line number Diff line
@@ -155,3 +155,25 @@ class SspVariant(enum.IntEnum):
    PASSKEY_ENTRY = 1
    CONSENT = 2
    PASSKEY_NOTIFICATION = 3


class BleAddressType(enum.IntEnum):
    BLE_ADDR_PUBLIC = 0x00
    BLE_ADDR_RANDOM = 0x01
    BLE_ADDR_PUBLIC_ID = 0x02
    BLE_ADDR_RANDOM_ID = 0x03
    BLE_ADDR_ANONYMOUS = 0xFF


class OwnAddressType(enum.IntEnum):
    DEFAULT = -1
    PUBLIC = 0
    RANDOM = 1


class CompanyIdentifiers(enum.IntEnum):
    """Bluetooth SIG Company ID values.

    Bluetooth SIG official document: https://www.bluetooth.com/specifications/assigned-numbers/
    """
    GOOGLE = 0x00E0
+24 −0
Original line number Diff line number Diff line
@@ -432,6 +432,30 @@ class FlossScannerClient(BluetoothScannerCallbacks):
        self.callback_id = self.proxy().RegisterScannerCallback(objpath)
        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothScannerCallbacks):
            self.callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothScannerCallbacks):
            self.callbacks.remove_observer(name, observer)

    def wait_for_on_scanner_registered(self, uuid):
        """Waits for register scanner.

+115 −0
Original line number Diff line number Diff line
@@ -18,7 +18,9 @@ import logging
import threading
import time

from floss.pandora.floss import floss_enums
from gi.repository import GLib
from pandora import host_pb2

# All GLIB method calls should wait this many seconds by default
GLIB_METHOD_CALL_TIMEOUT = 2
@@ -337,3 +339,116 @@ class PropertySet:
            raise self.PropertySetterMissing('{} has no getter.'.format(prop_name))

        return setter(*args)


def address_from(request_address: bytes):
    """Converts address from grpc server format to floss format."""
    address = request_address.hex()
    address = f'{address[:2]}:{address[2:4]}:{address[4:6]}:{address[6:8]}:{address[8:10]}:{address[10:12]}'
    return address.upper()


def address_to(address: str):
    """Converts address from floss format to grpc server format."""
    request_address = bytes.fromhex(address.replace(':', ''))
    return request_address


def uuid16_to_uuid128(uuid16: str):
    return f'0000{uuid16}-0000-1000-8000-00805f9b34fb'


def uuid32_to_uuid128(uuid32: str):
    return f'{uuid32}-0000-1000-8000-00805f9b34fb'


def advertise_data_from(request_data: host_pb2.DataTypes):
    """Mapping DataTypes to a dict.

    The dict content follows the format of floss AdvertiseData.

    Args:
        request_data : advertising data.

    Raises:
        NotImplementedError: if request data is not implemented.

    Returns:
        dict: advertising data.
    """
    advertise_data = {
        'service_uuids': [],
        'solicit_uuids': [],
        'transport_discovery_data': [],
        'manufacturer_data': {},
        'service_data': {},
        'include_tx_power_level': False,
        'include_device_name': False,
    }

    # incomplete_service_class_uuids
    if (request_data.incomplete_service_class_uuids16 or request_data.incomplete_service_class_uuids32 or
            request_data.incomplete_service_class_uuids128):
        raise NotImplementedError('Incomplete service class uuid not supported')

    # service_uuids
    for uuid16 in request_data.complete_service_class_uuids16:
        advertise_data['service_uuids'].append(uuid16_to_uuid128(uuid16))

    for uuid32 in request_data.complete_service_class_uuids32:
        advertise_data['service_uuids'].append(uuid32_to_uuid128(uuid32))

    for uuid128 in request_data.complete_service_class_uuids128:
        advertise_data['service_uuids'].append(uuid128)

    # solicit_uuids
    for uuid16 in request_data.service_solicitation_uuids16:
        advertise_data['solicit_uuids'].append(uuid16_to_uuid128(uuid16))

    for uuid32 in request_data.service_solicitation_uuids32:
        advertise_data['solicit_uuids'].append(uuid32_to_uuid128(uuid32))

    for uuid128 in request_data.service_solicitation_uuids128:
        advertise_data['solicit_uuids'].append(uuid128)

    # service_data
    for (uuid16, data) in request_data.service_data_uuid16:
        advertise_data['service_data'][uuid16_to_uuid128(uuid16)] = data

    for (uuid32, data) in request_data.service_data_uuid32:
        advertise_data['service_data'][uuid32_to_uuid128(uuid32)] = data

    for (uuid128, data) in request_data.service_data_uuid128:
        advertise_data['service_data'][uuid128] = data

    advertise_data['manufacturer_data'][hex(
        floss_enums.CompanyIdentifiers.GOOGLE)] = request_data.manufacturer_specific_data

    # The name is derived from adapter directly in floss.
    if request_data.WhichOneof('shortened_local_name_oneof') in ('include_shortened_local_name',
                                                                 'include_complete_local_name'):
        advertise_data['include_device_name'] = getattr(request_data,
                                                        request_data.WhichOneof('shortened_local_name_oneof')).value

    # The tx power level is decided by the lower layers.
    if request_data.WhichOneof('tx_power_level_oneof') == 'include_tx_power_level':
        advertise_data['include_tx_power_level'] = request_data.include_tx_power_level
    return advertise_data


def create_observer_name(observer):
    """Generates an unique name for an observer.

    Args:
        observer: an observer class to observer the bluetooth callbacks.

    Returns:
        str: an unique name.
    """
    return observer.__class__.__name__ + str(id(observer))


# anext build-in is new in python3.10. Deprecate this function
# when we are able to use it.
async def anext(ait):
    return await ait.__anext__()
+249 −0
Original line number Diff line number Diff line
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""All functions relative to the bluetooth procedure."""

import asyncio
import logging
import threading
import traceback

from floss.pandora.floss import adapter_client
from floss.pandora.floss import advertising_client
from floss.pandora.floss import manager_client
from floss.pandora.floss import scanner_client
from floss.pandora.floss import utils
from gi.repository import GLib
import pydbus


class Bluetooth(object):
    """A bluetooth facade exposes all bluetooth functions."""

    # Default to this adapter during init. We will initialize to the correct
    # default adapter after the manager client is initialized.
    DEFAULT_ADAPTER = 0

    # Time to sleep between polls
    ADAPTER_CLIENT_POLL_INTERVAL = 0.1

    # How long we wait for the adapter to come up after we start it.
    ADAPTER_DAEMON_TIMEOUT_SEC = 20

    # How long we wait for the manager daemon to come up after we start it.
    DAEMON_TIMEOUT_SEC = 5

    # Default scanner settings
    SCANNER_INTERVAL = 0
    SCANNER_WINDOW = 0
    SCANNER_SCAN_TYPE = 0

    def __init__(self):
        self.setup_mainloop()

        # self state
        self.is_clean = False

        # GRPC server state
        self.pairing_events: asyncio.Queue = None
        self.pairing_answers = None

        # DBUS clients
        self.manager_client = manager_client.FlossManagerClient(self.bus)
        self.adapter_client = adapter_client.FlossAdapterClient(self.bus, self.DEFAULT_ADAPTER)
        self.advertising_client = advertising_client.FlossAdvertisingClient(self.bus, self.DEFAULT_ADAPTER)
        self.scanner_client = scanner_client.FlossScannerClient(self.bus, self.DEFAULT_ADAPTER)

    def __del__(self):
        if not self.is_clean:
            self.cleanup()

    def cleanup(self):
        self.mainloop_quit.set()
        self.mainloop.quit()
        self.is_clean = True

    def setup_mainloop(self):
        """Start mainloop thread in background.

        This will also initialize a few
        other variables (self.bus, self.mainloop, self.event_context) that may
        be necessary for proper operation.

        Raises:
            RuntimeError: if we timeout to wait for the mainloop ready.
        """

        self.mainloop_quit = threading.Event()
        self.mainloop_ready = threading.Event()
        self.thread = threading.Thread(name=utils.GLIB_THREAD_NAME, target=Bluetooth.mainloop_thread, args=(self,))
        self.thread.start()

        # Wait for mainloop to be ready
        if not self.mainloop_ready.wait(timeout=5):
            raise RuntimeError('Unable to initialize GLib mainloop')

    def mainloop_thread(self):
        # Set up mainloop. All subsequent buses and connections will use this
        # mainloop. We also use a separate main context to avoid multithreading
        # issues.
        GLib.threads_init()
        self.mainloop = GLib.MainLoop()

        # Set up bus connection
        self.bus = pydbus.SystemBus()

        # Set thread ready
        self.mainloop_ready.set()

        while not self.mainloop_quit.is_set():
            self.mainloop.run()

    def set_powered(self, powered: bool):
        """Set the power of bluetooth adapter and bluetooth clients.

        Args:
            powered: Power on or power off.

        Returns:
            bool: True if success, False otherwise.
        """
        default_adapter = self.manager_client.get_default_adapter()

        def _is_adapter_down(client):
            return lambda: not client.has_proxy()

        def _is_adapter_ready(client):
            return lambda: client.has_proxy() and client.get_address()

        if powered:
            # FIXME: Close rootcanal will cause manager_client failed call has_default_adapter.
            # if not self.manager_client.has_default_adapter():
            #     logging.warning('set_powered: Default adapter not available.')
            #     return False
            self.manager_client.start(default_adapter)

            self.adapter_client = adapter_client.FlossAdapterClient(self.bus, default_adapter)
            self.advertising_client = advertising_client.FlossAdvertisingClient(self.bus, default_adapter)
            self.scanner_client = scanner_client.FlossScannerClient(self.bus, default_adapter)

            try:
                utils.poll_for_condition(condition=_is_adapter_ready(self.adapter_client),
                                         desc='Wait for adapter start',
                                         sleep_interval=self.ADAPTER_CLIENT_POLL_INTERVAL,
                                         timeout=self.ADAPTER_DAEMON_TIMEOUT_SEC)
            except TimeoutError as e:
                logging.error('timeout: error starting adapter daemon: %s', e)
                logging.error(traceback.format_exc())
                return False

            # We need to observe callbacks for proper operation.
            if not self.adapter_client.register_callbacks():
                logging.error('adapter_client: Failed to register callbacks')
                return False
            if not self.advertising_client.register_advertiser_callback():
                logging.error('advertising_client: Failed to register advertiser callbacks')
                return False
            if not self.scanner_client.register_scanner_callback():
                logging.error('scanner_client: Failed to register callbacks')
                return False
        else:
            self.manager_client.stop(default_adapter)
            try:
                utils.poll_for_condition(condition=_is_adapter_down(self.adapter_client),
                                         desc='Wait for adapter stop',
                                         sleep_interval=self.ADAPTER_CLIENT_POLL_INTERVAL,
                                         timeout=self.ADAPTER_DAEMON_TIMEOUT_SEC)
            except TimeoutError as e:
                logging.error('timeout: error stopping adapter daemon: %s', e)
                logging.error(traceback.format_exc())
                return False
        return True

    def reset(self):
        if not self.set_powered(False):
            return False

        if not self.set_powered(True):
            return False
        return True

    def get_address(self):
        return self.adapter_client.get_address()

    def is_connected(self, address):
        return self.adapter_client.is_connected(address)

    def is_bonded(self, address):
        return self.adapter_client.is_bonded(address)

    def is_discovering(self):
        return self.adapter_client.is_discovering()

    def set_discoverable(self, mode, duration=60):
        return self.adapter_client.set_property('Discoverable', mode, duration)

    def create_bond(self, address, transport):
        return self.adapter_client.create_bond(address, transport)

    def set_pairing_confirmation(self, address, accept):
        return self.adapter_client.set_pairing_confirmation(address, accept)

    def connect_device(self, address):
        return self.adapter_client.connect_all_enabled_profiles(address)

    def disconnect_device(self, address):
        return self.adapter_client.disconnect_all_enabled_profiles(address)

    def start_discovery(self):
        if self.adapter_client.is_discovering():
            logging.warning('Adapter is already discovering.')
            return True
        return self.adapter_client.start_discovery()

    def stop_discovery(self):
        if not self.adapter_client.is_discovering():
            logging.warning('Discovery is already stopped.')
            return True
        return self.adapter_client.stop_discovery()

    def start_advertising_set(self, parameters, advertise_data, scan_response, periodic_parameters, periodic_data,
                              duration, max_ext_adv_events):
        parameters = self.advertising_client.make_dbus_advertising_set_parameters(parameters)
        advertise_data = self.advertising_client.make_dbus_advertise_data(advertise_data)
        scan_response = utils.make_kv_optional_value(self.advertising_client.make_dbus_advertise_data(scan_response))
        periodic_parameters = utils.make_kv_optional_value(
            self.advertising_client.make_dbus_periodic_advertising_parameters(periodic_parameters))
        periodic_data = utils.make_kv_optional_value(self.advertising_client.make_dbus_advertise_data(periodic_data))

        return self.advertising_client.start_advertising_set(parameters, advertise_data, scan_response,
                                                             periodic_parameters, periodic_data, duration,
                                                             max_ext_adv_events)

    def stop_advertising_set(self, advertiser_id):
        return self.advertising_client.stop_advertising_set(advertiser_id)

    def register_scanner(self):
        return self.scanner_client.register_scanner()

    def start_scan(self, scanner_id, settings=None, scan_filter=None):
        if settings is None:
            settings = self.scanner_client.make_dbus_scan_settings(self.SCANNER_INTERVAL, self.SCANNER_WINDOW,
                                                                   self.SCANNER_SCAN_TYPE)
        return self.scanner_client.start_scan(scanner_id, settings, scan_filter)

    def stop_scan(self, scanner_id):
        if not self.scanner_client.remove_monitor(scanner_id):
            logging.error('Failed to stop scanning.')
            return False
        return True
Loading