Loading system/blueberry/tests/gd/hci/le_acl_manager_test.py +65 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager from blueberry.facade import common_pb2 as common from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade.hci import hci_facade_pb2 as hci_facade Loading Loading @@ -95,6 +96,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl def cert_advertises_resolvable(self): self.cert_hci.add_device_to_resolving_list(hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, self.dut_public_address, b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f') # Cert Advertises advertising_handle = 0 py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci) self.cert_hci.create_advertisement( advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, own_address_type=hci_packets.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS, peer_address=self.dut_public_address, peer_address_type=hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS) py_hci_adv.set_data(b'Im_A_Cert') py_hci_adv.set_scan_response(b'Im_A_C') py_hci_adv.start() def dut_connects_cert_resolvable(self): self.dut.hci_le_acl_manager.AddDeviceToResolvingList( le_acl_manager_facade.IrkMsg( peer=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")), type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)), peer_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f', local_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', )) dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")), type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS))) cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl def send_receive_and_check(self, dut_le_acl, cert_le_acl): self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, Loading Loading @@ -128,6 +169,9 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) Loading @@ -135,6 +179,27 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_resolvable_address_public(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', minimum_rotation_time=7 * 60 * 1000, maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.cert_advertises_resolvable() dut_le_acl, cert_le_acl = self.dut_connects_cert_resolvable() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_public_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS, Loading system/gd/hci/acl_manager/le_impl.h +13 −8 Original line number Diff line number Diff line Loading @@ -365,15 +365,20 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { on_le_connection_canceled_on_pause(); return; } AddressWithType remote_address(address, peer_address_type); // The address added to the connect list is the one that callbacks // should be sent for, given that is the address passed in // call to BluetoothDevice#connectGatt and tied to app layer callbacks. if (!peer_resolvable_address.IsEmpty() && is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) { LOG_INFO("Using resolvable address"); remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS); AddressType remote_address_type; switch (peer_address_type) { case AddressType::PUBLIC_DEVICE_ADDRESS: case AddressType::PUBLIC_IDENTITY_ADDRESS: remote_address_type = AddressType::PUBLIC_DEVICE_ADDRESS; break; case AddressType::RANDOM_DEVICE_ADDRESS: case AddressType::RANDOM_IDENTITY_ADDRESS: remote_address_type = AddressType::RANDOM_DEVICE_ADDRESS; break; } AddressWithType remote_address(address, remote_address_type); on_common_le_connection_complete(remote_address); if (status == ErrorCode::UNKNOWN_CONNECTION) { Loading Loading
system/blueberry/tests/gd/hci/le_acl_manager_test.py +65 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ from blueberry.tests.gd.cert.truth import assertThat from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager from blueberry.facade import common_pb2 as common from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade.hci import hci_facade_pb2 as hci_facade Loading Loading @@ -95,6 +96,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl def cert_advertises_resolvable(self): self.cert_hci.add_device_to_resolving_list(hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, self.dut_public_address, b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f') # Cert Advertises advertising_handle = 0 py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci) self.cert_hci.create_advertisement( advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, own_address_type=hci_packets.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS, peer_address=self.dut_public_address, peer_address_type=hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS) py_hci_adv.set_data(b'Im_A_Cert') py_hci_adv.set_scan_response(b'Im_A_C') py_hci_adv.start() def dut_connects_cert_resolvable(self): self.dut.hci_le_acl_manager.AddDeviceToResolvingList( le_acl_manager_facade.IrkMsg( peer=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")), type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS)), peer_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f', local_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', )) dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.cert_public_address, "utf-8")), type=int(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS))) cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl def send_receive_and_check(self, dut_le_acl, cert_le_acl): self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, Loading Loading @@ -128,6 +169,9 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) Loading @@ -135,6 +179,27 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_resolvable_address_public(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f', minimum_rotation_time=7 * 60 * 1000, maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.cert_advertises_resolvable() dut_le_acl, cert_le_acl = self.dut_connects_cert_resolvable() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_public_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS, Loading
system/gd/hci/acl_manager/le_impl.h +13 −8 Original line number Diff line number Diff line Loading @@ -365,15 +365,20 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback { on_le_connection_canceled_on_pause(); return; } AddressWithType remote_address(address, peer_address_type); // The address added to the connect list is the one that callbacks // should be sent for, given that is the address passed in // call to BluetoothDevice#connectGatt and tied to app layer callbacks. if (!peer_resolvable_address.IsEmpty() && is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) { LOG_INFO("Using resolvable address"); remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS); AddressType remote_address_type; switch (peer_address_type) { case AddressType::PUBLIC_DEVICE_ADDRESS: case AddressType::PUBLIC_IDENTITY_ADDRESS: remote_address_type = AddressType::PUBLIC_DEVICE_ADDRESS; break; case AddressType::RANDOM_DEVICE_ADDRESS: case AddressType::RANDOM_IDENTITY_ADDRESS: remote_address_type = AddressType::RANDOM_DEVICE_ADDRESS; break; } AddressWithType remote_address(address, remote_address_type); on_common_le_connection_complete(remote_address); if (status == ErrorCode::UNKNOWN_CONNECTION) { Loading