Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e02289fc authored by Charlie Boutier's avatar Charlie Boutier Committed by Thomas Girardier
Browse files

[Pandora] - Import new Host.proto

Bug: 259449713
Bug: 245578454
Test: atest pts-bot:GAP + Treehugher
Ignore-AOSP-First: Cherry-picked from AOSP
Merged-In: Ie8ec73ac8bee33becbc2052a81e0ee4ac69d9e21
Change-Id: Ie8ec73ac8bee33becbc2052a81e0ee4ac69d9e21
parent 5bc65ded
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -92,8 +92,8 @@ class IUT:

        # Note: we don't keep a single gRPC channel instance in the IUT class
        # because reset is allowed to close the gRPC server.
        with grpc.insecure_channel(f"localhost:{self.pandora_server_port}") as channel:
            self._retry(Host(channel).HardReset)(wait_for_ready=True)
        with grpc.insecure_channel(f'localhost:{self.pandora_server_port}') as channel:
            self._retry(Host(channel).FactoryReset)(wait_for_ready=True)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.rootcanal.close()
+109 −105
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ from time import sleep
from pandora_experimental.gatt_grpc import GATT
from pandora_experimental.gatt_pb2 import GattServiceParams, GattCharacteristicParams
from pandora_experimental.host_grpc import Host
from pandora_experimental.host_pb2 import AddressType, AdvertisingData, DiscoverabilityMode, ConnectabilityMode, Transport
from pandora_experimental.host_pb2 import ConnectabilityMode, DataTypes, DiscoverabilityMode, OwnAddressType
from pandora_experimental.security_grpc import Security


@@ -20,7 +20,8 @@ class GAPProxy(ProfileProxy):

        self.connection = None
        self.pairing_events = None
        self.discovery_events = None
        self.inquiry_responses = None
        self.scan_responses = None

        self.counter = 0
        self.cached_passkey = None
@@ -77,8 +78,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            own_address_type=AddressType.PUBLIC,
            connectable=True,
            own_address_type=OwnAddressType.PUBLIC,
        )

        self.pairing_events = self.security.OnPairing()
@@ -132,7 +133,7 @@ class GAPProxy(ProfileProxy):
            if self.counter == 0:
                self.counter += 1
                self.security.DeletePairing(address=pts_addr)
                self.connection = self.host.ConnectLE(address=pts_addr).connection
                self.connection = self.host.ConnectLE(public=pts_addr).connection
                self.security.Pair(connection=self.host.GetLEConnection(address=pts_addr).connection)
                return "OK"

@@ -146,14 +147,16 @@ class GAPProxy(ProfileProxy):
            address = pts_addr
        else:
            # the PTS sometimes decides to advertise with an RPA, so we do a scan to find its real address
            scans = self.host.RunDiscovery()
            scans = self.host.Scan()
            for scan in scans:
                if "pts" in scan.device.name.lower():
                    address = scan.device.address
                adv_address = scan.public if scan.HasField("public") else scan.random
                device_name = self.host.GetRemoteName(address=adv_address).name
                if "pts" in device_name.lower():
                    address = adv_address
                    scans.cancel()
                    break

        self.connection = self.host.ConnectLE(address=address).connection
        self.connection = self.host.ConnectLE(public=address).connection
        if test in {"GAP/BOND/BON/BV-04-C"}:
            self.security.Pair(connection=self.connection)

@@ -201,9 +204,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            advertising_data=AdvertisingData(service_uuids=["955798ce-3022-455c-b759-ee8edcd73d1a"],))

            own_address_type=OwnAddressType.PUBLIC,
            data=DataTypes(complete_service_class_uuids128=["955798ce-3022-455c-b759-ee8edcd73d1a"],))
        return "OK"

    @assert_description
@@ -213,7 +215,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC, advertising_data=AdvertisingData(include_local_name=True,))
            own_address_type=OwnAddressType.PUBLIC,
            data=DataTypes(include_complete_local_name=True, include_shortened_local_name=True,))

        return "OK"

@@ -224,8 +227,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            own_address_type=AddressType.PUBLIC,
            connectable=True,
            own_address_type=OwnAddressType.PUBLIC,
        )

        self.pairing_events = self.security.OnPairing()
@@ -240,8 +243,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            advertising_data=AdvertisingData(manufacturer_specific_data=b"don't be evil!",))
            own_address_type=OwnAddressType.PUBLIC,
            data=DataTypes(manufacturer_specific_data=b"d0n't b3 3v1l!",))

        return "OK"

@@ -252,7 +255,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC, advertising_data=AdvertisingData(include_tx_power_level=True,))
            own_address_type=OwnAddressType.PUBLIC,
            data=DataTypes(include_tx_power_level=True,))

        return "OK"

@@ -263,8 +267,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        self.pairing_events = self.security.OnPairing()
@@ -278,8 +282,8 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        self.pairing_events = self.security.OnPairing()
@@ -302,11 +306,10 @@ class GAPProxy(ProfileProxy):
        ready for PTS to initiate LE create connection otherwise click 'No'.
        """

        inquiry_events = self.host.RunInquiry()
        for event in inquiry_events:
            for device in event.device:
                assert device.address == pts_addr, (device.address, pts_addr)
                inquiry_events.cancel()
        inquiry_responses = self.host.Inquiry()
        for response in inquiry_responses:
            assert response.address == pts_addr, (response.address, pts_addr)
            inquiry_responses.cancel()
            return "Yes"

        assert False
@@ -351,9 +354,9 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_GENERAL,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        self.pairing_events = self.security.OnPairing()
@@ -369,9 +372,9 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_GENERAL,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        self.pairing_events = self.security.OnPairing()
@@ -385,12 +388,10 @@ class GAPProxy(ProfileProxy):
        report using connectable undirected advertising.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_NONE)

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_NONE,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.NOT_DISCOVERABLE),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        return "OK"
@@ -402,9 +403,9 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_GENERAL,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        return "OK"
@@ -415,7 +416,7 @@ class GAPProxy(ProfileProxy):
        Please start General Discovery. Press OK to continue.
        """

        self.discovery_events = self.host.RunDiscovery()
        self.scan_responses = self.host.Scan()

        return "OK"

@@ -425,7 +426,7 @@ class GAPProxy(ProfileProxy):
        Please start Limited Discovery. Press OK to continue.
        """

        self.discovery_events = self.host.RunDiscovery()
        self.scan_responses = self.host.Scan()

        return "OK"

@@ -435,9 +436,11 @@ class GAPProxy(ProfileProxy):
        Please confirm that PTS is discovered.
        """

        for event in self.discovery_events:
            if event.device.address == pts_addr and flags_match(event.flags, GENERAL_MASK):
                self.discovery_events.cancel()
        for response in self.scan_responses:
            assert response.HasField("public")
            # General Discoverability shall be able to check both limited and general advertising
            if response.public == pts_addr:
                self.scan_responses.cancel()
                return "OK"

        assert False
@@ -448,9 +451,11 @@ class GAPProxy(ProfileProxy):
        Please confirm that PTS is discovered.
        """

        for event in self.discovery_events:
            if event.device.address == pts_addr and flags_match(event.flags, LIMITED_MASK):
                self.discovery_events.cancel()
        for response in self.scan_responses:
            assert response.HasField("public")
            if (response.public == pts_addr and
                response.data.le_discoverability_mode == DiscoverabilityMode.DISCOVERABLE_LIMITED):
                self.scan_responses.cancel()
                return "OK"

        assert False
@@ -465,9 +470,11 @@ class GAPProxy(ProfileProxy):

        def search():
            nonlocal discovered
            for event in self.discovery_events:
                if event.device.address == pts_addr and flags_match(event.flags, GENERAL_MASK):
                    self.discovery_events.cancel()
            for response in self.scan_responses:
                assert response.HasField("public")
                if (response.public == pts_addr and
                    response.data.le_discoverability_mode == DiscoverabilityMode.DISCOVERABLE_GENERAL):
                    self.scan_responses.cancel()
                    discovered = True
                    return

@@ -490,9 +497,11 @@ class GAPProxy(ProfileProxy):

        def search():
            nonlocal discovered
            for event in self.discovery_events:
                if event.device.address == pts_addr and flags_match(event.flags, LIMITED_MASK):
                    self.discovery_events.cancel()
            for response in self.scan_responses:
                assert response.HasField("public")
                if (response.public == pts_addr and
                    response.data.le_discoverability_mode == DiscoverabilityMode.DISCOVERABLE_LIMITED):
                    self.inquiry_responses.cancel()
                    discovered = True
                    return

@@ -512,7 +521,7 @@ class GAPProxy(ProfileProxy):
        send an advertising report.
        """

        self.host.StartAdvertising(own_address_type=AddressType.PUBLIC,)
        self.host.StartAdvertising(own_address_type=OwnAddressType.PUBLIC,)

        return "OK"

@@ -575,11 +584,10 @@ class GAPProxy(ProfileProxy):

        def search_bredr():
            nonlocal discovered_bredr
            inquiry_events = self.host.RunInquiry()
            for event in inquiry_events:
                for device in event.device:
                    if device.address == pts_addr:
                        inquiry_events.cancel()
            inquiry_responses = self.host.Inquiry()
            for response in inquiry_responses:
                if response.address == pts_addr:
                    inquiry_responses.cancel()
                    discovered_bredr = True
                    return

@@ -590,10 +598,12 @@ class GAPProxy(ProfileProxy):

        def search_le():
            nonlocal discovered_le
            discovery_events = self.host.RunDiscovery()
            for event in discovery_events:
                if event.device.address == pts_addr and flags_match(event.flags, GENERAL_MASK):
                    discovery_events.cancel()
            scan_responses = self.host.Scan()
            for event in scan_responses:
                address = event.public if event.HasField("public") else event.random
                if (address == pts_addr and
                    event.data.le_discoverability_mode):
                    scan_responses.cancel()
                    discovered_le = True
                    return

@@ -614,11 +624,13 @@ class GAPProxy(ProfileProxy):
        Please make IUT general discoverable. Press OK to continue.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_GENERAL,)
        self.host.SetDiscoverabilityMode(
            mode=DiscoverabilityMode.DISCOVERABLE_GENERAL)

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        return "OK"
@@ -630,11 +642,10 @@ class GAPProxy(ProfileProxy):
        press OK to continue.
        """

        inquiry_events = self.host.RunInquiry()
        for event in inquiry_events:
            for device in event.device:
                if device.address == pts_addr:
                    inquiry_events.cancel()
        inquiry_responses = self.host.Inquiry()
        for response in inquiry_responses:
            if response.address == pts_addr:
                inquiry_responses.cancel()
                return "OK"

        assert False
@@ -645,8 +656,10 @@ class GAPProxy(ProfileProxy):
        Please make IUT not connectable. Press OK to continue.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_NONE)
        self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_NOT_CONNECTABLE)
        self.host.SetDiscoverabilityMode(
            mode=DiscoverabilityMode.NOT_DISCOVERABLE)

        self.host.SetConnectabilityMode(mode=ConnectabilityMode.NOT_CONNECTABLE)

        return "OK"

@@ -656,8 +669,9 @@ class GAPProxy(ProfileProxy):
        Please make IUT not discoverable. Press OK to continue.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_NONE)
        self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_NOT_CONNECTABLE)
        self.host.SetDiscoverabilityMode(
            mode=DiscoverabilityMode.NOT_DISCOVERABLE)
        self.host.SetConnectabilityMode(mode=ConnectabilityMode.NOT_CONNECTABLE)

        return "OK"

@@ -676,7 +690,7 @@ class GAPProxy(ProfileProxy):
        """

        try:
            self.host.DisconnectLE(connection=self.connection)
            self.host.Disconnect(connection=self.connection)
        except Exception:
            # we already disconnected, no-op
            pass
@@ -727,7 +741,7 @@ class GAPProxy(ProfileProxy):
        Please enter Non-Connectable mode.
        """

        self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_NOT_CONNECTABLE)
        self.host.SetConnectabilityMode(mode=ConnectabilityMode.NOT_CONNECTABLE)

        return "OK"

@@ -737,8 +751,9 @@ class GAPProxy(ProfileProxy):
        Please enter General Discoverable and Non-Connectable mode.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_GENERAL)
        self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_NOT_CONNECTABLE)
        self.host.SetDiscoverabilityMode(
            mode=DiscoverabilityMode.DISCOVERABLE_GENERAL)
        self.host.SetConnectabilityMode(mode=ConnectabilityMode.NOT_CONNECTABLE)

        return "OK"

@@ -751,9 +766,9 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_NOT_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_GENERAL,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=False,
        )

        return "OK"
@@ -766,9 +781,9 @@ class GAPProxy(ProfileProxy):
        """

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_NONE,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.NOT_DISCOVERABLE),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        return "OK"
@@ -780,12 +795,10 @@ class GAPProxy(ProfileProxy):
        advertising report using connectable undirected advertising.
        """

        self.host.SetDiscoverabilityMode(discoverability=DiscoverabilityMode.DISCOVERABILITY_GENERAL)

        self.host.StartAdvertising(
            own_address_type=AddressType.PUBLIC,
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            discovery_mode=DiscoverabilityMode.DISCOVERABILITY_GENERAL,
            data=DataTypes(le_discoverability_mode=DiscoverabilityMode.DISCOVERABLE_GENERAL),
            own_address_type=OwnAddressType.PUBLIC,
            connectable=True,
        )

        return "OK"
@@ -900,14 +913,5 @@ class GAPProxy(ProfileProxy):

        Thread(target=task).start()


GENERAL_MASK = 0x3
LIMITED_MASK = 0x1


def flags_match(flags, mask):
    return flags != ((1 << 32) - 1) and flags & mask


def handle_format(handle):
    return hex(handle)[2:].zfill(4)
+5 −5
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ from mmi2grpc._proxy import ProfileProxy

from pandora_experimental.gatt_grpc import GATT
from pandora_experimental.host_grpc import Host
from pandora_experimental.host_pb2 import ConnectabilityMode, AddressType
from pandora_experimental.host_pb2 import ConnectabilityMode, OwnAddressType
from pandora_experimental.gatt_pb2 import AttStatusCode, AttProperties, AttPermissions
from pandora_experimental.gatt_pb2 import GattServiceParams
from pandora_experimental.gatt_pb2 import GattCharacteristicParams
@@ -70,7 +70,7 @@ class GATTProxy(ProfileProxy):
        PTS.
        """

        self.connection = self.host.ConnectLE(address=pts_addr).connection
        self.connection = self.host.ConnectLE(public=pts_addr).connection
        if test in NEEDS_CACHE_CLEARED:
            self.gatt.ClearCache(connection=self.connection)
        return "OK"
@@ -86,7 +86,7 @@ class GATTProxy(ProfileProxy):
        """

        assert self.connection is not None
        self.host.DisconnectLE(connection=self.connection)
        self.host.Disconnect(connection=self.connection)
        self.connection = None
        self.services = None
        self.characteristics = None
@@ -896,8 +896,8 @@ class GATTProxy(ProfileProxy):
        PTS.
        """
        self.host.StartAdvertising(
            connectability_mode=ConnectabilityMode.CONNECTABILITY_CONNECTABLE,
            own_address_type=AddressType.PUBLIC,
            connectable=True,
            own_address_type=OwnAddressType.PUBLIC,
        )
        self.gatt.RegisterService(
            service=GattServiceParams(
+3 −3
Original line number Diff line number Diff line
@@ -89,7 +89,7 @@ class HFPProxy(ProfileProxy):
            time.sleep(2)

            if test == "HFP/AG/SLC/BV-02-C":
                self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_CONNECTABLE)
                self.host.SetConnectabilityMode(mode=ConnectabilityMode.CONNECTABLE)
                self.connection = self.host.Connect(address=pts_addr).connection
            else:
                if not self.connection:
@@ -131,7 +131,7 @@ class HFPProxy(ProfileProxy):
        Make the Implementation Under Test (IUT) connectable, then click Ok.
        """

        self.host.SetConnectabilityMode(connectability=ConnectabilityMode.CONNECTABILITY_CONNECTABLE)
        self.host.SetConnectabilityMode(mode=ConnectabilityMode.CONNECTABLE)

        return "OK"

@@ -359,7 +359,7 @@ class HFPProxy(ProfileProxy):
        """

        self.hfp.SetInBandRingtone(enabled=False)
        self.host.SoftReset()
        self.host.Reset()

        return "OK"

+1 −1
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ class HOGPProxy(ProfileProxy):
        to the PTS.
        """

        self.connection = self.host.ConnectLE(address=pts_addr).connection
        self.connection = self.host.ConnectLE(public=pts_addr).connection
        self.pairing_stream = self.security.OnPairing()
        self.security.Pair(connection=self.connection)

Loading