Loading system/blueberry/tests/gd/cert/py_hci.py +14 −2 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import timedelta from google.protobuf import empty_pb2 as empty_proto from blueberry.tests.gd.cert.event_stream import EventStream Loading Loading @@ -72,10 +73,11 @@ class PyHciAclConnection(IEventStream): class PyHciLeAclConnection(IEventStream): def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable): def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable): self.handle = int(handle) self.device = device self.peer = peer self.peer_type = peer_type self.peer_resolvable = peer_resolvable self.local_resolvable = local_resolvable # todo, handle we got is 0, so doesn't match - fix before enabling filtering Loading Loading @@ -250,11 +252,21 @@ class PyHci(Closable): handle = connection_complete.get().GetConnectionHandle() peer = connection_complete.get().GetPeerAddress() peer_type = connection_complete.get().GetPeerAddressType() local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress() peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress() if self.acl_stream is None: raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_type, peer_resolvable, local_resolvable) def incoming_le_connection_fails(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5)) def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk): self.send_command( hci_packets.LeAddDeviceToResolvingListBuilder(peer_address_type, peer_address, peer_irk, local_irk)) def create_advertisement(self, handle, Loading system/blueberry/tests/gd/cert/py_le_acl_manager.py +6 −2 Original line number Diff line number Diff line Loading @@ -29,12 +29,13 @@ from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_fac class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param remote_addr_type: Remote device address type :param handle: Connection handle :param event_stream: The connection event stream for this connection """ Loading @@ -46,6 +47,7 @@ class PyLeAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.remote_address_type = remote_addr_type self.own_address = address self.disconnect_reason = None Loading Loading @@ -128,11 +130,13 @@ class PyLeAclManager(Closable): complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() remote_address_type = complete.GetPeerAddressType() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle, event_stream) self.active_connections.append(connection) return connection Loading system/blueberry/tests/gd/hci/le_acl_manager_test.py +93 −50 Original line number Diff line number Diff line Loading @@ -37,6 +37,10 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): gd_base_test.GdBaseTestClass.setup_test(self) self.cert_hci = PyHci(self.cert, acl_streaming=True) self.dut_le_acl_manager = PyLeAclManager(self.dut) self.cert_public_address = self.cert_hci.read_own_address() self.dut_public_address = self.dut.hci_controller.GetMacAddressSimple().decode("utf-8") self.dut_random_address = 'd0:05:04:03:02:01' self.cert_random_address = 'c0:05:04:03:02:01' def teardown_test(self): safeClose(self.dut_le_acl_manager) Loading @@ -44,11 +48,11 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): gd_base_test.GdBaseTestClass.teardown_test(self) def set_privacy_policy_static(self): self.dut_address = b'd0:05:04:03:02:01' private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS)) address=common.BluetoothAddress(address=bytes(self.dut_random_address, "utf-8")), type=common.RANDOM_DEVICE_ADDRESS)) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) def register_for_event(self, event_code): Loading @@ -68,14 +72,14 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize()))) def dut_connects(self, check_address, cert_addr='0C:05:04:03:02:01'): def dut_connects(self): # Cert Advertises advertising_handle = 0 py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci) self.cert_hci.create_advertisement( advertising_handle, cert_addr, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, ) Loading @@ -83,32 +87,36 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): py_hci_adv.set_scan_response(b'Im_A_C') py_hci_adv.start() self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(cert_addr, 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))) py_hci_le_acl_connection = self.cert_hci.incoming_le_connection() self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream assertThat(py_hci_le_acl_connection.handle).isNotNone() if check_address: assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode()) cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl self.cert_handle = py_hci_le_acl_connection.handle def send_receive_and_check(self): self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, def send_receive_and_check(self, dut_le_acl, cert_le_acl): self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') assertThat(cert_le_acl.our_acl_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_dut_connects(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -117,8 +125,15 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): minimum_rotation_time=7 * 60 * 1000, maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -127,22 +142,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): minimum_rotation_time=8 * 60 * 1000, maximum_rotation_time=14 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_public_address(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_public_address_cancelled(self): # TODO (Add cancel) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_cert_connects(self): self.set_privacy_policy_static() Loading @@ -168,33 +207,37 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.dut.hci_le_advertising_manager.CreateAdvertiser(request) # Cert Connects self.cert_hci.set_random_le_address('0C:05:04:03:02:01') self.cert_hci.initiate_le_connection(self.dut_address.decode()) self.cert_hci.set_random_le_address(self.cert_random_address) self.cert_hci.initiate_le_connection(self.dut_random_address) # Cert gets ConnectionComplete with a handle and sends ACL data py_hci_le_acl_connection = self.cert_hci.incoming_le_connection() cert_le_acl = self.cert_hci.incoming_le_connection() py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert') self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream assertThat(py_hci_le_acl_connection.handle).isNotNone() self.cert_handle = py_hci_le_acl_connection.handle cert_le_acl.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert') # DUT gets a connection complete event and sends and receives self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() self.send_receive_and_check() dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_recombination_l2cap_packet(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, dut_le_acl, cert_le_acl = self.dut_connects() cert_handle = cert_le_acl.handle self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) assertThat(dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) def test_background_connection(self): self.set_privacy_policy_static() Loading @@ -207,7 +250,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): token_background = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=False) Loading @@ -217,7 +260,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Cert Advertises advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading @@ -233,7 +276,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Start two background connections token_1 = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=False) Loading @@ -246,7 +289,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Cert Advertises advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading Loading @@ -275,7 +318,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.set_privacy_policy_static() advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading @@ -285,7 +328,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Start direct connection token = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=True) self.dut_le_acl_manager.complete_outgoing_connection(token) Loading Loading
system/blueberry/tests/gd/cert/py_hci.py +14 −2 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import timedelta from google.protobuf import empty_pb2 as empty_proto from blueberry.tests.gd.cert.event_stream import EventStream Loading Loading @@ -72,10 +73,11 @@ class PyHciAclConnection(IEventStream): class PyHciLeAclConnection(IEventStream): def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable): def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable): self.handle = int(handle) self.device = device self.peer = peer self.peer_type = peer_type self.peer_resolvable = peer_resolvable self.local_resolvable = local_resolvable # todo, handle we got is 0, so doesn't match - fix before enabling filtering Loading Loading @@ -250,11 +252,21 @@ class PyHci(Closable): handle = connection_complete.get().GetConnectionHandle() peer = connection_complete.get().GetPeerAddress() peer_type = connection_complete.get().GetPeerAddressType() local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress() peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress() if self.acl_stream is None: raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_type, peer_resolvable, local_resolvable) def incoming_le_connection_fails(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5)) def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk): self.send_command( hci_packets.LeAddDeviceToResolvingListBuilder(peer_address_type, peer_address, peer_irk, local_irk)) def create_advertisement(self, handle, Loading
system/blueberry/tests/gd/cert/py_le_acl_manager.py +6 −2 Original line number Diff line number Diff line Loading @@ -29,12 +29,13 @@ from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_fac class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param remote_addr_type: Remote device address type :param handle: Connection handle :param event_stream: The connection event stream for this connection """ Loading @@ -46,6 +47,7 @@ class PyLeAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.remote_address_type = remote_addr_type self.own_address = address self.disconnect_reason = None Loading Loading @@ -128,11 +130,13 @@ class PyLeAclManager(Closable): complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() remote_address_type = complete.GetPeerAddressType() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle, event_stream) self.active_connections.append(connection) return connection Loading
system/blueberry/tests/gd/hci/le_acl_manager_test.py +93 −50 Original line number Diff line number Diff line Loading @@ -37,6 +37,10 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): gd_base_test.GdBaseTestClass.setup_test(self) self.cert_hci = PyHci(self.cert, acl_streaming=True) self.dut_le_acl_manager = PyLeAclManager(self.dut) self.cert_public_address = self.cert_hci.read_own_address() self.dut_public_address = self.dut.hci_controller.GetMacAddressSimple().decode("utf-8") self.dut_random_address = 'd0:05:04:03:02:01' self.cert_random_address = 'c0:05:04:03:02:01' def teardown_test(self): safeClose(self.dut_le_acl_manager) Loading @@ -44,11 +48,11 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): gd_base_test.GdBaseTestClass.teardown_test(self) def set_privacy_policy_static(self): self.dut_address = b'd0:05:04:03:02:01' private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(self.dut_address)), type=common.RANDOM_DEVICE_ADDRESS)) address=common.BluetoothAddress(address=bytes(self.dut_random_address, "utf-8")), type=common.RANDOM_DEVICE_ADDRESS)) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) def register_for_event(self, event_code): Loading @@ -68,14 +72,14 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): acl = hci_packets.AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize()))) def dut_connects(self, check_address, cert_addr='0C:05:04:03:02:01'): def dut_connects(self): # Cert Advertises advertising_handle = 0 py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci) self.cert_hci.create_advertisement( advertising_handle, cert_addr, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, ) Loading @@ -83,32 +87,36 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): py_hci_adv.set_scan_response(b'Im_A_C') py_hci_adv.start() self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes(cert_addr, 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))) py_hci_le_acl_connection = self.cert_hci.incoming_le_connection() self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream assertThat(py_hci_le_acl_connection.handle).isNotNone() if check_address: assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode()) cert_le_acl = self.cert_hci.incoming_le_connection() return dut_le_acl, cert_le_acl self.cert_handle = py_hci_le_acl_connection.handle def send_receive_and_check(self): self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, def send_receive_and_check(self, dut_le_acl, cert_le_acl): self.enqueue_acl_data(cert_le_acl.handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') assertThat(cert_le_acl.our_acl_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload) assertThat(dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_dut_connects(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -117,8 +125,15 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): minimum_rotation_time=7 * 60 * 1000, maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -127,22 +142,46 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): minimum_rotation_time=8 * 60 * 1000, maximum_rotation_time=14 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_public_address(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_dut_connects_public_address_cancelled(self): # TODO (Add cancel) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() dut_le_acl, cert_le_acl = self.dut_connects() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_cert_connects(self): self.set_privacy_policy_static() Loading @@ -168,33 +207,37 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.dut.hci_le_advertising_manager.CreateAdvertiser(request) # Cert Connects self.cert_hci.set_random_le_address('0C:05:04:03:02:01') self.cert_hci.initiate_le_connection(self.dut_address.decode()) self.cert_hci.set_random_le_address(self.cert_random_address) self.cert_hci.initiate_le_connection(self.dut_random_address) # Cert gets ConnectionComplete with a handle and sends ACL data py_hci_le_acl_connection = self.cert_hci.incoming_le_connection() cert_le_acl = self.cert_hci.incoming_le_connection() py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert') self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream assertThat(py_hci_le_acl_connection.handle).isNotNone() self.cert_handle = py_hci_le_acl_connection.handle cert_le_acl.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert') # DUT gets a connection complete event and sends and receives self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() self.send_receive_and_check() dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() assertThat(cert_le_acl.handle).isNotNone() assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address) assertThat(cert_le_acl.peer_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) assertThat(dut_le_acl.handle).isNotNone() assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address) assertThat(dut_le_acl.remote_address_type).isEqualTo(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS) self.send_receive_and_check(dut_le_acl, cert_le_acl) def test_recombination_l2cap_packet(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, dut_le_acl, cert_le_acl = self.dut_connects() cert_handle = cert_le_acl.handle self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) assertThat(dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) def test_background_connection(self): self.set_privacy_policy_static() Loading @@ -207,7 +250,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): token_background = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=False) Loading @@ -217,7 +260,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Cert Advertises advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading @@ -233,7 +276,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Start two background connections token_1 = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=False) Loading @@ -246,7 +289,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Cert Advertises advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading Loading @@ -275,7 +318,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): self.set_privacy_policy_static() advertising_handle = 0 py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01', py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address, hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165) py_hci_adv.set_data(b'Im_A_Cert') Loading @@ -285,7 +328,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass): # Start direct connection token = self.dut_le_acl_manager.initiate_connection( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), address=common.BluetoothAddress(address=bytes(self.cert_random_address, 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)), is_direct=True) self.dut_le_acl_manager.complete_outgoing_connection(token) Loading