Loading pandora/interfaces/pandora_experimental/hid.proto +2 −0 Original line number Diff line number Diff line Loading @@ -11,6 +11,8 @@ service HID { rpc ConnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Disconnect HID Host rpc DisconnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Virtual Cable Unplug HID Host rpc VirtualCableUnplugHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Send a SET_REPORT command, acting as a HID host, to a connected HID device rpc SendHostReport(SendHostReportRequest) returns (SendHostReportResponse); } Loading pandora/server/bumble_experimental/hid.py +359 −10 Original line number Diff line number Diff line Loading @@ -3,20 +3,28 @@ import asyncio import grpc import grpc.aio import logging import struct from bumble.device import Device from google.protobuf import empty_pb2 # pytype: disable=pyi-error from pandora_experimental.hid_grpc_aio import HIDServicer from bumble.pandora import utils from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID, BT_HUMAN_INTERFACE_DEVICE_SERVICE, BT_HIDP_PROTOCOL_ID, UUID, ProtocolError, ) from bumble.hci import ( HCI_StatusError, HCI_CONNECTION_ALREADY_EXISTS_ERROR, HCI_PAGE_TIMEOUT_ERROR, ) from bumble.hid import ( Device as HID_Device, HID_CONTROL_PSM, Loading Loading @@ -205,13 +213,137 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor 0x02, # . Report Count (2) 0x81, 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) 0xC0, # . End Collection 0xC0, # End Collection 0xC0, # . End Collection (Physical) 0xC0, # End Collection (Application) ]) # Default protocol mode set to report protocol protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL from bumble.core import AdvertisingData from bumble.device import Device, Connection, Peer from bumble.gatt import ( Descriptor, Service, Characteristic, CharacteristicValue, GATT_DEVICE_INFORMATION_SERVICE, GATT_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_BATTERY_SERVICE, GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_REPORT_CHARACTERISTIC, GATT_REPORT_MAP_CHARACTERISTIC, GATT_PROTOCOL_MODE_CHARACTERISTIC, GATT_HID_INFORMATION_CHARACTERISTIC, GATT_HID_CONTROL_POINT_CHARACTERISTIC, GATT_REPORT_REFERENCE_DESCRIPTOR, ) # ----------------------------------------------------------------------------- # Protocol Modes (HID Specification V1.1.1 Section 2.1.2) HID_BOOT_PROTOCOL = 0x00 HID_REPORT_PROTOCOL = 0x01 # Report Types (HID Specification V1.1.1 Section 2.1.1) HID_INPUT_REPORT = 0x01 HID_OUTPUT_REPORT = 0x02 HID_FEATURE_REPORT = 0x03 # Report Map HID_KEYBOARD_REPORT_MAP = bytes( # pylint: disable=line-too-long [ 0x05, 0x01, # Usage Page (Generic Desktop Controls) 0x09, 0x06, # Usage (Keyboard) 0xA1, 0x01, # Collection (Application) 0x85, 0x01, # . Report ID (1) 0x05, 0x07, # . Usage Page (Keyboard/Keypad) 0x19, 0xE0, # . Usage Minimum (0xE0) 0x29, 0xE7, # . Usage Maximum (0xE7) 0x15, 0x00, # . Logical Minimum (0) 0x25, 0x01, # . Logical Maximum (1) 0x75, 0x01, # . Report Size (1) 0x95, 0x08, # . Report Count (8) 0x81, 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x01, # . Report Count (1) 0x75, 0x08, # . Report Size (8) 0x81, 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x06, # . Report Count (6) 0x75, 0x08, # . Report Size (8) 0x15, 0x00, # . Logical Minimum (0x00) 0x25, 0x94, # . Logical Maximum (0x94) 0x05, 0x07, # . Usage Page (Keyboard/Keypad) 0x19, 0x00, # . Usage Minimum (0x00) 0x29, 0x94, # . Usage Maximum (0x94) 0x81, 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x05, # . Report Count (5) 0x75, 0x01, # . Report Size (1) 0x05, 0x08, # . Usage Page (LEDs) 0x19, 0x01, # . Usage Minimum (Num Lock) 0x29, 0x05, # . Usage Maximum (Kana) 0x91, 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 0x95, 0x01, # . Report Count (1) 0x75, 0x03, # . Report Size (3) 0x91, 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 0xC0, # End Collection ]) # ----------------------------------------------------------------------------- # pylint: disable=invalid-overridden-method class ServerListener(Device.Listener, Connection.Listener): def __init__(self, device): self.device = device @AsyncRunner.run_in_task() async def on_connection(self, connection): logging.info(f'=== Connected to {connection}') connection.listener = self @AsyncRunner.run_in_task() async def on_disconnection(self, reason): logging.info(f'### Disconnected, reason={reason}') # ----------------------------------------------------------------------------- def on_hid_control_point_write(_connection, value): logging.info(f'Control Point Write: {value}') # ----------------------------------------------------------------------------- def sdp_records(): Loading Loading @@ -355,6 +487,186 @@ def sdp_records(): } # ----------------------------------------------------------------------------- def hogp_device(device): global input_report_characteristic # Create an 'input report' characteristic to send keyboard reports to the host input_report_characteristic = Characteristic( GATT_REPORT_CHARACTERISTIC, Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([0, 0, 0, 0, 0, 0, 0, 0]), [Descriptor( GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]), )], ) # Create an 'output report' characteristic to receive keyboard reports from the host output_report_characteristic = Characteristic( GATT_REPORT_CHARACTERISTIC, Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([0]), [Descriptor( GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]), )], ) # Add the services to the GATT sever device.add_services([ Service( GATT_DEVICE_INFORMATION_SERVICE, [ Characteristic( GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, 'Bumble', ) ], ), Service( GATT_HUMAN_INTERFACE_DEVICE_SERVICE, [ Characteristic( GATT_PROTOCOL_MODE_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, bytes([HID_REPORT_PROTOCOL]), ), Characteristic( GATT_HID_INFORMATION_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, # bcdHID=1.1, bCountryCode=0x00, # Flags=RemoteWake|NormallyConnectable bytes([0x11, 0x01, 0x00, 0x03]), ), Characteristic( GATT_HID_CONTROL_POINT_CHARACTERISTIC, Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.WRITEABLE, CharacteristicValue(write=on_hid_control_point_write), ), Characteristic( GATT_REPORT_MAP_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, HID_KEYBOARD_REPORT_MAP, ), input_report_characteristic, output_report_characteristic, ], ), Service( GATT_BATTERY_SERVICE, [ Characteristic( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, bytes([100]), ) ], ), ]) # Debug print for attribute in device.gatt_server.attributes: logging.info(attribute) # Set the advertising data device.advertising_data = bytes( AdvertisingData([ ( AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8'), ), ( AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE), ), (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), (AdvertisingData.FLAGS, bytes([0x05])), ])) # Attach a listener device.listener = ServerListener(device) async def handle_virtual_cable_unplug(): hid_host_bd_addr = str(hid_device.remote_device_bd_address) await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() await hid_device.device.keystore.delete(hid_host_bd_addr) # type: ignore connection = hid_device.connection if connection is not None: await connection.disconnect() def on_get_report_cb(report_id: int, report_type: int, buffer_size: int): retValue = hid_device.GetSetStatus() logging.info("GET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "buffer_size:" + str(buffer_size)) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: retValue.data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) retValue.status = hid_device.GetSetReturn.SUCCESS elif report_id == 2: retValue.data = bytearray([0x02, 0x00, 0x00, 0x00]) retValue.status = hid_device.GetSetReturn.SUCCESS else: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND return retValue def on_set_report_cb(report_id: int, report_type: int, report_size: int, data: bytes): retValue = hid_device.GetSetStatus() logging.info("SET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "report_size " + str(report_size) + "data:" + str(data)) if report_type == Message.ReportType.FEATURE_REPORT: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_type == Message.ReportType.INPUT_REPORT: if report_id == 1 and report_size != 9: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 2 and report_size != 4: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 3: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND else: retValue.status = hid_device.GetSetReturn.SUCCESS else: retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_get_protocol_cb(): retValue = hid_device.GetSetStatus() retValue.data = protocol_mode.to_bytes() retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_set_protocol_cb(protocol: int): retValue = hid_device.GetSetStatus() # We do not support SET_PROTOCOL. logging.info(f"SET_PROTOCOL report_id: {protocol}") retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST return retValue def on_virtual_cable_unplug_cb(): logging.info('Received Virtual Cable Unplug') asyncio.create_task(handle_virtual_cable_unplug()) # This class implements the Hid Pandora interface. class HIDService(HIDServicer): Loading @@ -364,27 +676,43 @@ class HIDService(HIDServicer): super().__init__() self.device = device self.device.sdp_service_records.update(sdp_records()) hogp_device(self.device) logging.info(f'Hid device register: ') global hid_device hid_device = HID_Device(self.device) # Register for call backs hid_device.register_get_report_cb(on_get_report_cb) hid_device.register_set_report_cb(on_set_report_cb) hid_device.register_get_protocol_cb(on_get_protocol_cb) hid_device.register_set_protocol_cb(on_set_protocol_cb) # Register for virtual cable unplug call back hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb) @utils.rpc async def ConnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'ConnectHidHost') logging.info(f'ConnectHost') try: hid_host_bd_addr = str(hid_device.remote_device_bd_address) connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) await connection.authenticate() await connection.encrypt() if hid_device is not None: await hid_device.connect_control_channel() await hid_device.connect_interrupt_channel() except AttributeError as e: logging.error(f'Device does not exist') raise e except (HCI_StatusError, ProtocolError) as e: logging.error(f"Connection failure error: {e}") raise e return empty_pb2.Empty() @utils.rpc async def DisconnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'DisconnectHidHost') if hid_device is not None: logging.info(f'DisconnectHost') try: await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() connection = hid_device.connection Loading @@ -392,4 +720,25 @@ class HIDService(HIDServicer): await connection.disconnect() else: logging.info(f'Already disconnected from Hid Host') except AttributeError as e: logging.error(f'Device does not exist') raise e return empty_pb2.Empty() @utils.rpc async def VirtualCableUnplugHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'VirtualCableUnplugHost') try: hid_device.virtual_cable_unplug() try: hid_host_bd_addr = str(hid_device.remote_device_bd_address) await hid_device.device.keystore.delete(hid_host_bd_addr) except KeyError: logging.error(f'Device not found or Device already unpaired.') raise except AttributeError as e: logging.exception(f'Device does not exist') raise e return empty_pb2.Empty() Loading
pandora/interfaces/pandora_experimental/hid.proto +2 −0 Original line number Diff line number Diff line Loading @@ -11,6 +11,8 @@ service HID { rpc ConnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Disconnect HID Host rpc DisconnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Virtual Cable Unplug HID Host rpc VirtualCableUnplugHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Send a SET_REPORT command, acting as a HID host, to a connected HID device rpc SendHostReport(SendHostReportRequest) returns (SendHostReportResponse); } Loading
pandora/server/bumble_experimental/hid.py +359 −10 Original line number Diff line number Diff line Loading @@ -3,20 +3,28 @@ import asyncio import grpc import grpc.aio import logging import struct from bumble.device import Device from google.protobuf import empty_pb2 # pytype: disable=pyi-error from pandora_experimental.hid_grpc_aio import HIDServicer from bumble.pandora import utils from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID, BT_HUMAN_INTERFACE_DEVICE_SERVICE, BT_HIDP_PROTOCOL_ID, UUID, ProtocolError, ) from bumble.hci import ( HCI_StatusError, HCI_CONNECTION_ALREADY_EXISTS_ERROR, HCI_PAGE_TIMEOUT_ERROR, ) from bumble.hid import ( Device as HID_Device, HID_CONTROL_PSM, Loading Loading @@ -205,13 +213,137 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor 0x02, # . Report Count (2) 0x81, 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) 0xC0, # . End Collection 0xC0, # End Collection 0xC0, # . End Collection (Physical) 0xC0, # End Collection (Application) ]) # Default protocol mode set to report protocol protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL from bumble.core import AdvertisingData from bumble.device import Device, Connection, Peer from bumble.gatt import ( Descriptor, Service, Characteristic, CharacteristicValue, GATT_DEVICE_INFORMATION_SERVICE, GATT_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_BATTERY_SERVICE, GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_REPORT_CHARACTERISTIC, GATT_REPORT_MAP_CHARACTERISTIC, GATT_PROTOCOL_MODE_CHARACTERISTIC, GATT_HID_INFORMATION_CHARACTERISTIC, GATT_HID_CONTROL_POINT_CHARACTERISTIC, GATT_REPORT_REFERENCE_DESCRIPTOR, ) # ----------------------------------------------------------------------------- # Protocol Modes (HID Specification V1.1.1 Section 2.1.2) HID_BOOT_PROTOCOL = 0x00 HID_REPORT_PROTOCOL = 0x01 # Report Types (HID Specification V1.1.1 Section 2.1.1) HID_INPUT_REPORT = 0x01 HID_OUTPUT_REPORT = 0x02 HID_FEATURE_REPORT = 0x03 # Report Map HID_KEYBOARD_REPORT_MAP = bytes( # pylint: disable=line-too-long [ 0x05, 0x01, # Usage Page (Generic Desktop Controls) 0x09, 0x06, # Usage (Keyboard) 0xA1, 0x01, # Collection (Application) 0x85, 0x01, # . Report ID (1) 0x05, 0x07, # . Usage Page (Keyboard/Keypad) 0x19, 0xE0, # . Usage Minimum (0xE0) 0x29, 0xE7, # . Usage Maximum (0xE7) 0x15, 0x00, # . Logical Minimum (0) 0x25, 0x01, # . Logical Maximum (1) 0x75, 0x01, # . Report Size (1) 0x95, 0x08, # . Report Count (8) 0x81, 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x01, # . Report Count (1) 0x75, 0x08, # . Report Size (8) 0x81, 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x06, # . Report Count (6) 0x75, 0x08, # . Report Size (8) 0x15, 0x00, # . Logical Minimum (0x00) 0x25, 0x94, # . Logical Maximum (0x94) 0x05, 0x07, # . Usage Page (Keyboard/Keypad) 0x19, 0x00, # . Usage Minimum (0x00) 0x29, 0x94, # . Usage Maximum (0x94) 0x81, 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) 0x95, 0x05, # . Report Count (5) 0x75, 0x01, # . Report Size (1) 0x05, 0x08, # . Usage Page (LEDs) 0x19, 0x01, # . Usage Minimum (Num Lock) 0x29, 0x05, # . Usage Maximum (Kana) 0x91, 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 0x95, 0x01, # . Report Count (1) 0x75, 0x03, # . Report Size (3) 0x91, 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) 0xC0, # End Collection ]) # ----------------------------------------------------------------------------- # pylint: disable=invalid-overridden-method class ServerListener(Device.Listener, Connection.Listener): def __init__(self, device): self.device = device @AsyncRunner.run_in_task() async def on_connection(self, connection): logging.info(f'=== Connected to {connection}') connection.listener = self @AsyncRunner.run_in_task() async def on_disconnection(self, reason): logging.info(f'### Disconnected, reason={reason}') # ----------------------------------------------------------------------------- def on_hid_control_point_write(_connection, value): logging.info(f'Control Point Write: {value}') # ----------------------------------------------------------------------------- def sdp_records(): Loading Loading @@ -355,6 +487,186 @@ def sdp_records(): } # ----------------------------------------------------------------------------- def hogp_device(device): global input_report_characteristic # Create an 'input report' characteristic to send keyboard reports to the host input_report_characteristic = Characteristic( GATT_REPORT_CHARACTERISTIC, Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([0, 0, 0, 0, 0, 0, 0, 0]), [Descriptor( GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]), )], ) # Create an 'output report' characteristic to receive keyboard reports from the host output_report_characteristic = Characteristic( GATT_REPORT_CHARACTERISTIC, Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([0]), [Descriptor( GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]), )], ) # Add the services to the GATT sever device.add_services([ Service( GATT_DEVICE_INFORMATION_SERVICE, [ Characteristic( GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, 'Bumble', ) ], ), Service( GATT_HUMAN_INTERFACE_DEVICE_SERVICE, [ Characteristic( GATT_PROTOCOL_MODE_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, bytes([HID_REPORT_PROTOCOL]), ), Characteristic( GATT_HID_INFORMATION_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, # bcdHID=1.1, bCountryCode=0x00, # Flags=RemoteWake|NormallyConnectable bytes([0x11, 0x01, 0x00, 0x03]), ), Characteristic( GATT_HID_CONTROL_POINT_CHARACTERISTIC, Characteristic.WRITE_WITHOUT_RESPONSE, Characteristic.WRITEABLE, CharacteristicValue(write=on_hid_control_point_write), ), Characteristic( GATT_REPORT_MAP_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, HID_KEYBOARD_REPORT_MAP, ), input_report_characteristic, output_report_characteristic, ], ), Service( GATT_BATTERY_SERVICE, [ Characteristic( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, bytes([100]), ) ], ), ]) # Debug print for attribute in device.gatt_server.attributes: logging.info(attribute) # Set the advertising data device.advertising_data = bytes( AdvertisingData([ ( AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8'), ), ( AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE), ), (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), (AdvertisingData.FLAGS, bytes([0x05])), ])) # Attach a listener device.listener = ServerListener(device) async def handle_virtual_cable_unplug(): hid_host_bd_addr = str(hid_device.remote_device_bd_address) await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() await hid_device.device.keystore.delete(hid_host_bd_addr) # type: ignore connection = hid_device.connection if connection is not None: await connection.disconnect() def on_get_report_cb(report_id: int, report_type: int, buffer_size: int): retValue = hid_device.GetSetStatus() logging.info("GET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "buffer_size:" + str(buffer_size)) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: retValue.data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) retValue.status = hid_device.GetSetReturn.SUCCESS elif report_id == 2: retValue.data = bytearray([0x02, 0x00, 0x00, 0x00]) retValue.status = hid_device.GetSetReturn.SUCCESS else: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND return retValue def on_set_report_cb(report_id: int, report_type: int, report_size: int, data: bytes): retValue = hid_device.GetSetStatus() logging.info("SET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "report_size " + str(report_size) + "data:" + str(data)) if report_type == Message.ReportType.FEATURE_REPORT: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_type == Message.ReportType.INPUT_REPORT: if report_id == 1 and report_size != 9: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 2 and report_size != 4: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 3: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND else: retValue.status = hid_device.GetSetReturn.SUCCESS else: retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_get_protocol_cb(): retValue = hid_device.GetSetStatus() retValue.data = protocol_mode.to_bytes() retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_set_protocol_cb(protocol: int): retValue = hid_device.GetSetStatus() # We do not support SET_PROTOCOL. logging.info(f"SET_PROTOCOL report_id: {protocol}") retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST return retValue def on_virtual_cable_unplug_cb(): logging.info('Received Virtual Cable Unplug') asyncio.create_task(handle_virtual_cable_unplug()) # This class implements the Hid Pandora interface. class HIDService(HIDServicer): Loading @@ -364,27 +676,43 @@ class HIDService(HIDServicer): super().__init__() self.device = device self.device.sdp_service_records.update(sdp_records()) hogp_device(self.device) logging.info(f'Hid device register: ') global hid_device hid_device = HID_Device(self.device) # Register for call backs hid_device.register_get_report_cb(on_get_report_cb) hid_device.register_set_report_cb(on_set_report_cb) hid_device.register_get_protocol_cb(on_get_protocol_cb) hid_device.register_set_protocol_cb(on_set_protocol_cb) # Register for virtual cable unplug call back hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb) @utils.rpc async def ConnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'ConnectHidHost') logging.info(f'ConnectHost') try: hid_host_bd_addr = str(hid_device.remote_device_bd_address) connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) await connection.authenticate() await connection.encrypt() if hid_device is not None: await hid_device.connect_control_channel() await hid_device.connect_interrupt_channel() except AttributeError as e: logging.error(f'Device does not exist') raise e except (HCI_StatusError, ProtocolError) as e: logging.error(f"Connection failure error: {e}") raise e return empty_pb2.Empty() @utils.rpc async def DisconnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'DisconnectHidHost') if hid_device is not None: logging.info(f'DisconnectHost') try: await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() connection = hid_device.connection Loading @@ -392,4 +720,25 @@ class HIDService(HIDServicer): await connection.disconnect() else: logging.info(f'Already disconnected from Hid Host') except AttributeError as e: logging.error(f'Device does not exist') raise e return empty_pb2.Empty() @utils.rpc async def VirtualCableUnplugHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: logging.info(f'VirtualCableUnplugHost') try: hid_device.virtual_cable_unplug() try: hid_host_bd_addr = str(hid_device.remote_device_bd_address) await hid_device.device.keystore.delete(hid_host_bd_addr) except KeyError: logging.error(f'Device not found or Device already unpaired.') raise except AttributeError as e: logging.exception(f'Device does not exist') raise e return empty_pb2.Empty()