Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6c4d2cd2 authored by William Escande's avatar William Escande Committed by Gerrit Code Review
Browse files

Merge changes I1138724f,I167a6a35 into main

* changes:
  AVATAR: Run lint and fix it
  AVATAR: Run format | Remove import | Update json
parents b4d5d38c 610afbff
Loading
Loading
Loading
Loading
+1 −4
Original line number Diff line number Diff line
@@ -16,10 +16,7 @@ import asyncio
import avatar
import enum
import grpc
import inspect
import itertools
import logging
import math
import numpy as np

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
@@ -34,7 +31,7 @@ from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_not_equal  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from pandora._utils import AioStream, Stream
from pandora._utils import AioStream
from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
from pandora.security_pb2 import LE_LEVEL3
from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
+1 −1
Original line number Diff line number Diff line
@@ -24,10 +24,10 @@ from bumble.pairing import PairingConfig
from bumble_experimental.gatt import GATTService
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import assert_not_in  # type: ignore
from mobly.asserts import assert_true  # type: ignore
from pandora.host_pb2 import RANDOM, Connection, DataTypes
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse
from pandora_experimental.gatt_grpc import GATT
+70 −53
Original line number Diff line number Diff line
@@ -24,32 +24,32 @@ from bumble.core import (
    BT_L2CAP_PROTOCOL_ID,
    BT_RFCOMM_PROTOCOL_ID,
)
from bumble.rfcomm import Server as RfcommServer, DLC
from bumble.hfp import HfpProtocol
from bumble.rfcomm import DLC, Server as RfcommServer
from bumble.sdp import (
    DataElement,
    ServiceAttribute,
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
    DataElement,
    ServiceAttribute,
)
from bumble.hfp import HfpProtocol
from mobly import base_test, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_not_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_not_equal  # type: ignore
from mobly.asserts import assert_not_in  # type: ignore
from pandora.host_pb2 import Connection as PandoraConnection
from pandora.security_pb2 import LEVEL2
from typing import Optional, Tuple, List, Dict
from typing import Dict, List, Optional, Tuple

SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311

HFP_AG_FEATURE_HF_INDICATORS = (1 << 10)
HFP_AG_FEATURE_HF_INDICATORS = 1 << 10
HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS

HFP_HF_FEATURE_HF_INDICATORS = (1 << 8)
HFP_HF_FEATURE_DEFAULT = hex(0x01b5)
HFP_HF_FEATURE_HF_INDICATORS = 1 << 8
HFP_HF_FEATURE_DEFAULT = hex(0x01B5)

PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
@@ -128,8 +128,8 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
        def on_dlc(dlc: DLC) -> None:
            dlc_connected.set_result(dlc)

        rfcomm_server = RfcommServer(self.ref.device)
        channel_number = rfcomm_server.listen(on_dlc)
        rfcomm_server = RfcommServer(self.ref.device)  # type: ignore
        channel_number = rfcomm_server.listen(on_dlc)  # type: ignore

        # Setup SDP records
        self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0)
@@ -142,7 +142,7 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
        dlc = await dlc_connected
        assert isinstance(dlc, DLC)

        return HfpProtocol(dlc)
        return HfpProtocol(dlc)  # type: ignore

    @avatar.parameterized((True,), (False,))  # type: ignore[misc]
    @avatar.asynchronous
@@ -155,7 +155,6 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
        ref_dut_hfp_protocol = await self.make_hfp_connection()

        class TestAgServer(HfpAgServer):

            def on_brsf(self, hf_features: int) -> None:
                # HF indicators should be enabled
                assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0)
@@ -172,8 +171,9 @@ class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc]
        await server.serve()


def make_bumble_ag_sdp_records(hfp_version: int, rfcomm_channel: int,
                               ag_sdp_features: int) -> Dict[int, List[ServiceAttribute]]:
def make_bumble_ag_sdp_records(
    hfp_version: int, rfcomm_channel: int, ag_sdp_features: int
) -> Dict[int, List[ServiceAttribute]]:
    return {
        0x00010001: [
            ServiceAttribute(
@@ -182,29 +182,39 @@ def make_bumble_ag_sdp_records(hfp_version: int, rfcomm_channel: int,
            ),
            ServiceAttribute(
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
                DataElement.sequence([
                DataElement.sequence(
                    [
                        DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
                        DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
                ]),
                    ]
                ),
            ),
            ServiceAttribute(
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                DataElement.sequence([
                DataElement.sequence(
                    [
                        DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
                    DataElement.sequence([
                        DataElement.sequence(
                            [
                                DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
                                DataElement.unsigned_integer_8(rfcomm_channel),
                    ]),
                ]),
                            ]
                        ),
                    ]
                ),
            ),
            ServiceAttribute(
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                DataElement.sequence([
                    DataElement.sequence([
                DataElement.sequence(
                    [
                        DataElement.sequence(
                            [
                                DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
                                DataElement.unsigned_integer_16(hfp_version),
                    ])
                ]),
                            ]
                        )
                    ]
                ),
            ),
            ServiceAttribute(
                SDP_PROFILE_SUPPORTED_FEATURES_ID,
@@ -224,9 +234,12 @@ class HfpAgServer:
        self.terminated = False
        self.hf_features = 0  # Unknown

    def send_response_line(self, response: str) -> None:
        self.protocol.send_response_line(response)  # type: ignore

    async def serve(self) -> None:
        while not self.terminated:
            line = await self.protocol.next_line()
            line = await self.protocol.next_line()  # type: ignore

            if line.startswith('AT+BRSF='):
                hf_features = int(line[len('AT+BRSF=') :])
@@ -243,50 +256,54 @@ class HfpAgServer:
            elif line.startswith('AT+CIND?'):
                self.on_cind_test()
            # TODO(b/286226902): Implement handlers for these commands
            elif line.startswith((
            elif line.startswith(
                (
                    'AT+CLIP=',
                    'AT+VGS=',
                    'AT+BIA=',
                    'AT+CMER=',
                    'AT+XEVENT=',
                    'AT+XAPL=',
            )):
                self.protocol.send_response_line('OK')
                )
            ):
                self.send_response_line('OK')
            else:
                self.protocol.send_response_line('ERROR')
                self.send_response_line('ERROR')

    def on_brsf(self, hf_features: int) -> None:
        self.hf_features = hf_features
        self.protocol.send_response_line(f'+BRSF: {self.ag_features}')
        self.protocol.send_response_line('OK')
        self.send_response_line(f'+BRSF: {self.ag_features}')
        self.send_response_line('OK')

    # AT+CIND?
    def on_cind_read(self) -> None:
        self.protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
        self.protocol.send_response_line('OK')
        self.send_response_line('+CIND: 0,0,1,4,1,5,0')
        self.send_response_line('OK')

    # AT+CIND=?
    def on_cind_test(self) -> None:
        self.protocol.send_response_line('+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
        self.send_response_line(
            '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
            '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
                                         '("callheld",(0-2))')
        self.protocol.send_response_line('OK')
            '("callheld",(0-2))'
        )
        self.send_response_line('OK')

    # AT+BIND=
    def on_bind_list(self, indicators: list[int]) -> None:
        self.enabled_hf_indicators = indicators[:]
        self.protocol.send_response_line('OK')
        self.send_response_line('OK')

    # AT+BIND=?
    def on_bind_read_capabilities(self) -> None:
        self.protocol.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
        self.protocol.send_response_line('OK')
        self.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
        self.send_response_line('OK')

    # AT+BIND?
    def on_bind_read_configuration(self) -> None:
        for i in self.enabled_hf_indicators:
            self.protocol.send_response_line(f'+BIND: {i},1')
        self.protocol.send_response_line('OK')
            self.send_response_line(f'+BIND: {i},1')
        self.send_response_line('OK')


if __name__ == '__main__':
+2 −2
Original line number Diff line number Diff line
@@ -20,9 +20,9 @@ import cases.le_host_test
import cases.le_security_test
import cases.security_test
import gatt_test
import hfpclient_test
import sdp_test
import smp_test
import hfpclient_test

_TEST_CLASSES_LIST = [
    cases.host_test.HostTest,
+9 −0
Original line number Diff line number Diff line
@@ -5,4 +5,13 @@
    "reportMissingTypeStubs": false,
    "reportUnknownLambdaType": false,
    "reportImportCycles": false,
    "extraPaths": [
        "../../../pandora/server",
        "../../../../../../external/pandora/avatar",
        "../../../../../../external/python/bumble",
        "../../../../../../external/python/mobly",
        "../../../../../../external/python/pyee",
        "../../../../../../out/soong/.intermediates/external/pandora/bt-test-interfaces/python/pandora-python-gen-src/gen/",
        "../../../../../../out/soong/.intermediates/packages/modules/Bluetooth/pandora/interfaces/python/pandora_experimental-python-gen-src/gen/"
    ]
}
 No newline at end of file
Loading