Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 15cb2c85 authored by JohnLai's avatar JohnLai Committed by John Lai
Browse files

Floss: Fix future in pandora server host interface

future.set_result is not thread-safe, use call_soon_threadsafe to wrap
it.

Bug: 299237787
Test: mma packages/modules/Bluetooth
Tag: #floss
Change-Id: I3ee10aaf81f310d338b74862fd1bc2eb5702168e
parent b2c72ee2
Loading
Loading
Loading
Loading
+36 −22
Original line number Diff line number Diff line
@@ -47,12 +47,12 @@ class HostService(host_grpc_aio.HostServicer):
        self.waited_connections = set()

    async def FactoryReset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
        self.waited_connections = set()
        self.waited_connections.clear()
        asyncio.create_task(self.server.stop(None))
        return empty_pb2.Empty()

    async def Reset(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
        self.waited_connections = set()
        self.waited_connections.clear()
        self.bluetooth.reset()
        return empty_pb2.Empty()

@@ -77,18 +77,22 @@ class HostService(host_grpc_aio.HostServicer):
                    return

                if status != 0:
                    self.task['connect_device'].set_result(
                        (False, f'{address} failed to bond. Status: {status}, State: {state}'))
                    future = self.task['connect_device']
                    future.get_loop().call_soon_threadsafe(
                        future.set_result, (False, f'{address} failed to bond. Status: {status}, State: {state}'))
                    return

                if state == floss_enums.BondState.BONDED:
                    if not self.client.is_connected(self.task['address']):
                        logging.info('{address} calling connect_all_enabled_profiles')
                        logging.info('%s calling connect_all_enabled_profiles', address)
                        if not self.client.connect_all_enabled_profiles(self.task['address']):
                            self.task['connect_device'].set_result(
                            future = self.task['connect_device']
                            future.get_loop().call_soon_threadsafe(
                                future.set_result,
                                (False, f'{self.task["address"]} failed on connect_all_enabled_profiles'))
                    else:
                        self.task['connect_device'].set_result((True, None))
                        future = self.task['connect_device']
                        future.get_loop().call_soon_threadsafe(future.set_result, (True, None))

            @utils.glib_callback()
            def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
@@ -104,8 +108,9 @@ class HostService(host_grpc_aio.HostServicer):
            @utils.glib_callback()
            def on_set_pairing_confirmation(self, err, result):
                if err or not result:
                    self.task['connect_device'].set_result(
                        (False, f'Pairing confirmation failed: err: {err}, result: {result}'))
                    future = self.task['connect_device']
                    future.get_loop().call_soon_threadsafe(
                        future.set_result, (False, f'Pairing confirmation failed: err: {err}, result: {result}'))

            @utils.glib_callback()
            def on_device_connected(self, remote_device):
@@ -114,13 +119,14 @@ class HostService(host_grpc_aio.HostServicer):
                    return

                if self.client.is_bonded(address):
                    self.task['connect_device'].set_result((True, None))
                    future = self.task['connect_device']
                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))

        address = utils.address_from(request.address)

        if not self.bluetooth.is_connected(address):
            try:
                connect_device = asyncio.Future()
                connect_device = asyncio.get_running_loop().create_future()
                observer = PairingObserver(self.bluetooth.adapter_client, {
                    'connect_device': connect_device,
                    'address': address
@@ -140,7 +146,7 @@ class HostService(host_grpc_aio.HostServicer):
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)

        cookie = any_pb2.Any(value=utils.address_to(request.address))
        cookie = any_pb2.Any(value=utils.address_to(address))
        return host_pb2.ConnectResponse(connection=host_pb2.Connection(cookie=cookie))

    async def WaitConnection(self, request: host_pb2.WaitConnectionRequest,
@@ -158,7 +164,8 @@ class HostService(host_grpc_aio.HostServicer):
                if address != self.task['address']:
                    return

                self.task['wait_connection'].set_result(address)
                future = self.task['wait_connection']
                future.get_loop().call_soon_threadsafe(future.set_result, address)

        if request.address is None:
            raise ValueError('Request address field must be set.')
@@ -166,7 +173,7 @@ class HostService(host_grpc_aio.HostServicer):

        if not self.bluetooth.is_connected(address) or address not in self.waited_connections:
            try:
                wait_connection = asyncio.Future()
                wait_connection = asyncio.get_running_loop().create_future()
                observer = ConnectionObserver({'wait_connection': wait_connection, 'address': address})
                name = utils.create_observer_name(observer)
                self.bluetooth.adapter_client.register_callback_observer(name, observer)
@@ -174,7 +181,7 @@ class HostService(host_grpc_aio.HostServicer):
                await wait_connection
            finally:
                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)
            self.waited_connection.add(address)
            self.waited_connections.add(address)

        cookie = any_pb2.Any(value=utils.address_to(address))
        return host_pb2.WaitConnectionResponse(connection=host_pb2.Connection(cookie=cookie))
@@ -205,7 +212,9 @@ class HostService(host_grpc_aio.HostServicer):
                address, _ = remote_device
                if address != self.task['address']:
                    return
                self.task['wait_disconnection'].set_result(address)

                future = self.task['wait_disconnection']
                future.get_loop().call_soon_threadsafe(future.set_result, address)

        if request.address is None:
            raise ValueError('Request address field must be set')
@@ -213,7 +222,7 @@ class HostService(host_grpc_aio.HostServicer):

        if self.bluetooth.is_connected(address):
            try:
                wait_disconnection = asyncio.Future()
                wait_disconnection = asyncio.get_running_loop().create_future()
                observer = ConnectionObserver({'wait_disconnection': wait_disconnection, 'address': address})
                name = utils.create_observer_name(observer)
                self.bluetooth.adapter_client.register_callback_observer(name, observer)
@@ -277,7 +286,9 @@ class HostService(host_grpc_aio.HostServicer):
                if status is None or floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to start advertising.')
                    advertiser_id = None
                self.task['start_advertising'].set_result(advertiser_id)

                future = self.task['start_advertising']
                future.get_loop().call_soon_threadsafe(future.set_result, advertiser_id)

        class ConnectionObserver(adapter_client.BluetoothConnectionCallbacks):
            """Observer to observe all connections."""
@@ -323,7 +334,7 @@ class HostService(host_grpc_aio.HostServicer):

                logging.info('Advertise: Wait for LE connection...')
                address = await connections.get()
                logging.info(f'Advertise: Connected to {address}')
                logging.info('Advertise: Connected to %s', address)

                cookie = any_pb2.Any(value=utils.address_to(address))
                yield host_pb2.AdvertiseResponse(connection=host_pb2.Connection(cookie=cookie))
@@ -357,7 +368,9 @@ class HostService(host_grpc_aio.HostServicer):
                if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
                    logging.error('Failed to register scanner! uuid: {uuid}')
                    scanner_id = None
                self.task['register_scanner'].set_result(scanner_id)

                future = self.task['register_scanner']
                future.get_loop().call_soon_threadsafe(future.set_result, scanner_id)

            @utils.glib_callback()
            def on_scan_result(self, scan_result):
@@ -455,12 +468,13 @@ class HostService(host_grpc_aio.HostServicer):
            @utils.glib_callback()
            def on_discovering_changed(self, discovering):
                if discovering == self.task['discovering']:
                    self.task['start_inquiry'].set_result(discovering)
                    future = self.task['start_inquiry']
                    future.get_loop().call_soon_threadsafe(future.set_result, discovering)

        observers = []
        try:
            if not self.bluetooth.is_discovering():
                inquriy = {'start_inquiry': asyncio.Future(), 'discovering': True}
                inquriy = {'start_inquiry': asyncio.get_running_loop().create_future(), 'discovering': True}
                observer = DiscoveryObserver(inquriy)
                name = utils.create_observer_name(observer)
                self.bluetooth.adapter_client.register_callback_observer(name, observer)
+1 −1
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ class SecurityService(security_grpc_aio.SecurityServicer):
                answer = pairing_answer.WhichOneof('answer')
                address = utils.address_from(pairing_answer.event.connection.cookie.value)

                logging.info(f'pairing_answer: {pairing_answer} address: {address}')
                logging.info('pairing_answer: %s address: %s', pairing_answer, address)

                if answer == 'confirm':
                    self.bluetooth.set_pairing_confirmation(address, True)
+7 −5
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@
import asyncio
import logging

from floss.pandora.server import bluetooth as bluetoot_module
from floss.pandora.server import bluetooth as bluetooth_module
from floss.pandora.server import host
from floss.pandora.server import security
import grpc
@@ -31,10 +31,12 @@ async def serve(port):

    try:
        while True:
            server = grpc.aio.server()
            bluetooth = bluetoot_module.Bluetooth()
            bluetooth = bluetooth_module.Bluetooth()
            bluetooth.reset()

            logging.info("bluetooth initialized")

            server = grpc.aio.server()
            host_service = host.HostService(server, bluetooth)
            host_grpc_aio.add_HostServicer_to_server(host_service, server)

@@ -44,10 +46,10 @@ async def serve(port):
            security_storage_service = security.SecurityStorageService(server, bluetooth)
            security_grpc_aio.add_SecurityStorageServicer_to_server(security_storage_service, server)

            server.add_insecure_port(f'localhost:{port}')
            server.add_insecure_port(f'[::]:{port}')

            await server.start()
            await server.wait_for_termination()

            bluetooth.cleanup()
            del bluetooth
    finally: