Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0e863b59 authored by Abel Lucas's avatar Abel Lucas Committed by Gerrit Code Review
Browse files

Merge "avatar: add pandora experimental Gatt bumble service"

parents 9d8a4627 c0af732c
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -21,10 +21,12 @@ python_test_host {
    main: "main.py",
    srcs: [
        "main.py",
        "*_test.py",
        "example.py",
    ],
    libs: [
        "libavatar",
        "bumble_services_experimental-python",
    ],
    required: ["PandoraServer"],
    test_suites: ["general-tests"],
+78 −0
Original line number Diff line number Diff line
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging

from avatar import PandoraDevices
from avatar.aio import asynchronous
from avatar.pandora_client import PandoraClient

from mobly import base_test, test_runner

from pandora.host_grpc import DataTypes, OwnAddressType
from pandora_experimental.gatt_grpc import GATT

from typing import Optional


class GattTest(base_test.BaseTestClass):  # type: ignore[misc]
    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraClient
    ref: PandoraClient

    def setup_class(self) -> None:
        self.devices = PandoraDevices(self)
        self.dut, self.ref = self.devices

    def teardown_class(self) -> None:
        if self.devices:
            self.devices.stop_all()

    @asynchronous
    async def setup_test(self) -> None:
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def test_print_dut_gatt_services(self) -> None:
        self.ref.host.StartAdvertising(legacy=True, connectable=True)
        dut_ref = self.dut.host.ConnectLE(public=self.ref.address, own_address_type=OwnAddressType.RANDOM).connection

        gatt = GATT(self.dut.channel)
        services = gatt.DiscoverServices(dut_ref)
        self.dut.log.info(f'DUT services: {services}')

    def test_print_ref_gatt_services(self) -> None:
        self.dut.host.StartAdvertising(
            legacy=True,
            connectable=True,
            own_address_type=OwnAddressType.RANDOM,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        scan = self.ref.host.Scan()
        dut = next((x for x in scan if b'pause cafe' in x.data.manufacturer_specific_data))
        scan.cancel()

        ref_dut = self.ref.host.ConnectLE(own_address_type=OwnAddressType.RANDOM, **dut.address_asdict()).connection

        gatt = GATT(self.ref.channel)
        services = gatt.DiscoverServices(ref_dut)
        self.ref.log.info(f'REF services: {services}')


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
+13 −1
Original line number Diff line number Diff line
from mobly import suite_runner
from avatar import bumble_server

import example
import gatt_test

import logging
import sys

_TEST_CLASSES_LIST = [example.ExampleTest]
from bumble_experimental.gatt import GATTService
from pandora_experimental.gatt_grpc_aio import add_GATTServicer_to_server

_TEST_CLASSES_LIST = [example.ExampleTest, gatt_test.GattTest]


def _bumble_servicer_hook(server: bumble_server.Server) -> None:
  add_GATTServicer_to_server(GATTService(server.bumble.device), server.server)


if __name__ == "__main__":
@@ -16,6 +25,9 @@ if __name__ == "__main__":
  # Mobly tradefed is using these arguments for specific java tests
  argv = [arg for arg in argv if not arg.startswith(('--device_serial', '--log_path'))]

  # register experimental bumble servicers hook.
  bumble_server.register_servicer_hook(_bumble_servicer_hook)

  suite_runner.run_suite(
      argv=argv,
      test_classes=_TEST_CLASSES_LIST,
+29 −0
Original line number Diff line number Diff line
// Copyright 2022, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package {
    default_applicable_licenses: ["Android-Apache-2.0"],
}

python_library_host {
    name: "bumble_services_experimental-python",
    srcs: [
        "bumble_experimental/*.py",
        ":pandora_experimental-python-src",
    ],
    libs: [
        "libprotobuf-python",
        "bumble",
    ],
}
+263 −0
Original line number Diff line number Diff line
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import grpc
import logging

from bumble.core import ProtocolError
from bumble.device import Connection as BumbleConnection, Device, Peer
from bumble.gatt_client import CharacteristicProxy, ServiceProxy
from pandora_experimental.gatt_grpc import (
    AttStatusCode,
    AttValue,
    ClearCacheRequest,
    ClearCacheResponse,
    DiscoverServiceByUuidRequest,
    DiscoverServicesRequest,
    DiscoverServicesResponse,
    ExchangeMTURequest,
    ExchangeMTUResponse,
    GattCharacteristic,
    GattCharacteristicDescriptor,
    GattService,
    ReadCharacteristicDescriptorRequest,
    ReadCharacteristicDescriptorResponse,
    ReadCharacteristicRequest,
    ReadCharacteristicResponse,
    ReadCharacteristicsFromUuidRequest,
    ReadCharacteristicsFromUuidResponse,
    WriteRequest,
    WriteResponse,
)
from pandora_experimental.gatt_grpc_aio import GATTServicer
from typing import Dict, List


class GATTService(GATTServicer):
    device: Device
    peers: Dict[int, Peer]

    def __init__(self, device: Device) -> None:
        super().__init__()
        self.device = device
        self.peers: Dict[int, Peer] = {}
        self.device.on('connection', self.on_connection)  # type: ignore
        self.device.on('disconnection', self.on_disconnection)  # type: ignore

    def __del__(self) -> None:
        self.device.remove_listener('connection', self.on_connection)  # type: ignore
        self.device.remove_listener('disconnection', self.on_disconnection)  # type: ignore

    def on_connection(self, connection: BumbleConnection) -> None:
        self.peers[connection.handle] = Peer(connection)

    def on_disconnection(self, connection: BumbleConnection) -> None:
        del self.peers[connection.handle]

    async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"ExchangeMTU: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        mtu = await peer.request_mtu(request.mtu)  # type: ignore
        assert mtu == request.mtu

        return ExchangeMTUResponse()

    async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"WriteAttFromHandle: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        try:
            await peer.write_value(request.handle, request.value, with_response=True)  # type: ignore
            status = 0
        except ProtocolError as e:
            status = e.error_code

        return WriteResponse(handle=request.handle, status=AttStatusCode(status))

    async def DiscoverServiceByUuid(
        self, request: DiscoverServiceByUuidRequest, context: grpc.ServicerContext
    ) -> DiscoverServicesResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"DiscoverServiceByUuid: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        services: List[ServiceProxy] = await peer.discover_service(request.uuid)  # type: ignore

        async def feed_service(service: ServiceProxy) -> None:
            characteristic: CharacteristicProxy
            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
                await characteristic.discover_descriptors()

        await asyncio.gather(*(feed_service(service) for service in services))

        return DiscoverServicesResponse(
            services=[
                GattService(
                    handle=service.handle,
                    type=int.from_bytes(bytes(service.type), 'little'),
                    uuid=str(service.uuid),
                    characteristics=[
                        GattCharacteristic(
                            properties=characteristic.properties,  # type: ignore
                            permissions=0,  # TODO
                            uuid=str(characteristic.uuid),  # type: ignore
                            handle=characteristic.handle,  # type: ignore
                            descriptors=[
                                GattCharacteristicDescriptor(
                                    handle=descriptor.handle,  # type: ignore
                                    permissions=0,  # TODO
                                    uuid=str(descriptor.type),  # type: ignore
                                )
                                for descriptor in characteristic.descriptors  # type: ignore
                            ],
                        )
                        for characteristic in service.characteristics  # type: ignore
                    ],
                )
                for service in services
            ]
        )

    async def DiscoverServices(
        self, request: DiscoverServicesRequest, context: grpc.ServicerContext
    ) -> DiscoverServicesResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"DiscoverServices: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        services: List[ServiceProxy] = await peer.discover_services()  # type: ignore

        async def feed_service(service: ServiceProxy) -> None:
            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
                await characteristic.discover_descriptors()  # type: ignore

        await asyncio.gather(*(feed_service(service) for service in services))

        return DiscoverServicesResponse(
            services=[
                GattService(
                    handle=service.handle,
                    type=int.from_bytes(bytes(service.type), 'little'),
                    uuid=str(service.uuid),
                    characteristics=[
                        GattCharacteristic(
                            properties=characteristic.properties,  # type: ignore
                            permissions=0,  # TODO
                            uuid=str(characteristic.uuid),  # type: ignore
                            handle=characteristic.handle,  # type: ignore
                            descriptors=[
                                GattCharacteristicDescriptor(
                                    handle=descriptor.handle,  # type: ignore
                                    permissions=0,  # TODO
                                    uuid=str(descriptor.type),  # type: ignore
                                )
                                for descriptor in characteristic.descriptors  # type: ignore
                            ],
                        )
                        for characteristic in service.characteristics  # type: ignore
                    ],
                )
                for service in services
            ]
        )

    # TODO: implement `DiscoverServicesSdp`

    async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse:
        logging.info(f"ClearCache")
        return ClearCacheResponse()

    async def ReadCharacteristicFromHandle(
        self, request: ReadCharacteristicRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"ReadCharacteristicFromHandle: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status = 0
        except ProtocolError as e:
            value = bytes()
            status = e.error_code

        return ReadCharacteristicResponse(value=AttValue(value=value), status=AttStatusCode(status))

    async def ReadCharacteristicsFromUuid(
        self, request: ReadCharacteristicsFromUuidRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicsFromUuidResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"ReadCharacteristicsFromUuid: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})()

        try:
            characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock)  # type: ignore

            return ReadCharacteristicsFromUuidResponse(
                characteristics_read=[
                    ReadCharacteristicResponse(
                        value=AttValue(value=value, handle=handle),  # type: ignore
                        status=AttStatusCode.SUCCESS,
                    )
                    for handle, value in characteristics  # type: ignore
                ]
            )

        except ProtocolError as e:
            return ReadCharacteristicsFromUuidResponse(
                characteristics_read=[ReadCharacteristicResponse(status=e.error_code)]
            )

    async def ReadCharacteristicDescriptorFromHandle(
        self, request: ReadCharacteristicDescriptorRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicDescriptorResponse:
        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
        logging.info(f"ReadCharacteristicDescriptorFromHandle: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        assert connection
        peer = self.peers[connection.handle]

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status = 0
        except ProtocolError as e:
            value = bytes()
            status = e.error_code

        return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=AttStatusCode(status))