Loading floss/pandora/floss/adapter_client.py +67 −0 Original line number Diff line number Diff line Loading @@ -105,6 +105,17 @@ class BluetoothCallbacks: """ pass def on_device_properties_changed(self, remote_device, props): """Device properties changed for a remote device. Args: remote_device: Remote device that is being searched. props: Remote device properties. """ pass class BluetoothConnectionCallbacks: """Callbacks for the Device Connection interface. Loading Loading @@ -193,6 +204,10 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): <arg type="s" name="address" direction="in" /> <arg type="u" name="state" direction="in" /> </method> <method name="OnDevicePropertiesChanged"> <arg type="a{sv}" name="remote_device" direction="in" /> <arg type="au" name="props" direction="in" /> </method> </interface> </node> """ Loading Loading @@ -256,6 +271,11 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): for observer in self.observers.values(): observer.on_bond_state_changed(status, address, state) def OnDevicePropertiesChanged(self, remote_device, props): """Handle device properties changed callbacks.""" for observer in self.observers.values(): observer.on_device_properties_changed(remote_device, props) class ExportedConnectionCallbacks(observer_base.ObserverBase): """ <node> Loading Loading @@ -394,6 +414,19 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): else: self.known_devices[address]['connected'] = False @utils.glib_callback() def on_device_properties_changed(self, remote_device, props): """Device properties changed for a remote device. Args: remote_device: Remote device that is being searched. props: Remote device properties. """ pass def _make_dbus_device(self, address, name): return {'address': GLib.Variant('s', address), 'name': GLib.Variant('s', name)} Loading Loading @@ -738,6 +771,40 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): remote_device = self._make_dbus_device(address, name) return bool(self.proxy().RemoveBond(remote_device)) @utils.glib_call(None) def get_bond_state(self, address): """Gets remote device bond state. Args: address: Device to get bond status. Returns: True on success, False on failure, None on DBus error. """ name = 'Test bond' if address in self.known_devices: name = self.known_devices[address]['name'] remote_device = self._make_dbus_device(address, name) return bool(self.proxy().GetBondState(remote_device)) @utils.glib_call(None) def fetch_remote_uuids(self, address): """Gets remote device service uuids. Args: address: Device to cancel bond. Returns: True on success, False on failure, None on DBus error. """ name = 'Test bond' if address in self.known_devices: name = self.known_devices[address]['name'] remote_device = self._make_dbus_device(address, name) return self.proxy().FetchRemoteUuids(remote_device) @utils.glib_call(None) def get_bonded_devices(self): """Get all bonded devices. Loading floss/pandora/floss/floss_enums.py +31 −0 Original line number Diff line number Diff line Loading @@ -143,6 +143,37 @@ class BondState(enum.IntEnum): BONDED = 2 class BtPropertyType(enum.IntEnum): """Bluetooth's property type.""" BdName = 0x1 BdAddr = 0x2 Uuids = 0x3 ClassOfDevice = 0x4 TypeOfDevice = 0x5 ServiceRecord = 0x6 AdapterScanMode = 0x7 AdapterBondedDevices = 0x8 AdapterDiscoverableTimeout = 0x9 RemoteFriendlyName = 0xA RemoteRssi = 0xB RemoteVersionInfo = 0xC LocalLeFeatures = 0xD LocalIoCaps = 0xE LocalIoCapsBle = 0xF DynamicAudioBuffer = 0x10 RemoteIsCoordinatedSetMember = 0x11 Appearance = 0x12 VendorProductInfo = 0x13 # Unimplemented: # BT_PROPERTY_WL_MEDIA_PLAYERS_LIST, # BT_PROPERTY_REMOTE_ASHA_CAPABILITY, # BT_PROPERTY_REMOTE_ASHA_TRUNCATED_HISYNCID, # BT_PROPERTY_REMOTE_MODEL_NUM, RemoteAddrType = 0x18 Unknown = 0xFE RemoteDeviceTimestamp = 0xFF class PairingVariant(enum.IntEnum): """Bluetooth pairing variant type.""" # SSP variants. Loading floss/pandora/floss/gatt_client.py +53 −3 Original line number Diff line number Diff line Loading @@ -695,6 +695,26 @@ class FlossGattClient(GattClientCallbacks): self.proxy().UnregisterClient(self.client_id) return True def register_callback_observer(self, name, observer): """Add an observer for all callbacks. Args: name: Name of the observer. observer: Observer that implements all callback classes. """ if isinstance(observer, GattClientCallbacks): self.callbacks.add_observer(name, observer) def unregister_callback_observer(self, name, observer): """Remove an observer for all callbacks. Args: name: Name of the observer. observer: Observer that implements all callback classes. """ if isinstance(observer, GattClientCallbacks): self.callbacks.remove_observer(name, observer) @utils.glib_call(False) def connect_client(self, address, Loading Loading @@ -816,13 +836,43 @@ class FlossGattClient(GattClientCallbacks): self.proxy().ReadUsingCharacteristicUuid(self.client_id, address, uuid, start_handle, end_handle, auth_req) return True @utils.glib_call(False) def read_descriptor(self, address, handle, auth_req): """Reads remote device GATT descriptor. Args: address: Remote device MAC address. handle: Descriptor handle id. auth_req: Authentication requirements value. Returns: True on success, False otherwise. """ self.proxy().ReadDescriptor(self.client_id, address, handle, auth_req) return True @utils.glib_call(False) def write_descriptor(self, address, handle, auth_req, value): """Writes remote device GATT descriptor. Args: address: Remote device MAC address. handle: Descriptor handle id. auth_req: Authentication requirements value. value: Descriptor value to write. Returns: True on success, False otherwise. """ self.proxy().WriteDescriptor(self.client_id, address, handle, auth_req, value) return True @utils.glib_call(None) def write_characteristic(self, address, uuid, handle, write_type, auth_req, value): def write_characteristic(self, address, handle, write_type, auth_req, value): """Writes remote device GATT characteristic. Args: address: Remote device MAC address. uuid: The characteristic UUID as a string. handle: Characteristic handle id. write_type: Characteristic write type. auth_req: Authentication requirements value. Loading @@ -831,7 +881,7 @@ class FlossGattClient(GattClientCallbacks): Returns: GattWriteRequestStatus on success, None otherwise. """ return self.proxy().write_characteristic(self.client_id, address, uuid, handle, write_type, auth_req, value) return self.proxy().WriteCharacteristic(self.client_id, address, handle, write_type, auth_req, value) @utils.glib_call(False) def register_for_notification(self, address, handle, enable): Loading floss/pandora/server/bluetooth.py +33 −0 Original line number Diff line number Diff line Loading @@ -298,6 +298,39 @@ class Bluetooth(object): def set_hid_report(self, addr, report_type, report): return self.qa_client.set_hid_report(addr, report_type, report) def read_characteristic(self, address, handle, auth_re): return self.gatt_client.read_characteristic(address, handle, auth_re) def read_descriptor(self, address, handle, auth_req): return self.gatt_client.read_descriptor(address, handle, auth_req) def discover_services(self, address): return self.gatt_client.discover_services(address) def get_bond_state(self, address): self.adapter_client.get_bond_state(address) def fetch_remote(self, address): return self.adapter_client.fetch_remote_uuids(address) def get_remote_uuids(self, address): return self.adapter_client.get_remote_property(address, 'Uuids') def btif_gattc_discover_service_by_uuid(self, address, uuid): return self.gatt_client.btif_gattc_discover_service_by_uuid(address, uuid) def configure_mtu(self, address, mtu): return self.gatt_client.configure_mtu(address, mtu) def refresh_device(self, address): return self.gatt_client.refresh_device(address) def write_descriptor(self, address, handle, auth_req, value): return self.gatt_client.write_descriptor(address, handle, auth_req, value) def write_characteristic(self, address, handle, write_type, auth_req, value): return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value) def gatt_connect(self, address, is_direct, transport): return self.gatt_client.connect_client(address, is_direct, transport) Loading floss/pandora/server/gatt.py 0 → 100644 +328 −0 Original line number Diff line number Diff line # Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """GATT grpc interface.""" import asyncio import logging from uuid import UUID from floss.pandora.floss import adapter_client from floss.pandora.floss import floss_enums from floss.pandora.floss import gatt_client from floss.pandora.floss import utils from floss.pandora.server import bluetooth as bluetooth_module import grpc from pandora_experimental import gatt_grpc_aio from pandora_experimental import gatt_pb2 class GATTService(gatt_grpc_aio.GATTServicer): """Service to trigger Bluetooth GATT procedures. This class implements the Pandora bluetooth test interfaces, where the metaclass definition is automatically generated by the protobuf. The interface definition can be found in: https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/gatt.proto?q=gatt.proto """ # Write characteristic, requesting acknowledgement by the remote device. WRITE_TYPE_DEFAULT = 2 # No authentication required. AUTHENTICATION_NONE = 0 def __init__(self, bluetooth: bluetooth_module.Bluetooth): self.bluetooth = bluetooth async def ExchangeMTU(self, request: gatt_pb2.ExchangeMTURequest, context: grpc.ServicerContext) -> gatt_pb2.ExchangeMTUResponse: class MTUChangeObserver(gatt_client.GattClientCallbacks): """Observer to observe MTU change state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_configure_mtu(self, addr, mtu, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to configure MTU. Status: %s', status) future = self.task['configure_mtu'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address try: configure_mtu = asyncio.get_running_loop().create_future() observer = MTUChangeObserver({'configure_mtu': configure_mtu, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.configure_mtu(address, request.mtu) status = await configure_mtu if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to configure MTU.') finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return gatt_pb2.ExchangeMTUResponse() async def WriteAttFromHandle(self, request: gatt_pb2.WriteRequest, context: grpc.ServicerContext) -> gatt_pb2.WriteResponse: class WriteAttObserver(gatt_client.GattClientCallbacks): """Observer to observe write attribute state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_characteristic_write(self, addr, status, handle): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to write characteristic from handle. Status: %s', status) future = self.task['write_attribute'] future.get_loop().call_soon_threadsafe(future.set_result, (status, handle)) @utils.glib_callback() def on_descriptor_write(self, addr, status, handle): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to write descriptor from handle. Status: %s', status) future = self.task['write_attribute'] future.get_loop().call_soon_threadsafe(future.set_result, (status, handle)) class ReadCharacteristicFromHandleObserver(gatt_client.GattClientCallbacks): """Observer to observe the read characteristics state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_characteristic_read(self, addr, status, handle, value): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to read characteristic from handle. Status: %s', status) future = self.task['characteristics'] future.get_loop().call_soon_threadsafe(future.set_result, status) class ReadCharacteristicDescriptorFromHandleObserver(gatt_client.GattClientCallbacks): """Observer to observe the read characteristic descriptor state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_descriptor_read(self, addr, status, handle, value): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to read descriptors. Status: %s', status) future = self.task['descriptors'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address observers = [] valid_handle = True try: write_attribute = asyncio.get_running_loop().create_future() observer = WriteAttObserver({'write_attribute': write_attribute, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) characteristics = asyncio.get_running_loop().create_future() observer = ReadCharacteristicFromHandleObserver({'characteristics': characteristics, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) self.bluetooth.read_characteristic(address, request.handle, self.AUTHENTICATION_NONE) char_status = await characteristics if char_status != floss_enums.GattStatus.SUCCESS: descriptors = asyncio.get_running_loop().create_future() observer = ReadCharacteristicDescriptorFromHandleObserver({ 'descriptors': descriptors, 'address': address }) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) self.bluetooth.gatt_client.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE) desc_status = await descriptors if desc_status != floss_enums.GattStatus.SUCCESS: valid_handle = False else: self.bluetooth.write_descriptor(address, request.handle, self.AUTHENTICATION_NONE, request.value) else: self.bluetooth.write_characteristic(address, request.handle, self.WRITE_TYPE_DEFAULT, self.AUTHENTICATION_NONE, request.value) if valid_handle: status, handle = await write_attribute finally: for name, observer in observers: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) if valid_handle: return gatt_pb2.WriteResponse(handle=handle, status=status) return gatt_pb2.WriteResponse(handle=request.handle, status=gatt_pb2.INVALID_HANDLE) async def DiscoverServiceByUuid(self, request: gatt_pb2.DiscoverServiceByUuidRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse: address = utils.connection_from(request.connection).address self.bluetooth.btif_gattc_discover_service_by_uuid(address, request.uuid) return gatt_pb2.DiscoverServicesResponse() async def DiscoverServices(self, request: gatt_pb2.DiscoverServicesRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse: class DiscoveryObserver(gatt_client.GattClientCallbacks): """Observer to observe the discovery service state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_search_complete(self, addr, services, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to complete search. Status: %s', status) future = self.task['search_services'] future.get_loop().call_soon_threadsafe(future.set_result, (services, status)) address = utils.connection_from(request.connection).address try: search_services = asyncio.get_running_loop().create_future() observer = DiscoveryObserver({'search_services': search_services, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.discover_services(address) services, status = await search_services if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to find services.') response = gatt_pb2.DiscoverServicesResponse() for serv in services: response.services.append(self.create_gatt_service(serv)) finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return response async def DiscoverServicesSdp(self, request: gatt_pb2.DiscoverServicesSdpRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesSdpResponse: class DiscoverySDPObserver(adapter_client.BluetoothCallbacks): """Observer to observe the SDP discovery service state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_device_properties_changed(self, remote_device, props): if remote_device['address'] != self.task['address']: return if floss_enums.BtPropertyType.Uuids in props: future = self.task['device_uuids_changed'] future.get_loop().call_soon_threadsafe(future.set_result, ()) address = utils.address_from(request.address) try: uuids = self.bluetooth.get_remote_uuids(address) if self.bluetooth.get_bond_state(address) == floss_enums.BondState.BONDING and (uuids is None or len(uuids)) == 0: logging.error('Failed to get UUIDs.') return gatt_pb2.DiscoverServicesSdpResponse() if self.bluetooth.get_bond_state(address) != floss_enums.BondState.BONDING: device_uuids_changed = asyncio.get_running_loop().create_future() observer = DiscoverySDPObserver({'device_uuids_changed': device_uuids_changed, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.adapter_client.register_callback_observer(name, observer) status = self.bluetooth.fetch_remote(address) if not status: raise RuntimeError(f'Failed to fetch remote device {address} ' f'uuids.') await device_uuids_changed uuids = self.bluetooth.get_remote_uuids(address) response = gatt_pb2.DiscoverServicesSdpResponse() if uuids: for uuid in uuids: response.service_uuids.append(str(UUID(bytes=bytes(uuid))).upper()) finally: self.bluetooth.adapter_client.unregister_callback_observer(name, observer) return response async def ClearCache(self, request: gatt_pb2.ClearCacheRequest, context: grpc.ServicerContext) -> gatt_pb2.ClearCacheResponse: class ClearCacheObserver(gatt_client.GattClientCallbacks): """Observer to observe the clear cache state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_connection_updated(self, addr, interval, latency, timeout, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to update connection. Status: %s', status) future = self.task['refresh'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address try: refresh = asyncio.get_running_loop().create_future() observer = ClearCacheObserver({'refresh': refresh, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.refresh_device(address) status = await refresh if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to clear cache.') finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return gatt_pb2.ClearCacheResponse() def create_gatt_characteristic_descriptor(self, descriptor): return gatt_pb2.GattCharacteristicDescriptor(handle=descriptor['instance_id'], permissions=descriptor['permissions'], uuid=str(UUID(bytes=bytes(descriptor['uuid']))).upper()) def create_gatt_characteristic(self, characteristic): return gatt_pb2.GattCharacteristic( properties=characteristic['properties'], permissions=characteristic['permissions'], uuid=str(UUID(bytes=bytes(characteristic['uuid']))).upper(), handle=characteristic['instance_id'], descriptors=[ self.create_gatt_characteristic_descriptor(descriptor) for descriptor in characteristic['descriptors'] ]) def create_gatt_service(self, service): return gatt_pb2.GattService( handle=service['instance_id'], type=service['service_type'], uuid=str(UUID(bytes=bytes(service['uuid']))).upper(), included_services=[ self.create_gatt_service(included_service) for included_service in service['included_services'] ], characteristics=[ self.create_gatt_characteristic(characteristic) for characteristic in service['characteristics'] ]) Loading
floss/pandora/floss/adapter_client.py +67 −0 Original line number Diff line number Diff line Loading @@ -105,6 +105,17 @@ class BluetoothCallbacks: """ pass def on_device_properties_changed(self, remote_device, props): """Device properties changed for a remote device. Args: remote_device: Remote device that is being searched. props: Remote device properties. """ pass class BluetoothConnectionCallbacks: """Callbacks for the Device Connection interface. Loading Loading @@ -193,6 +204,10 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): <arg type="s" name="address" direction="in" /> <arg type="u" name="state" direction="in" /> </method> <method name="OnDevicePropertiesChanged"> <arg type="a{sv}" name="remote_device" direction="in" /> <arg type="au" name="props" direction="in" /> </method> </interface> </node> """ Loading Loading @@ -256,6 +271,11 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): for observer in self.observers.values(): observer.on_bond_state_changed(status, address, state) def OnDevicePropertiesChanged(self, remote_device, props): """Handle device properties changed callbacks.""" for observer in self.observers.values(): observer.on_device_properties_changed(remote_device, props) class ExportedConnectionCallbacks(observer_base.ObserverBase): """ <node> Loading Loading @@ -394,6 +414,19 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): else: self.known_devices[address]['connected'] = False @utils.glib_callback() def on_device_properties_changed(self, remote_device, props): """Device properties changed for a remote device. Args: remote_device: Remote device that is being searched. props: Remote device properties. """ pass def _make_dbus_device(self, address, name): return {'address': GLib.Variant('s', address), 'name': GLib.Variant('s', name)} Loading Loading @@ -738,6 +771,40 @@ class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks): remote_device = self._make_dbus_device(address, name) return bool(self.proxy().RemoveBond(remote_device)) @utils.glib_call(None) def get_bond_state(self, address): """Gets remote device bond state. Args: address: Device to get bond status. Returns: True on success, False on failure, None on DBus error. """ name = 'Test bond' if address in self.known_devices: name = self.known_devices[address]['name'] remote_device = self._make_dbus_device(address, name) return bool(self.proxy().GetBondState(remote_device)) @utils.glib_call(None) def fetch_remote_uuids(self, address): """Gets remote device service uuids. Args: address: Device to cancel bond. Returns: True on success, False on failure, None on DBus error. """ name = 'Test bond' if address in self.known_devices: name = self.known_devices[address]['name'] remote_device = self._make_dbus_device(address, name) return self.proxy().FetchRemoteUuids(remote_device) @utils.glib_call(None) def get_bonded_devices(self): """Get all bonded devices. Loading
floss/pandora/floss/floss_enums.py +31 −0 Original line number Diff line number Diff line Loading @@ -143,6 +143,37 @@ class BondState(enum.IntEnum): BONDED = 2 class BtPropertyType(enum.IntEnum): """Bluetooth's property type.""" BdName = 0x1 BdAddr = 0x2 Uuids = 0x3 ClassOfDevice = 0x4 TypeOfDevice = 0x5 ServiceRecord = 0x6 AdapterScanMode = 0x7 AdapterBondedDevices = 0x8 AdapterDiscoverableTimeout = 0x9 RemoteFriendlyName = 0xA RemoteRssi = 0xB RemoteVersionInfo = 0xC LocalLeFeatures = 0xD LocalIoCaps = 0xE LocalIoCapsBle = 0xF DynamicAudioBuffer = 0x10 RemoteIsCoordinatedSetMember = 0x11 Appearance = 0x12 VendorProductInfo = 0x13 # Unimplemented: # BT_PROPERTY_WL_MEDIA_PLAYERS_LIST, # BT_PROPERTY_REMOTE_ASHA_CAPABILITY, # BT_PROPERTY_REMOTE_ASHA_TRUNCATED_HISYNCID, # BT_PROPERTY_REMOTE_MODEL_NUM, RemoteAddrType = 0x18 Unknown = 0xFE RemoteDeviceTimestamp = 0xFF class PairingVariant(enum.IntEnum): """Bluetooth pairing variant type.""" # SSP variants. Loading
floss/pandora/floss/gatt_client.py +53 −3 Original line number Diff line number Diff line Loading @@ -695,6 +695,26 @@ class FlossGattClient(GattClientCallbacks): self.proxy().UnregisterClient(self.client_id) return True def register_callback_observer(self, name, observer): """Add an observer for all callbacks. Args: name: Name of the observer. observer: Observer that implements all callback classes. """ if isinstance(observer, GattClientCallbacks): self.callbacks.add_observer(name, observer) def unregister_callback_observer(self, name, observer): """Remove an observer for all callbacks. Args: name: Name of the observer. observer: Observer that implements all callback classes. """ if isinstance(observer, GattClientCallbacks): self.callbacks.remove_observer(name, observer) @utils.glib_call(False) def connect_client(self, address, Loading Loading @@ -816,13 +836,43 @@ class FlossGattClient(GattClientCallbacks): self.proxy().ReadUsingCharacteristicUuid(self.client_id, address, uuid, start_handle, end_handle, auth_req) return True @utils.glib_call(False) def read_descriptor(self, address, handle, auth_req): """Reads remote device GATT descriptor. Args: address: Remote device MAC address. handle: Descriptor handle id. auth_req: Authentication requirements value. Returns: True on success, False otherwise. """ self.proxy().ReadDescriptor(self.client_id, address, handle, auth_req) return True @utils.glib_call(False) def write_descriptor(self, address, handle, auth_req, value): """Writes remote device GATT descriptor. Args: address: Remote device MAC address. handle: Descriptor handle id. auth_req: Authentication requirements value. value: Descriptor value to write. Returns: True on success, False otherwise. """ self.proxy().WriteDescriptor(self.client_id, address, handle, auth_req, value) return True @utils.glib_call(None) def write_characteristic(self, address, uuid, handle, write_type, auth_req, value): def write_characteristic(self, address, handle, write_type, auth_req, value): """Writes remote device GATT characteristic. Args: address: Remote device MAC address. uuid: The characteristic UUID as a string. handle: Characteristic handle id. write_type: Characteristic write type. auth_req: Authentication requirements value. Loading @@ -831,7 +881,7 @@ class FlossGattClient(GattClientCallbacks): Returns: GattWriteRequestStatus on success, None otherwise. """ return self.proxy().write_characteristic(self.client_id, address, uuid, handle, write_type, auth_req, value) return self.proxy().WriteCharacteristic(self.client_id, address, handle, write_type, auth_req, value) @utils.glib_call(False) def register_for_notification(self, address, handle, enable): Loading
floss/pandora/server/bluetooth.py +33 −0 Original line number Diff line number Diff line Loading @@ -298,6 +298,39 @@ class Bluetooth(object): def set_hid_report(self, addr, report_type, report): return self.qa_client.set_hid_report(addr, report_type, report) def read_characteristic(self, address, handle, auth_re): return self.gatt_client.read_characteristic(address, handle, auth_re) def read_descriptor(self, address, handle, auth_req): return self.gatt_client.read_descriptor(address, handle, auth_req) def discover_services(self, address): return self.gatt_client.discover_services(address) def get_bond_state(self, address): self.adapter_client.get_bond_state(address) def fetch_remote(self, address): return self.adapter_client.fetch_remote_uuids(address) def get_remote_uuids(self, address): return self.adapter_client.get_remote_property(address, 'Uuids') def btif_gattc_discover_service_by_uuid(self, address, uuid): return self.gatt_client.btif_gattc_discover_service_by_uuid(address, uuid) def configure_mtu(self, address, mtu): return self.gatt_client.configure_mtu(address, mtu) def refresh_device(self, address): return self.gatt_client.refresh_device(address) def write_descriptor(self, address, handle, auth_req, value): return self.gatt_client.write_descriptor(address, handle, auth_req, value) def write_characteristic(self, address, handle, write_type, auth_req, value): return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value) def gatt_connect(self, address, is_direct, transport): return self.gatt_client.connect_client(address, is_direct, transport) Loading
floss/pandora/server/gatt.py 0 → 100644 +328 −0 Original line number Diff line number Diff line # Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """GATT grpc interface.""" import asyncio import logging from uuid import UUID from floss.pandora.floss import adapter_client from floss.pandora.floss import floss_enums from floss.pandora.floss import gatt_client from floss.pandora.floss import utils from floss.pandora.server import bluetooth as bluetooth_module import grpc from pandora_experimental import gatt_grpc_aio from pandora_experimental import gatt_pb2 class GATTService(gatt_grpc_aio.GATTServicer): """Service to trigger Bluetooth GATT procedures. This class implements the Pandora bluetooth test interfaces, where the metaclass definition is automatically generated by the protobuf. The interface definition can be found in: https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/gatt.proto?q=gatt.proto """ # Write characteristic, requesting acknowledgement by the remote device. WRITE_TYPE_DEFAULT = 2 # No authentication required. AUTHENTICATION_NONE = 0 def __init__(self, bluetooth: bluetooth_module.Bluetooth): self.bluetooth = bluetooth async def ExchangeMTU(self, request: gatt_pb2.ExchangeMTURequest, context: grpc.ServicerContext) -> gatt_pb2.ExchangeMTUResponse: class MTUChangeObserver(gatt_client.GattClientCallbacks): """Observer to observe MTU change state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_configure_mtu(self, addr, mtu, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to configure MTU. Status: %s', status) future = self.task['configure_mtu'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address try: configure_mtu = asyncio.get_running_loop().create_future() observer = MTUChangeObserver({'configure_mtu': configure_mtu, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.configure_mtu(address, request.mtu) status = await configure_mtu if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to configure MTU.') finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return gatt_pb2.ExchangeMTUResponse() async def WriteAttFromHandle(self, request: gatt_pb2.WriteRequest, context: grpc.ServicerContext) -> gatt_pb2.WriteResponse: class WriteAttObserver(gatt_client.GattClientCallbacks): """Observer to observe write attribute state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_characteristic_write(self, addr, status, handle): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to write characteristic from handle. Status: %s', status) future = self.task['write_attribute'] future.get_loop().call_soon_threadsafe(future.set_result, (status, handle)) @utils.glib_callback() def on_descriptor_write(self, addr, status, handle): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to write descriptor from handle. Status: %s', status) future = self.task['write_attribute'] future.get_loop().call_soon_threadsafe(future.set_result, (status, handle)) class ReadCharacteristicFromHandleObserver(gatt_client.GattClientCallbacks): """Observer to observe the read characteristics state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_characteristic_read(self, addr, status, handle, value): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to read characteristic from handle. Status: %s', status) future = self.task['characteristics'] future.get_loop().call_soon_threadsafe(future.set_result, status) class ReadCharacteristicDescriptorFromHandleObserver(gatt_client.GattClientCallbacks): """Observer to observe the read characteristic descriptor state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_descriptor_read(self, addr, status, handle, value): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to read descriptors. Status: %s', status) future = self.task['descriptors'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address observers = [] valid_handle = True try: write_attribute = asyncio.get_running_loop().create_future() observer = WriteAttObserver({'write_attribute': write_attribute, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) characteristics = asyncio.get_running_loop().create_future() observer = ReadCharacteristicFromHandleObserver({'characteristics': characteristics, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) self.bluetooth.read_characteristic(address, request.handle, self.AUTHENTICATION_NONE) char_status = await characteristics if char_status != floss_enums.GattStatus.SUCCESS: descriptors = asyncio.get_running_loop().create_future() observer = ReadCharacteristicDescriptorFromHandleObserver({ 'descriptors': descriptors, 'address': address }) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) observers.append((name, observer)) self.bluetooth.gatt_client.read_descriptor(address, request.handle, self.AUTHENTICATION_NONE) desc_status = await descriptors if desc_status != floss_enums.GattStatus.SUCCESS: valid_handle = False else: self.bluetooth.write_descriptor(address, request.handle, self.AUTHENTICATION_NONE, request.value) else: self.bluetooth.write_characteristic(address, request.handle, self.WRITE_TYPE_DEFAULT, self.AUTHENTICATION_NONE, request.value) if valid_handle: status, handle = await write_attribute finally: for name, observer in observers: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) if valid_handle: return gatt_pb2.WriteResponse(handle=handle, status=status) return gatt_pb2.WriteResponse(handle=request.handle, status=gatt_pb2.INVALID_HANDLE) async def DiscoverServiceByUuid(self, request: gatt_pb2.DiscoverServiceByUuidRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse: address = utils.connection_from(request.connection).address self.bluetooth.btif_gattc_discover_service_by_uuid(address, request.uuid) return gatt_pb2.DiscoverServicesResponse() async def DiscoverServices(self, request: gatt_pb2.DiscoverServicesRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesResponse: class DiscoveryObserver(gatt_client.GattClientCallbacks): """Observer to observe the discovery service state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_search_complete(self, addr, services, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to complete search. Status: %s', status) future = self.task['search_services'] future.get_loop().call_soon_threadsafe(future.set_result, (services, status)) address = utils.connection_from(request.connection).address try: search_services = asyncio.get_running_loop().create_future() observer = DiscoveryObserver({'search_services': search_services, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.discover_services(address) services, status = await search_services if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to find services.') response = gatt_pb2.DiscoverServicesResponse() for serv in services: response.services.append(self.create_gatt_service(serv)) finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return response async def DiscoverServicesSdp(self, request: gatt_pb2.DiscoverServicesSdpRequest, context: grpc.ServicerContext) -> gatt_pb2.DiscoverServicesSdpResponse: class DiscoverySDPObserver(adapter_client.BluetoothCallbacks): """Observer to observe the SDP discovery service state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_device_properties_changed(self, remote_device, props): if remote_device['address'] != self.task['address']: return if floss_enums.BtPropertyType.Uuids in props: future = self.task['device_uuids_changed'] future.get_loop().call_soon_threadsafe(future.set_result, ()) address = utils.address_from(request.address) try: uuids = self.bluetooth.get_remote_uuids(address) if self.bluetooth.get_bond_state(address) == floss_enums.BondState.BONDING and (uuids is None or len(uuids)) == 0: logging.error('Failed to get UUIDs.') return gatt_pb2.DiscoverServicesSdpResponse() if self.bluetooth.get_bond_state(address) != floss_enums.BondState.BONDING: device_uuids_changed = asyncio.get_running_loop().create_future() observer = DiscoverySDPObserver({'device_uuids_changed': device_uuids_changed, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.adapter_client.register_callback_observer(name, observer) status = self.bluetooth.fetch_remote(address) if not status: raise RuntimeError(f'Failed to fetch remote device {address} ' f'uuids.') await device_uuids_changed uuids = self.bluetooth.get_remote_uuids(address) response = gatt_pb2.DiscoverServicesSdpResponse() if uuids: for uuid in uuids: response.service_uuids.append(str(UUID(bytes=bytes(uuid))).upper()) finally: self.bluetooth.adapter_client.unregister_callback_observer(name, observer) return response async def ClearCache(self, request: gatt_pb2.ClearCacheRequest, context: grpc.ServicerContext) -> gatt_pb2.ClearCacheResponse: class ClearCacheObserver(gatt_client.GattClientCallbacks): """Observer to observe the clear cache state.""" def __init__(self, task): self.task = task @utils.glib_callback() def on_connection_updated(self, addr, interval, latency, timeout, status): if addr != self.task['address']: return if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: logging.error('Failed to update connection. Status: %s', status) future = self.task['refresh'] future.get_loop().call_soon_threadsafe(future.set_result, status) address = utils.connection_from(request.connection).address try: refresh = asyncio.get_running_loop().create_future() observer = ClearCacheObserver({'refresh': refresh, 'address': address}) name = utils.create_observer_name(observer) self.bluetooth.gatt_client.register_callback_observer(name, observer) self.bluetooth.refresh_device(address) status = await refresh if status != floss_enums.GattStatus.SUCCESS: raise RuntimeError('Failed to clear cache.') finally: self.bluetooth.gatt_client.unregister_callback_observer(name, observer) return gatt_pb2.ClearCacheResponse() def create_gatt_characteristic_descriptor(self, descriptor): return gatt_pb2.GattCharacteristicDescriptor(handle=descriptor['instance_id'], permissions=descriptor['permissions'], uuid=str(UUID(bytes=bytes(descriptor['uuid']))).upper()) def create_gatt_characteristic(self, characteristic): return gatt_pb2.GattCharacteristic( properties=characteristic['properties'], permissions=characteristic['permissions'], uuid=str(UUID(bytes=bytes(characteristic['uuid']))).upper(), handle=characteristic['instance_id'], descriptors=[ self.create_gatt_characteristic_descriptor(descriptor) for descriptor in characteristic['descriptors'] ]) def create_gatt_service(self, service): return gatt_pb2.GattService( handle=service['instance_id'], type=service['service_type'], uuid=str(UUID(bytes=bytes(service['uuid']))).upper(), included_services=[ self.create_gatt_service(included_service) for included_service in service['included_services'] ], characteristics=[ self.create_gatt_characteristic(characteristic) for characteristic in service['characteristics'] ])