Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1f3db500 authored by Saba Sarabta's avatar Saba Sarabta
Browse files

Floss: Implement Pandora GATT profile (part2).

Bug: 301000123, Bug: 300942866, Bug: 300942867, Bug: 300954033, Bug: 300954035, Bug: 301000124
Test: mma packages/modules/Bluetooth && pts-bot GATT
Tag: #floss
Flag: EXEMPT floss only changes
Change-Id: I36965d425321368b6ae6d9b886c4a52924bc2d80
parent 7434bcee
Loading
Loading
Loading
Loading
+116 −34
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ import logging
from floss.pandora.floss import floss_enums
from floss.pandora.floss import observer_base
from floss.pandora.floss import utils
from gi.repository import GLib


class GattServerCallbacks:
@@ -508,6 +509,8 @@ class FlossGattServer(GattServerCallbacks):
        self.callbacks = self.ExportedGattServerCallbacks()
        self.callbacks.add_observer('gatt_testing_server', self)
        self.bus.register_object(self.cb_dbus_objpath, self.callbacks, None)
        self.server_connect_id = None
        self.server_id = None

    def __del__(self):
        """Destructor."""
@@ -526,6 +529,7 @@ class FlossGattServer(GattServerCallbacks):
        if status != floss_enums.GattStatus.SUCCESS:
            logging.error('Failed to register server with id: %s, status = %s', server_id, status)
            return
        self.server_id = server_id

    @utils.glib_callback()
    def on_server_connection_state(self, server_id, connected, addr):
@@ -738,24 +742,20 @@ class FlossGattServer(GattServerCallbacks):
        return True

    @utils.glib_call(False)
    def unregister_server(self, server_id):
    def unregister_server(self):
        """Unregisters GATT server for this client.

        Args:
            server_id: Bluetooth GATT server id.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().UnregisterServer(server_id)
        self.proxy().UnregisterServer(self.server_id)
        return True

    @utils.glib_call(None)
    def server_connect(self, server_id, addr, is_direct, transport):
    def server_connect(self, addr, is_direct, transport):
        """Connects remote device to GATT server.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.
            is_direct: A boolean value that specifies whether the connection should be made using direct connection.
            transport: BtTransport type.
@@ -763,68 +763,61 @@ class FlossGattServer(GattServerCallbacks):
        Returns:
            Server connect as boolean on success, None otherwise.
        """
        return self.proxy().ServerConnect(server_id, addr, is_direct, transport)
        return self.proxy().ServerConnect(self.server_id, addr, is_direct, transport)

    @utils.glib_call(None)
    def server_disconnect(self, server_id, addr):
    def server_disconnect(self, addr):
        """Disconnects remote device from the GATT server.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.

        Returns:
            Server disconnect as boolean on success, None otherwise.
        """
        return self.proxy().ServerDisconnect(server_id, addr)
        return self.proxy().ServerDisconnect(self.server_id, addr)

    @utils.glib_call(False)
    def add_service(self, server_id, service):
    def add_service(self, service):
        """Adds GATT service.

        Args:
            server_id: Bluetooth GATT server id.
            service: BluetoothGattService.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().AddService(server_id, service)
        self.proxy().AddService(self.server_id, service)
        return True

    @utils.glib_call(False)
    def remove_service(self, server_id, handle):
    def remove_service(self, handle):
        """Removes GATT service.

        Args:
            server_id: Bluetooth GATT server id.
            handle: Service record handle.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().RemoveService(server_id, handle)
        self.proxy().RemoveService(self.server_id, handle)
        return True

    @utils.glib_call(False)
    def clear_services(self, server_id):
    def clear_services(self):
        """Clears GATT services.

        Args:
            server_id: Bluetooth GATT server id.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().ClearServices(server_id)
        self.proxy().ClearServices(self.server_id)
        return True

    @utils.glib_call(None)
    def send_response(self, server_id, addr, request_id, status, offset, value):
    def send_response(self, addr, request_id, status, offset, value):
        """Sends GATT response.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.
            request_id: Request id.
            status: floss_enums.GattStatus.
@@ -834,14 +827,13 @@ class FlossGattServer(GattServerCallbacks):
        Returns:
            Response send as boolean on success, None otherwise.
        """
        return self.proxy().SendResponse(server_id, addr, request_id, status, offset, value)
        return self.proxy().SendResponse(self.server_id, addr, request_id, status, offset, value)

    @utils.glib_call(None)
    def send_notification(self, server_id, addr, handle, confirm, value):
    def send_notification(self, addr, handle, confirm, value):
        """Sends GATT notification.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.
            handle: The attribute handle of the attribute to send the notification for.
            confirm: A boolean value indicating whether the client should send a confirmation in response to
@@ -851,14 +843,13 @@ class FlossGattServer(GattServerCallbacks):
        Returns:
            Notification send as boolean on success, None otherwise.
        """
        return self.proxy().SendNotification(server_id, addr, handle, confirm, value)
        return self.proxy().SendNotification(self.server_id, addr, handle, confirm, value)

    @utils.glib_call(False)
    def server_set_preferred_phy(self, server_id, addr, tx_phy, rx_phy, phy_options):
    def server_set_preferred_phy(self, addr, tx_phy, rx_phy, phy_options):
        """Sets preferred phy for server.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.
            tx_phy: Preferred PHY for transmitting data.
            rx_phy: Preferred PHY for receiving data.
@@ -867,19 +858,110 @@ class FlossGattServer(GattServerCallbacks):
        Returns:
            True on success, False otherwise.
        """
        self.proxy().ServerSetPreferredPhy(server_id, addr, tx_phy, rx_phy, phy_options)
        self.proxy().ServerSetPreferredPhy(self.server_id, addr, tx_phy, rx_phy, phy_options)
        return True

    @utils.glib_call(False)
    def server_read_phy(self, server_id, addr):
    def server_read_phy(self, addr):
        """Reads phy of server.

        Args:
            server_id: Bluetooth GATT server id.
            addr: Remote device MAC address.

        Returns:
            True on success, False otherwise.
        """
        self.proxy().ServerReadPhy(server_id, addr)
        self.proxy().ServerReadPhy(self.server_id, addr)
        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        Args:
            name: Name of the observer.
            observer: Observer that implements all callback classes.
        """
        if isinstance(observer, GattServerCallbacks):
            self.callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        Args:
            name: Name of the observer.
            observer: Observer that implements all callback classes.
        """
        if isinstance(observer, GattServerCallbacks):
            self.callbacks.remove_observer(name, observer)

    def make_dbus_descriptor(self, uuid, instance_id, permissions):
        """Makes struct for descriptor D-Bus.

        Args:
            uuid : Descriptor UUID as array of bytes.
            instance_id: Descriptor identifier.
            permissions: Descriptor permissions.

        Returns:
            Dictionary of descriptor.
        """
        return {
            'uuid': GLib.Variant('ay', uuid),
            'instance_id': GLib.Variant('i', instance_id),
            'permissions': GLib.Variant('i', permissions)
        }

    def make_dbus_characteristic(self, uuid, instance_id, properties, permissions, key_size, write_type, descriptors):
        """Makes struct for characteristic D-Bus.

        Args:
            uuid : Characteristic UUID as array of bytes.
            instance_id: Characteristic handle id.
            properties: Characteristic properties.
            permissions: Characteristic permissions.
            key_size: Characteristic key size.
            write_type: Characteristic write type.
            descriptors: Characteristic descriptors.

        Returns:
            Dictionary of characteristic.
        """
        desc = []
        for d in descriptors:
            desc.append(self.make_dbus_descriptor(d['uuid'], d['instance_id'], d['permissions']))
        return {
            'uuid': GLib.Variant('ay', uuid),
            'instance_id': GLib.Variant('i', instance_id),
            'properties': GLib.Variant('i', properties),
            'permissions': GLib.Variant('i', permissions),
            'key_size': GLib.Variant('i', key_size),
            'write_type': GLib.Variant('u', write_type),
            'descriptors': GLib.Variant('aa{sv}', desc)
        }

    def make_dbus_service(self, service):
        """Makes struct for service D-Bus.

        Args:
            service: The struct of BluetoothGattService.

        Returns:
            Dictionary of service.
        """
        characteristics = []
        for c in service['characteristics']:
            characteristics.append(
                self.make_dbus_characteristic(c['uuid'], c['instance_id'], c['properties'], c['permissions'],
                                              c['key_size'], c['write_type'], c['descriptors']))

        included_services = []
        for s in service['included_services']:
            included_services.append(self.make_dbus_service(s))

        return {
            'uuid': GLib.Variant('ay', service['uuid']),
            'instance_id': GLib.Variant('i', service['instance_id']),
            'service_type': GLib.Variant('i', service['service_type']),
            'characteristics': GLib.Variant('aa{sv}', characteristics),
            'included_services': GLib.Variant('aa{sv}', included_services)
        }
+17 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ from floss.pandora.floss import manager_client
from floss.pandora.floss import qa_client
from floss.pandora.floss import scanner_client
from floss.pandora.floss import gatt_client
from floss.pandora.floss import gatt_server
from floss.pandora.floss import floss_enums
from floss.pandora.floss import utils
from gi.repository import GLib
@@ -65,6 +66,7 @@ class Bluetooth(object):
        self.scanner_client = scanner_client.FlossScannerClient(self.bus, self.DEFAULT_ADAPTER)
        self.qa_client = qa_client.FlossQAClient(self.bus, self.DEFAULT_ADAPTER)
        self.gatt_client = gatt_client.FlossGattClient(self.bus, self.DEFAULT_ADAPTER)
        self.gatt_server = gatt_server.FlossGattServer(self.bus, self.DEFAULT_ADAPTER)

    def __del__(self):
        if not self.is_clean:
@@ -132,6 +134,9 @@ class Bluetooth(object):
        if not self.gatt_client.register_client(self.FAKE_GATT_APP_ID, False):
            logging.error('gatt_client: Failed to register callbacks')
            return False
        if not self.gatt_server.register_server(self.FAKE_GATT_APP_ID, False):
            logging.error('gatt_server: Failed to register callbacks')
            return False
        return True

    def is_bluetoothd_proxy_valid(self):
@@ -144,6 +149,7 @@ class Bluetooth(object):
            self.scanner_client.has_proxy(),
            self.qa_client.has_proxy(),
            self.gatt_client.has_proxy(),
            self.gatt_server.has_proxy(),
        ])

        if not proxy_ready:
@@ -178,6 +184,7 @@ class Bluetooth(object):
            self.scanner_client = scanner_client.FlossScannerClient(self.bus, default_adapter)
            self.qa_client = qa_client.FlossQAClient(self.bus, default_adapter)
            self.gatt_client = gatt_client.FlossGattClient(self.bus, default_adapter)
            self.gatt_server = gatt_server.FlossGattServer(self.bus, default_adapter)

            try:
                utils.poll_for_condition(
@@ -337,6 +344,16 @@ class Bluetooth(object):
    def set_connectable(self, mode):
        return self.qa_client.set_connectable(mode)

    def read_using_characteristic_uuid(self, address, uuid, start_handle, end_handle, auth_req):
        return self.gatt_client.read_using_characteristic_uuid(address, uuid, start_handle, end_handle, auth_req)

    def register_for_notification(self, address, handle, enable):
        return self.gatt_client.register_for_notification(address, handle, enable)

    def add_service(self, service):
        service = self.gatt_server.make_dbus_service(service)
        return self.gatt_server.add_service(service)

    def is_encrypted(self, address):
        connection_state = self.adapter_client.get_connection_state(address)
        if connection_state is None:
+460 −0

File changed.

Preview size limit exceeded, changes collapsed.