Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 51458981 authored by Nagham Masalmah's avatar Nagham Masalmah Committed by JohnLai
Browse files

Floss: Implement Pandora Security profile.

Bug: 309630370
Bug: 309682562
Test: mma packages/modules/Bluetooth && pts-bot GAP
Tag: #floss
Flag: EXEMPT floss only changes
Change-Id: I3ec69c86bfd2cbcefd9de62780ff65cbb4c7ca30
parent 01bed159
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -830,6 +830,12 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
        device = self._make_dbus_device(address, self.known_devices.get(address, {}).get('name', 'Test device'))
        return bool(self.proxy().DisconnectAllEnabledProfiles(device))

    @utils.glib_call(None)
    def get_connection_state(self, address):
        """Gets connection state."""
        device = self._make_dbus_device(address, self.known_devices.get(address, {}).get('name', 'Test device'))
        return self.proxy().GetConnectionState(device)

    def wait_for_device_disconnected(self, address):
        """Waits for the device become disconnected."""

+8 −0
Original line number Diff line number Diff line
@@ -171,6 +171,14 @@ class OwnAddressType(enum.IntEnum):
    RANDOM = 1


class BtConnectionState(enum.IntEnum):
    NOT_CONNECTED = 0
    CONNECTED_ONLY = 1
    ENCRYPTED_BR_EDR = 3
    ENCRYPTED_LE = 5
    ENCRYPTED_BOTH = 7


class CompanyIdentifiers(enum.IntEnum):
    """Bluetooth SIG Company ID values.

+7 −0
Original line number Diff line number Diff line
@@ -303,3 +303,10 @@ class Bluetooth(object):

    def set_connectable(self, mode):
        return self.qa_client.set_connectable(mode)

    def is_encrypted(self, address):
        connection_state = self.adapter_client.get_connection_state(address)
        if connection_state is None:
            logging.error('Failed to get connection state for address: %s', address)
            return False
        return connection_state > floss_enums.BtConnectionState.CONNECTED_ONLY
+152 −8
Original line number Diff line number Diff line
@@ -39,12 +39,111 @@ class SecurityService(security_grpc_aio.SecurityServicer):
    /pandora/bt-test-interfaces/pandora/security.proto
    """

    def __init__(self, server: grpc.aio.Server, bluetooth: bluetooth_module.Bluetooth):
        self.server = server
    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth
        self.manually_confirm = False
        self.on_pairing_count = 0

    async def wait_le_security_level(self, level, address):

        class BondingObserver(adapter_client.BluetoothCallbacks):
            """Observer to observe the bond state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_bond_state_changed(self, status, address, state):
                if address != self.task['address']:
                    return

                future = self.task['wait_bond']
                if status != floss_enums.BtStatus.SUCCESS:
                    future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}'))
                    return

                if state == floss_enums.BondState.BONDED:
                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))
                elif state == floss_enums.BondState.NOT_BONDED:
                    future.get_loop().call_soon_threadsafe(future.set_result,
                                                           (False, f'Status: {status}, State: {state}'))

        if level == security_pb2.LE_LEVEL1:
            return True
        if level == security_pb2.LE_LEVEL4:
            raise RuntimeError(f'wait_le_security_level: Low-energy level 4 not supported')

        if self.bluetooth.is_bonded(address):
            is_bonded = True
        else:
            try:
                wait_bond = asyncio.get_running_loop().create_future()
                observer = BondingObserver({'wait_bond': wait_bond, 'address': address})
                name = utils.create_observer_name(observer)
                self.bluetooth.adapter_client.register_callback_observer(name, observer)
                is_bonded, reason = await wait_bond
                if not is_bonded:
                    logging.error('Failed to bond to the address: %s, reason: %s', address, reason)
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)

        is_encrypted = self.bluetooth.is_encrypted(address)
        if level == security_pb2.LE_LEVEL2:
            return is_encrypted
        if level == security_pb2.LE_LEVEL3:
            return is_encrypted and is_bonded
        raise ValueError(f'wait_le_security_level: Invalid security level {level}')

    async def wait_classic_security_level(self, level, address):

        class BondingObserver(adapter_client.BluetoothCallbacks):
            """Observer to observe the bond state"""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_bond_state_changed(self, status, address, state):
                if address != self.task['address']:
                    return

                future = self.task['wait_bond']
                if status != floss_enums.BtStatus.SUCCESS:
                    future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}'))
                    return

                if state == floss_enums.BondState.BONDED:
                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))
                elif state == floss_enums.BondState.NOT_BONDED:
                    future.get_loop().call_soon_threadsafe(future.set_result,
                                                           (False, f'Status: {status}, State: {state}'))

        if level == security_pb2.LEVEL0:
            return True
        if level == security_pb2.LEVEL3:
            raise RuntimeError('wait_classic_security_level: Classic level 3 not supported')

        if self.bluetooth.is_bonded(address):
            is_bonded = True
        else:
            try:
                wait_bond = asyncio.get_running_loop().create_future()
                observer = BondingObserver({'wait_bond': wait_bond, 'address': address})
                name = utils.create_observer_name(observer)
                self.bluetooth.adapter_client.register_callback_observer(name, observer)
                is_bonded, reason = await wait_bond
                if not is_bonded:
                    logging.error('Failed to bond to the address: %s, reason: %s', address, reason)
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)

        is_encrypted = self.bluetooth.is_encrypted(address)
        if level == security_pb2.LEVEL1:
            return not is_encrypted or is_bonded
        if level == security_pb2.LEVEL2:
            return is_encrypted and is_bonded
        return False

    async def OnPairing(self, request: AsyncIterator[security_pb2.PairingEventAnswer],
                        context: grpc.ServicerContext) -> AsyncGenerator[security_pb2.PairingEvent, None]:
        logging.info('OnPairing')
@@ -176,15 +275,60 @@ class SecurityService(security_grpc_aio.SecurityServicer):

    async def Secure(self, request: security_pb2.SecureRequest,
                     context: grpc.ServicerContext) -> security_pb2.SecureResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')
        connection = utils.connection_from(request.connection)
        address = connection.address
        transport = connection.transport

        if transport == floss_enums.BtTransport.LE:
            if not request.HasField('le'):
                raise RuntimeError('Secure: Request le field must be set.')
            if request.le == security_pb2.LE_LEVEL1:
                security_level_reached = True
            elif request.le == security_pb2.LE_LEVEL4:
                raise RuntimeError('Secure: Low-energy security level 4 not supported')
            else:
                if not self.bluetooth.is_bonded(address):
                    self.bluetooth.create_bond(address, transport)
                security_level_reached = await self.wait_le_security_level(request.le, address)
        elif transport == floss_enums.BtTransport.BREDR:
            if not request.HasField('classic'):
                raise RuntimeError('Secure: Request classic field must be set.')
            if request.classic == security_pb2.LEVEL0:
                security_level_reached = True
            elif request.classic >= security_pb2.LEVEL3:
                raise RuntimeError('Secure: Classic security level up to 3 not supported')
            else:
                if not self.bluetooth.is_bonded(address):
                    self.bluetooth.create_bond(address, transport)
                security_level_reached = await self.wait_classic_security_level(request.classic, address)
        else:
            raise RuntimeError(f'Secure: Invalid bluetooth transport type: {transport}')

        secure_response = security_pb2.SecureResponse()
        if security_level_reached:
            secure_response.success.CopyFrom(empty_pb2.Empty())
        else:
            secure_response.not_reached.CopyFrom(empty_pb2.Empty())
        return secure_response

    async def WaitSecurity(self, request: security_pb2.WaitSecurityRequest,
                           context: grpc.ServicerContext) -> security_pb2.WaitSecurityResponse:
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')
        address = utils.connection_from(request.connection).address
        transport = floss_enums.BtTransport.BREDR if request.HasField('classic') else floss_enums.BtTransport.LE

        if transport == floss_enums.BtTransport.LE:
            security_level_reached = await self.wait_le_security_level(request.le, address)
        elif transport == floss_enums.BtTransport.BREDR:
            security_level_reached = await self.wait_classic_security_level(request.classic, address)
        else:
            raise RuntimeError(f'WaitSecurity: Invalid bluetooth transport type: {transport}')

        wait_security_response = security_pb2.WaitSecurityResponse()
        if security_level_reached:
            wait_security_response.success.CopyFrom(empty_pb2.Empty())
        else:
            wait_security_response.pairing_failure.CopyFrom(empty_pb2.Empty())
        return wait_security_response


class SecurityStorageService(security_grpc_aio.SecurityStorageServicer):
+1 −1
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ async def serve(port):
            logging.info("bluetooth initialized")

            server = grpc.aio.server()
            security_service = security.SecurityService(server, bluetooth)
            security_service = security.SecurityService(bluetooth)
            security_grpc_aio.add_SecurityServicer_to_server(security_service, server)

            host_service = host.HostService(server, bluetooth, security_service)