Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d05f5726 authored by Myles Watson's avatar Myles Watson Committed by Pomai Ahlo
Browse files

RfcommTest: Encapsulate servers and connections

Enable multiple connections at the same time.

Bug: 331415222
Test: atest RfcommTest
Flag: TEST_ONLY
Change-Id: Ibde106a922fd22d9285f4d8986e145e1101ce149
parent 4ec04021
Loading
Loading
Loading
Loading
+62 −42
Original line number Original line Diff line number Diff line
@@ -42,80 +42,100 @@ from pandora_experimental.rfcomm_pb2 import (
    TxResponse,
    TxResponse,
)
)


FIRST_SERVICE_RECORD_HANDLE = 0x00010000



class RFCOMMService(RFCOMMServicer):
class RFCOMMService(RFCOMMServicer):
    #TODO Add support for multiple servers
    device: Device
    device: Device
    server_id: Optional[ServerId]
    server: Optional[Server]


    def __init__(self, device: Device) -> None:
    def __init__(self, device: Device) -> None:
        super().__init__()
        super().__init__()
        self.device = device
        self.server_id = None
        self.server = None
        self.server = None
        self.server_name = None
        self.device = device
        self.server_uuid = None
        self.server_ports = {}  # key = channel, value = ServerInstance
        self.connections = {}  # key = id, value = dlc
        self.connections = {}  # key = id, value = dlc
        self.next_server_id = 1
        self.next_conn_id = 1
        self.next_conn_id = 1
        self.open_channel = None
        self.next_scn = 7
        self.wait_dlc = None

        self.dlc = None
    class Connection:

        def __init__(self, dlc):
            self.dlc = dlc
            self.data_queue = asyncio.Queue()
            self.data_queue = asyncio.Queue()


    class ServerPort:

        def __init__(self, name, uuid, wait_dlc):
            self.name = name
            self.uuid = uuid
            self.wait_dlc = wait_dlc
            self.accepted = False
            self.saved_dlc = None

        def accept(self):
            self.accepted = True
            if self.saved_dlc is not None:
                self.wait_dlc.set_result(self.saved_dlc)

        def acceptor(self, dlc):
            if self.accepted:
                self.wait_dlc.set_result(dlc)
            else:
                self.saved_dlc = dlc

    @utils.rpc
    @utils.rpc
    async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse:
    async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse:
        logging.info(f"StartServer")
        uuid = core.UUID(request.uuid)
        if self.server_id:
        logging.info(f"StartServer {uuid}")
            logging.warning(f"Server already started, returning existing server")

            return StartServerResponse(server=self.server_id)
        if self.server is None:
        else:
            self.server_id = ServerId(id=self.next_server_id)
            self.next_server_id += 1
            self.server = Server(self.device)
            self.server = Server(self.device)
            self.server_name = request.name

            self.server_uuid = core.UUID(request.uuid)
        for existing_id, port in self.server_ports.items():
        self.wait_dlc = asyncio.get_running_loop().create_future()
            if port.uuid == uuid:
        handle = 1
                logging.warning(f"Server port already started for {uuid}, returning existing port")
        #TODO Add support for multiple clients
                return StartServerResponse(server=ServerId(id=existing_id))
        self.open_channel = self.server.listen(acceptor=self.wait_dlc.set_result, channel=2)

        records = make_service_sdp_records(handle, self.open_channel, self.server_uuid)
        wait_dlc = asyncio.get_running_loop().create_future()
        self.device.sdp_service_records[handle] = records
        server_port = self.ServerPort(name=request.name, uuid=uuid, wait_dlc=wait_dlc)
        return StartServerResponse(server=self.server_id)
        open_channel = self.server.listen(acceptor=server_port.acceptor, channel=self.next_scn)
        self.next_scn += 1
        handle = FIRST_SERVICE_RECORD_HANDLE + open_channel
        self.device.sdp_service_records[handle] = make_service_sdp_records(handle, open_channel, uuid)
        self.server_ports[open_channel] = server_port
        return StartServerResponse(server=ServerId(id=open_channel))


    @utils.rpc
    @utils.rpc
    async def AcceptConnection(self, request: AcceptConnectionRequest,
    async def AcceptConnection(self, request: AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> AcceptConnectionResponse:
                               context: grpc.ServicerContext) -> AcceptConnectionResponse:
        logging.info(f"AcceptConnection")
        logging.info(f"AcceptConnection")
        assert self.server_id.id == request.server.id
        assert self.server_ports[request.server.id] is not None
        self.dlc = await self.wait_dlc
        self.server_ports[request.server.id].accept()
        self.dlc.sink = self.data_queue.put_nowait
        dlc = await self.server_ports[request.server.id].wait_dlc
        new_conn = RfcommConnection(id=self.next_conn_id)
        id = self.next_conn_id
        self.next_conn_id += 1
        self.next_conn_id += 1
        self.connections[new_conn.id] = self.dlc
        self.connections[id] = self.Connection(dlc=dlc)
        return AcceptConnectionResponse(connection=new_conn)
        self.connections[id].dlc.sink = self.connections[id].data_queue.put_nowait
        return AcceptConnectionResponse(connection=RfcommConnection(id=id))


    @utils.rpc
    @utils.rpc
    async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse:
    async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse:
        logging.info(f"StopServer")
        logging.info(f"StopServer")
        assert self.server_id.id == request.server.id
        assert self.server_ports[request.server.id] is not None
        self.server = None
        self.server_ports[request.server.id] = None
        self.server_id = None
        self.server_name = None
        self.server_uuid = None


        return StopServerResponse()
        return StopServerResponse()


    @utils.rpc
    @utils.rpc
    async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse:
    async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse:
        logging.info(f"Send")
        logging.info(f"Send")
        dlc = self.connections[request.connection.id]
        assert self.connections[request.connection.id] is not None
        dlc.write(request.data)
        self.connections[request.connection.id].dlc.write(request.data)
        return TxResponse()
        return TxResponse()


    @utils.rpc
    @utils.rpc
    async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse:
    async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse:
        logging.info(f"Receive")
        logging.info(f"Receive")
        received_data = await self.data_queue.get()
        assert self.connections[request.connection.id] is not None
        received_data = await self.connections[request.connection.id].data_queue.get()
        return RxResponse(data=received_data)
        return RxResponse(data=received_data)