Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7b6f28bc authored by Myles Watson's avatar Myles Watson
Browse files

gd: Add cert tests that cover privacy policies

Bug: 145832107
Tag: #gd-refactor
Test: cert/run --host
Change-Id: I7a6fff5a4f570e40a5b1f27dd611d912d7fac105
parent e4a7ded8
Loading
Loading
Loading
Loading
+43 −9
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@

from cert.gd_base_test import GdBaseTestClass
from cert.event_stream import EventStream
from cert.truth import assertThat
from google.protobuf import empty_pb2 as empty_proto
from facade import rootservice_pb2 as facade_rootservice
from facade import common_pb2 as common
@@ -32,16 +33,12 @@ class LeAclManagerTest(GdBaseTestClass):
    def setup_class(self):
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')

    def setup_test(self):
        super().setup_test()
    def set_privacy_policy_static(self):
        self.dut_address = b'd0:05:04:03:02:01'
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')),
                type=common.RANDOM_DEVICE_ADDRESS),
            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
            minimum_rotation_time=(7 * 60 * 1000),
            maximum_rotation_time=(15 * 60 * 1000))
                address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS))
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def register_for_event(self, event_code):
@@ -65,7 +62,7 @@ class LeAclManagerTest(GdBaseTestClass):
            handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl)
        self.cert.hci.SendAclData(acl_msg)

    def test_dut_connects(self):
    def dut_connects(self, check_address):
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
@@ -126,26 +123,33 @@ class LeAclManagerTest(GdBaseTestClass):

                # Cert gets ConnectionComplete with a handle and sends ACL data
                handle = 0xfff
                address = hci_packets.Address()

                def get_handle(packet):
                    packet_bytes = packet.event
                    nonlocal handle
                    nonlocal address
                    if b'\x3e\x13\x01\x00' in packet_bytes:
                        cc_view = hci_packets.LeConnectionCompleteView(
                            hci_packets.LeMetaEventView(
                                hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                        handle = cc_view.GetConnectionHandle()
                        address = cc_view.GetPeerAddress()
                        return True
                    if b'\x3e\x13\x0A\x00' in packet_bytes:
                        cc_view = hci_packets.LeEnhancedConnectionCompleteView(
                            hci_packets.LeMetaEventView(
                                hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                        handle = cc_view.GetConnectionHandle()
                        address = cc_view.GetPeerResolvablePrivateAddress()
                        return True
                    return False

                cert_hci_le_event_stream.assert_event_occurs(get_handle)
                cert_handle = handle
                dut_address_from_complete = address
                if (check_address):
                    assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode())

                self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                                      hci_packets.BroadcastFlag.POINT_TO_POINT,
@@ -162,7 +166,36 @@ class LeAclManagerTest(GdBaseTestClass):
                cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data)
                acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
        self.set_privacy_policy_static()
        self.dut_connects(check_address=True)

    def test_dut_connects_resolvable_address(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.dut_connects(check_address=False)

    def test_dut_connects_non_resolvable_address(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
            minimum_rotation_time=8 * 60 * 1000,
            maximum_rotation_time=14 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.dut_connects(check_address=False)

    def test_dut_connects_public_address(self):
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            le_initiator_address_facade.PrivacyPolicy(
                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
        self.dut_connects(check_address=False)

    def test_cert_connects(self):
        self.set_privacy_policy_static()
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
                EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
@@ -203,7 +236,7 @@ class LeAclManagerTest(GdBaseTestClass):
                hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                                              hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                              hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                                                              'D0:05:04:03:02:01', 1, [phy_scan_params]), False)
                                                              self.dut_address.decode(), 1, [phy_scan_params]), False)

            # Cert gets ConnectionComplete with a handle and sends ACL data
            handle = 0xfff
@@ -244,6 +277,7 @@ class LeAclManagerTest(GdBaseTestClass):
            acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload)

    def test_recombination_l2cap_packet(self):
        self.set_privacy_policy_static()
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
                EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \