Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a1fd21f5 authored by John Lai's avatar John Lai Committed by Automerger Merge Worker
Browse files

Merge "Floss: Implement Pandora RFCOMM profile" into main am: 02995dab

parents e0392ab4 02995dab
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -386,3 +386,14 @@ class Bluetooth(object):

    def accept_socket(self, socket_id, timeout_ms=None):
        return self.socket_manager.accept(socket_id, timeout_ms)

    def create_insecure_rfcomm_socket_to_service_record(self, address, uuid):
        name = self.adapter_client.get_remote_property(address, 'Name')
        device = self.socket_manager.make_dbus_device(address, name)
        return self.socket_manager.create_insecure_rfcomm_socket_to_service_record(device, uuid)

    def listen_using_insecure_rfcomm_with_service_record(self, name, uuid):
        return self.socket_manager.listen_using_insecure_rfcomm_with_service_record(name, uuid)

    def close_socket(self, socket_id):
        return self.socket_manager.close(socket_id)
+335 −0
Original line number Diff line number Diff line
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RFCOMM grpc interface."""

import asyncio
import logging
import os
import socket as socket_module
import uuid as uuid_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import socket_manager
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import empty_pb2
import grpc
from pandora_experimental import rfcomm_grpc_aio
from pandora_experimental import rfcomm_pb2


class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
    """Service to trigger Bluetooth RFCOMM procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/rfcomm.proto
    """

    # Size of the buffer for data transactions.
    BUFFER_SIZE = 512

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

        # Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
        self.current_stream_id = 0x12FC0

        # key = stream_id, val = stream
        self.streams = dict()

    def new_stream_id(self) -> int:
        id = self.current_stream_id
        self.current_stream_id += 1
        return id

    async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
                              context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:

        class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the created RFCOMM connection state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
                if connecting_id != self.task['connecting_id']:
                    return

                future = self.task['create_rfcomm_channel']
                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
                                  connecting_id, result)
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                if not socket:
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                optional_fd = socket['optional_value']['fd']
                if not optional_fd:
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
                    logging.error('on_outgoing_connection_result: Empty fd list')
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                fd_handle = optional_fd['optional_value']
                if fd_handle > dbus_unix_fd_list.get_length():
                    logging.error('on_outgoing_connection_result: Invalid fd handle')
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                fd = dbus_unix_fd_list.get(fd_handle)
                fd_dup = os.dup(fd)
                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)

        address = utils.address_from(request.address)
        uuid = list(uuid_module.UUID(request.uuid).bytes)
        try:
            socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(address, uuid)
            if socket_result is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    'Failed to call create_insecure_rfcomm_socket_to_service_record.')

            connecting_id = socket_result['id']
            rfcomm_channel_creation = {
                'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
                'connecting_id': connecting_id
            }
            observer = CreateRFCOMMObserver(rfcomm_channel_creation)
            name = utils.create_observer_name(observer)
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'], timeout=5)
            if fd is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')

            stream_id = self.new_stream_id()
            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
            self.streams[stream_id] = stream
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:

        stream_id = request.connection.id
        if stream_id in self.streams:
            stream = self.streams[stream_id]
            try:
                stream.shutdown(socket_module.SHUT_RDWR)
                stream.close()
                del self.streams[stream_id]
            except asyncio.TimeoutError as e:
                logging.error('Disconnect: asyncio.TimeoutError %s', e)
        else:
            logging.error('No stream found with ID %s', stream_id)

        return empty_pb2.Empty()

    async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:

        class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
            """Observer to observe stop state of RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_closed(self, listener_id, reason):
                if listener_id != self.task['listener_id']:
                    return

                if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s', listener_id, reason)

                future = self.task['stop_rfcomm_channel']
                future.get_loop().call_soon_threadsafe(future.set_result, reason)

        try:
            listener_id = request.server.id
            rfcomm_channel_stop = {
                'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
                'listener_id': listener_id
            }
            observer = StopRFCOMMSocket(rfcomm_channel_stop)
            name = utils.create_observer_name(observer)
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            if not self.bluetooth.close_socket(listener_id):
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')

            status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'], timeout=5)
            if status != floss_enums.BtStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}')
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    async def StartServer(self, request: rfcomm_pb2.ServerOptions,
                          context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:

        class StartServerObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the RFCOMM server start."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_ready(self, socket, status):
                if not socket or 'id' not in socket:
                    return

                listener_id = socket['id']
                if listener_id != self.task['socket_id']:
                    return

                future = self.task['start_rfcomm_server']
                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed listening to RFCOMM channel with socket_id: %s. Status: %s', listener_id,
                                  status)
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                else:
                    future.get_loop().call_soon_threadsafe(future.set_result, listener_id)

        try:
            uuid = list(uuid_module.UUID(request.uuid).bytes)
            socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(request.name, uuid)
            if socket_result is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    'Failed to call listen_using_insecure_rfcomm_with_service_record.')

            rfcomm_channel_listener = {
                'start_rfcomm_server': asyncio.get_running_loop().create_future(),
                'socket_id': socket_result['id']
            }
            observer = StartServerObserver(rfcomm_channel_listener)
            name = utils.create_observer_name(observer)
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'], timeout=5)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))

    async def AcceptConnection(self, request: rfcomm_pb2.AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:

        class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the accepted RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
                if listener_id != self.task['listener_id']:
                    return

                future = self.task['accept_rfcomm_channel']
                if not connection:
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                optional_fd = connection['fd']
                if not optional_fd:
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
                    logging.error('on_handle_incoming_connection: Empty fd list')
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                fd_handle = optional_fd['optional_value']
                if fd_handle > dbus_unix_fd_list.get_length():
                    logging.error('on_handle_incoming_connection: Invalid fd handle')
                    future.get_loop().call_soon_threadsafe(future.set_result, None)
                    return

                fd = dbus_unix_fd_list.get(fd_handle)
                fd_dup = os.dup(fd)
                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)

        try:
            listener_id = request.server.id
            rfcomm_channel_acceptance = {
                'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
                'listener_id': listener_id
            }
            observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
            name = utils.create_observer_name(observer)
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL, f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
                    f'Status: {accept_socket_status}.')

            fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'], timeout=5)
            if fd is None:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')

            stream_id = self.new_stream_id()
            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
            self.streams[stream_id] = stream

        except asyncio.TimeoutError as e:
            logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
            return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=None))
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Send(self, request: rfcomm_pb2.TxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
        stream_id = request.connection.id
        output_stream = self.streams.get(stream_id)
        if output_stream:
            try:
                output_stream.send(request.data)

            except Exception as e:
                logging.error('Exception during writing to output stream %s', e)
        else:
            logging.error('Output stream: %s not found for the stream_id: %s', output_stream, stream_id)

        return empty_pb2.Empty()

    async def Receive(self, request: rfcomm_pb2.RxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
        stream_id = request.connection.id
        input_stream = self.streams.get(stream_id)
        if input_stream:
            try:
                data = input_stream.recv(self.BUFFER_SIZE)
                if data:
                    return rfcomm_pb2.RxResponse(data=bytes(data))
            except Exception as e:
                logging.error('Exception during reading from input stream %s', e)
        else:
            logging.error('Input stream: %s not found for the stream_id: %s', input_stream, stream_id)

        # Return an empty byte array.
        return rfcomm_pb2.RxResponse(data=b'')
+5 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ from floss.pandora.server import gatt
from floss.pandora.server import hid
from floss.pandora.server import host
from floss.pandora.server import l2cap
from floss.pandora.server import rfcomm
from floss.pandora.server import security
import grpc
from pandora import host_grpc_aio
@@ -28,6 +29,7 @@ from pandora import security_grpc_aio
from pandora_experimental import gatt_grpc_aio
from pandora_experimental import hid_grpc_aio
from pandora_experimental import l2cap_grpc_aio
from pandora_experimental import rfcomm_grpc_aio


async def serve(port):
@@ -60,6 +62,9 @@ async def serve(port):
            l2cap_service = l2cap.L2CAPService(bluetooth)
            l2cap_grpc_aio.add_L2CAPServicer_to_server(l2cap_service, server)

            rfcomm_service = rfcomm.RFCOMMService(bluetooth)
            rfcomm_grpc_aio.add_RFCOMMServicer_to_server(rfcomm_service, server)

            server.add_insecure_port(f'[::]:{port}')

            await server.start()