Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1db8fdaf authored by bidsharma's avatar bidsharma
Browse files

GD: Check address types in le_acl_manager_test

Bug: 230123996
Test: cert/run LeAclManagerTest
Tag: #refactor

Change-Id: I5602c516d3bc3c1c9df00f92a7ffbac56f720b7d
parent 4afb69b0
Loading
Loading
Loading
Loading
+14 −2
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
from datetime import timedelta

from google.protobuf import empty_pb2 as empty_proto
from blueberry.tests.gd.cert.event_stream import EventStream
@@ -72,10 +73,11 @@ class PyHciAclConnection(IEventStream):

class PyHciLeAclConnection(IEventStream):

    def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable):
    def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable):
        self.handle = int(handle)
        self.device = device
        self.peer = peer
        self.peer_type = peer_type
        self.peer_resolvable = peer_resolvable
        self.local_resolvable = local_resolvable
        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
@@ -250,11 +252,21 @@ class PyHci(Closable):

        handle = connection_complete.get().GetConnectionHandle()
        peer = connection_complete.get().GetPeerAddress()
        peer_type = connection_complete.get().GetPeerAddressType()
        local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress()
        peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress()
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable)
        return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_type, peer_resolvable,
                                    local_resolvable)

    def incoming_le_connection_fails(self):
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5))

    def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk):
        self.send_command(
            hci_packets.LeAddDeviceToResolvingListBuilder(peer_address_type, peer_address, peer_irk, local_irk))

    def create_advertisement(self,
                             handle,
+6 −2
Original line number Diff line number Diff line
@@ -29,12 +29,13 @@ from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_fac

class PyLeAclManagerAclConnection(IEventStream, Closable):

    def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream):
    def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream):
        """
        An abstract representation for an LE ACL connection in GD certification test
        :param le_acl_manager: The LeAclManager from this GD device
        :param address: The local device address
        :param remote_addr: Remote device address
        :param remote_addr_type: Remote device address type
        :param handle: Connection handle
        :param event_stream: The connection event stream for this connection
        """
@@ -46,6 +47,7 @@ class PyLeAclManagerAclConnection(IEventStream, Closable):
        self.acl_stream = EventStream(
            self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle)))
        self.remote_address = remote_addr
        self.remote_address_type = remote_addr_type
        self.own_address = address
        self.disconnect_reason = None

@@ -128,11 +130,13 @@ class PyLeAclManager(Closable):
        complete = connection_complete.get()
        handle = complete.GetConnectionHandle()
        remote = complete.GetPeerAddress()
        remote_address_type = complete.GetPeerAddressType()
        if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE:
            address = complete.GetLocalResolvablePrivateAddress()
        else:
            address = None
        connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream)
        connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle,
                                                 event_stream)
        self.active_connections.append(connection)
        return connection

+93 −50
Original line number Diff line number Diff line
@@ -37,6 +37,10 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        gd_base_test.GdBaseTestClass.setup_test(self)
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.dut_le_acl_manager = PyLeAclManager(self.dut)
        self.cert_public_address = self.cert_hci.read_own_address()
        self.dut_public_address = self.dut.hci_controller.GetMacAddressSimple().decode("utf-8")
        self.dut_random_address = 'd0:05:04:03:02:01'
        self.cert_random_address = 'c0:05:04:03:02:01'

    def teardown_test(self):
        safeClose(self.dut_le_acl_manager)
@@ -44,11 +48,11 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        gd_base_test.GdBaseTestClass.teardown_test(self)

    def set_privacy_policy_static(self):
        self.dut_address = b'd0:05:04:03:02:01'
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS))
                address=common.BluetoothAddress(address=bytes(self.dut_random_address, "utf-8")),
                type=common.RANDOM_DEVICE_ADDRESS))
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def register_for_event(self, event_code):
@@ -68,14 +72,14 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def dut_connects(self, check_address, cert_addr='0C:05:04:03:02:01'):
    def dut_connects(self):
        # Cert Advertises
        advertising_handle = 0
        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)

        self.cert_hci.create_advertisement(
            advertising_handle,
            cert_addr,
            self.cert_random_address,
            hci_packets.LegacyAdvertisingProperties.ADV_IND,
        )

@@ -83,32 +87,36 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        py_hci_adv.set_scan_response(b'Im_A_C')
        py_hci_adv.start()

        self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
        dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(cert_addr, 'utf8')),
                address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')),
                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))

        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
        assertThat(py_hci_le_acl_connection.handle).isNotNone()
        if check_address:
            assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode())
        cert_le_acl = self.cert_hci.incoming_le_connection()
        return dut_le_acl, cert_le_acl

        self.cert_handle = py_hci_le_acl_connection.handle

    def send_receive_and_check(self):
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
    def send_receive_and_check(self, dut_le_acl, cert_le_acl):
        self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT,
                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))

        self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
        assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
        dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
        assertThat(cert_le_acl.our_acl_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
        self.set_privacy_policy_static()
        self.dut_connects(check_address=True)
        self.send_receive_and_check()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_resolvable_address(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
@@ -117,8 +125,15 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_non_resolvable_address(self):
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
@@ -127,22 +142,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
            minimum_rotation_time=8 * 60 * 1000,
            maximum_rotation_time=14 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.dut_connects(check_address=False)
        self.send_receive_and_check()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_public_address(self):
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            le_initiator_address_facade.PrivacyPolicy(
                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
        self.dut_connects(check_address=False)
        self.send_receive_and_check()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_public_address_cancelled(self):
        # TODO (Add cancel)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            le_initiator_address_facade.PrivacyPolicy(
                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
        self.dut_connects(check_address=False)
        self.send_receive_and_check()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_cert_connects(self):
        self.set_privacy_policy_static()
@@ -168,33 +207,37 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)

        # Cert Connects
        self.cert_hci.set_random_le_address('0C:05:04:03:02:01')
        self.cert_hci.initiate_le_connection(self.dut_address.decode())
        self.cert_hci.set_random_le_address(self.cert_random_address)
        self.cert_hci.initiate_le_connection(self.dut_random_address)

        # Cert gets ConnectionComplete with a handle and sends ACL data
        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
        cert_le_acl = self.cert_hci.incoming_le_connection()

        py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                      hci_packets.BroadcastFlag.POINT_TO_POINT,
                                      b'\x19\x00\x07\x00SomeAclData from the Cert')
        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
        assertThat(py_hci_le_acl_connection.handle).isNotNone()
        self.cert_handle = py_hci_le_acl_connection.handle
        cert_le_acl.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                         hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert')

        # DUT gets a connection complete event and sends and receives
        self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
        self.send_receive_and_check()
        dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_recombination_l2cap_packet(self):
        self.set_privacy_policy_static()
        self.dut_connects(check_address=True)

        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
        dut_le_acl, cert_le_acl = self.dut_connects()
        cert_handle = cert_le_acl.handle
        self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello'))
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
        self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))

        assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)
        assertThat(dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)

    def test_background_connection(self):
        self.set_privacy_policy_static()
@@ -207,7 +250,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):

        token_background = self.dut_le_acl_manager.initiate_connection(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
                address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')),
                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)),
            is_direct=False)

@@ -217,7 +260,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        # Cert Advertises
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)

        py_hci_adv.set_data(b'Im_A_Cert')
@@ -233,7 +276,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        # Start two background connections
        token_1 = self.dut_le_acl_manager.initiate_connection(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
                address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')),
                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)),
            is_direct=False)

@@ -246,7 +289,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        # Cert Advertises
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)

        py_hci_adv.set_data(b'Im_A_Cert')
@@ -275,7 +318,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        self.set_privacy_policy_static()

        advertising_handle = 0
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)

        py_hci_adv.set_data(b'Im_A_Cert')
@@ -285,7 +328,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        # Start direct connection
        token = self.dut_le_acl_manager.initiate_connection(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
                address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')),
                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)),
            is_direct=True)
        self.dut_le_acl_manager.complete_outgoing_connection(token)