Loading pandora/server/bumble_experimental/rfcomm.py +62 −42 Original line number Diff line number Diff line Loading @@ -42,80 +42,100 @@ from pandora_experimental.rfcomm_pb2 import ( TxResponse, ) FIRST_SERVICE_RECORD_HANDLE = 0x00010000 class RFCOMMService(RFCOMMServicer): #TODO Add support for multiple servers device: Device server_id: Optional[ServerId] server: Optional[Server] def __init__(self, device: Device) -> None: super().__init__() self.device = device self.server_id = None self.server = None self.server_name = None self.server_uuid = None self.device = device self.server_ports = {} # key = channel, value = ServerInstance self.connections = {} # key = id, value = dlc self.next_server_id = 1 self.next_conn_id = 1 self.open_channel = None self.wait_dlc = None self.dlc = None self.next_scn = 7 class Connection: def __init__(self, dlc): self.dlc = dlc self.data_queue = asyncio.Queue() class ServerPort: def __init__(self, name, uuid, wait_dlc): self.name = name self.uuid = uuid self.wait_dlc = wait_dlc self.accepted = False self.saved_dlc = None def accept(self): self.accepted = True if self.saved_dlc is not None: self.wait_dlc.set_result(self.saved_dlc) def acceptor(self, dlc): if self.accepted: self.wait_dlc.set_result(dlc) else: self.saved_dlc = dlc @utils.rpc async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse: logging.info(f"StartServer") if self.server_id: logging.warning(f"Server already started, returning existing server") return StartServerResponse(server=self.server_id) else: self.server_id = ServerId(id=self.next_server_id) self.next_server_id += 1 uuid = core.UUID(request.uuid) logging.info(f"StartServer {uuid}") if self.server is None: self.server = Server(self.device) self.server_name = request.name self.server_uuid = core.UUID(request.uuid) self.wait_dlc = asyncio.get_running_loop().create_future() handle = 1 #TODO Add support for multiple clients self.open_channel = self.server.listen(acceptor=self.wait_dlc.set_result, channel=2) records = make_service_sdp_records(handle, self.open_channel, self.server_uuid) self.device.sdp_service_records[handle] = records return StartServerResponse(server=self.server_id) for existing_id, port in self.server_ports.items(): if port.uuid == uuid: logging.warning(f"Server port already started for {uuid}, returning existing port") return StartServerResponse(server=ServerId(id=existing_id)) wait_dlc = asyncio.get_running_loop().create_future() server_port = self.ServerPort(name=request.name, uuid=uuid, wait_dlc=wait_dlc) open_channel = self.server.listen(acceptor=server_port.acceptor, channel=self.next_scn) self.next_scn += 1 handle = FIRST_SERVICE_RECORD_HANDLE + open_channel self.device.sdp_service_records[handle] = make_service_sdp_records(handle, open_channel, uuid) self.server_ports[open_channel] = server_port return StartServerResponse(server=ServerId(id=open_channel)) @utils.rpc async def AcceptConnection(self, request: AcceptConnectionRequest, context: grpc.ServicerContext) -> AcceptConnectionResponse: logging.info(f"AcceptConnection") assert self.server_id.id == request.server.id self.dlc = await self.wait_dlc self.dlc.sink = self.data_queue.put_nowait new_conn = RfcommConnection(id=self.next_conn_id) assert self.server_ports[request.server.id] is not None self.server_ports[request.server.id].accept() dlc = await self.server_ports[request.server.id].wait_dlc id = self.next_conn_id self.next_conn_id += 1 self.connections[new_conn.id] = self.dlc return AcceptConnectionResponse(connection=new_conn) self.connections[id] = self.Connection(dlc=dlc) self.connections[id].dlc.sink = self.connections[id].data_queue.put_nowait return AcceptConnectionResponse(connection=RfcommConnection(id=id)) @utils.rpc async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse: logging.info(f"StopServer") assert self.server_id.id == request.server.id self.server = None self.server_id = None self.server_name = None self.server_uuid = None assert self.server_ports[request.server.id] is not None self.server_ports[request.server.id] = None return StopServerResponse() @utils.rpc async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse: logging.info(f"Send") dlc = self.connections[request.connection.id] dlc.write(request.data) assert self.connections[request.connection.id] is not None self.connections[request.connection.id].dlc.write(request.data) return TxResponse() @utils.rpc async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse: logging.info(f"Receive") received_data = await self.data_queue.get() assert self.connections[request.connection.id] is not None received_data = await self.connections[request.connection.id].data_queue.get() return RxResponse(data=received_data) Loading
pandora/server/bumble_experimental/rfcomm.py +62 −42 Original line number Diff line number Diff line Loading @@ -42,80 +42,100 @@ from pandora_experimental.rfcomm_pb2 import ( TxResponse, ) FIRST_SERVICE_RECORD_HANDLE = 0x00010000 class RFCOMMService(RFCOMMServicer): #TODO Add support for multiple servers device: Device server_id: Optional[ServerId] server: Optional[Server] def __init__(self, device: Device) -> None: super().__init__() self.device = device self.server_id = None self.server = None self.server_name = None self.server_uuid = None self.device = device self.server_ports = {} # key = channel, value = ServerInstance self.connections = {} # key = id, value = dlc self.next_server_id = 1 self.next_conn_id = 1 self.open_channel = None self.wait_dlc = None self.dlc = None self.next_scn = 7 class Connection: def __init__(self, dlc): self.dlc = dlc self.data_queue = asyncio.Queue() class ServerPort: def __init__(self, name, uuid, wait_dlc): self.name = name self.uuid = uuid self.wait_dlc = wait_dlc self.accepted = False self.saved_dlc = None def accept(self): self.accepted = True if self.saved_dlc is not None: self.wait_dlc.set_result(self.saved_dlc) def acceptor(self, dlc): if self.accepted: self.wait_dlc.set_result(dlc) else: self.saved_dlc = dlc @utils.rpc async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse: logging.info(f"StartServer") if self.server_id: logging.warning(f"Server already started, returning existing server") return StartServerResponse(server=self.server_id) else: self.server_id = ServerId(id=self.next_server_id) self.next_server_id += 1 uuid = core.UUID(request.uuid) logging.info(f"StartServer {uuid}") if self.server is None: self.server = Server(self.device) self.server_name = request.name self.server_uuid = core.UUID(request.uuid) self.wait_dlc = asyncio.get_running_loop().create_future() handle = 1 #TODO Add support for multiple clients self.open_channel = self.server.listen(acceptor=self.wait_dlc.set_result, channel=2) records = make_service_sdp_records(handle, self.open_channel, self.server_uuid) self.device.sdp_service_records[handle] = records return StartServerResponse(server=self.server_id) for existing_id, port in self.server_ports.items(): if port.uuid == uuid: logging.warning(f"Server port already started for {uuid}, returning existing port") return StartServerResponse(server=ServerId(id=existing_id)) wait_dlc = asyncio.get_running_loop().create_future() server_port = self.ServerPort(name=request.name, uuid=uuid, wait_dlc=wait_dlc) open_channel = self.server.listen(acceptor=server_port.acceptor, channel=self.next_scn) self.next_scn += 1 handle = FIRST_SERVICE_RECORD_HANDLE + open_channel self.device.sdp_service_records[handle] = make_service_sdp_records(handle, open_channel, uuid) self.server_ports[open_channel] = server_port return StartServerResponse(server=ServerId(id=open_channel)) @utils.rpc async def AcceptConnection(self, request: AcceptConnectionRequest, context: grpc.ServicerContext) -> AcceptConnectionResponse: logging.info(f"AcceptConnection") assert self.server_id.id == request.server.id self.dlc = await self.wait_dlc self.dlc.sink = self.data_queue.put_nowait new_conn = RfcommConnection(id=self.next_conn_id) assert self.server_ports[request.server.id] is not None self.server_ports[request.server.id].accept() dlc = await self.server_ports[request.server.id].wait_dlc id = self.next_conn_id self.next_conn_id += 1 self.connections[new_conn.id] = self.dlc return AcceptConnectionResponse(connection=new_conn) self.connections[id] = self.Connection(dlc=dlc) self.connections[id].dlc.sink = self.connections[id].data_queue.put_nowait return AcceptConnectionResponse(connection=RfcommConnection(id=id)) @utils.rpc async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse: logging.info(f"StopServer") assert self.server_id.id == request.server.id self.server = None self.server_id = None self.server_name = None self.server_uuid = None assert self.server_ports[request.server.id] is not None self.server_ports[request.server.id] = None return StopServerResponse() @utils.rpc async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse: logging.info(f"Send") dlc = self.connections[request.connection.id] dlc.write(request.data) assert self.connections[request.connection.id] is not None self.connections[request.connection.id].dlc.write(request.data) return TxResponse() @utils.rpc async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse: logging.info(f"Receive") received_data = await self.data_queue.get() assert self.connections[request.connection.id] is not None received_data = await self.connections[request.connection.id].data_queue.get() return RxResponse(data=received_data)