Loading system/gd/cert/py_le_acl_manager.py +71 −36 Original line number Diff line number Diff line Loading @@ -28,39 +28,40 @@ from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, device, acl_stream, remote_addr, handle): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param device: The GD device :param acl_stream: The ACL stream for this connection :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param handle: Connection handle :param event_stream: The connection event stream for this connection """ self.device = device self.handle = handle self.le_acl_manager = le_acl_manager # todo enable filtering after sorting out handles # self.our_acl_stream = FilteringEventStream(acl_stream, None) self.our_acl_stream = acl_stream if remote_addr: self.connection_event_stream = EventStream(self.device.hci_le_acl_manager.CreateConnection(remote_addr)) else: self.connection_event_stream = None self.handle = handle self.connection_event_stream = event_stream self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.own_address = address self.disconnect_reason = None def close(self): safeClose(self.connection_event_stream) safeClose(self.acl_stream) def wait_for_connection_complete(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.connection_event_stream).emits(connection_complete) self.handle = connection_complete.get().GetConnectionHandle() def wait_for_disconnection_complete(self): disconnection_complete = HciCaptures.DisconnectionCompleteCapture() assertThat(self.connection_event_stream).emits(disconnection_complete) self.disconnect_reason = disconnection_complete.get().GetReason() def send(self, data): self.device.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data))) self.le_acl_manager.SendAclData(le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data))) def get_event_queue(self): return self.our_acl_stream.get_event_queue() return self.acl_stream.get_event_queue() class PyLeAclManager(Closable): Loading @@ -70,28 +71,62 @@ class PyLeAclManager(Closable): LE ACL Manager for GD Certification test :param device: The GD device """ self.device = device self.le_acl_manager = device.hci_le_acl_manager self.le_acl_stream = EventStream(self.device.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) self.incoming_connection_stream = None self.incoming_connection_event_stream = None self.outgoing_connection_event_streams = {} self.active_connections = [] self.next_token = 1 def close(self): safeClose(self.le_acl_stream) safeClose(self.incoming_connection_stream) # temporary, until everyone is migrated def get_le_acl_stream(self): return self.le_acl_stream safeClose(self.incoming_connection_event_stream) for v in self.outgoing_connection_event_streams.values(): safeClose(v) for connection in self.active_connections: safeClose(connection) def listen_for_incoming_connections(self): self.incoming_connection_stream = EventStream( self.device.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) assertThat(self.incoming_connection_event_stream).isNone() self.incoming_connection_event_stream = EventStream( self.le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) def initiate_connection(self, remote_addr): return PyLeAclManagerAclConnection(self.device, self.le_acl_stream, remote_addr, None) def connect_to_remote(self, remote_addr): token = self.initiate_connection(remote_addr) return self.complete_outgoing_connection(token) def wait_for_connection(self): self.listen_for_incoming_connections() return self.complete_incoming_connection() def accept_connection(self): def initiate_connection(self, remote_addr): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateConnection(remote_addr)) token = self.next_token self.next_token += 1 return token def complete_connection(self, event_stream): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.incoming_connection_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyLeAclManagerAclConnection(self.device, self.le_acl_stream, None, handle) assertThat(event_stream).emits(connection_complete) complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) self.active_connections.append(connection) return connection def complete_incoming_connection(self): assertThat(self.incoming_connection_event_stream).isNotNone() event_stream = self.incoming_connection_event_stream self.incoming_connection_event_stream = None return self.complete_connection(event_stream) def complete_outgoing_connection(self, token): assertThat(self.outgoing_connection_event_streams[token]).isNotNone() event_stream = self.outgoing_connection_event_streams.pop(token) return self.complete_connection(event_stream) system/gd/hci/cert/le_acl_manager_test.py +190 −270 Original line number Diff line number Diff line Loading @@ -14,11 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from cert.closable import safeClose from cert.gd_base_test import GdBaseTestClass from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_le_acl_manager import PyLeAclManager from google.protobuf import empty_pb2 as empty_proto from facade import rootservice_pb2 as facade_rootservice from facade import common_pb2 as common from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade Loading @@ -33,6 +34,18 @@ class LeAclManagerTest(GdBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def setup_test(self): super().setup_test() self.dut_le_acl_manager = PyLeAclManager(self.dut) self.cert_hci_le_event_stream = EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) self.cert_acl_data_stream = EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) def teardown_test(self): safeClose(self.cert_hci_le_event_stream) safeClose(self.cert_acl_data_stream) safeClose(self.dut_le_acl_manager) super().teardown_test() def set_privacy_policy_static(self): self.dut_address = b'd0:05:04:03:02:01' private_policy = le_initiator_address_facade.PrivacyPolicy( Loading Loading @@ -64,9 +77,7 @@ class LeAclManagerTest(GdBaseTestClass): def dut_connects(self, check_address): self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) # Cert Advertises advertising_handle = 0 Loading Loading @@ -115,11 +126,10 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) with EventStream( self.dut.hci_le_acl_manager.CreateConnection( common.BluetoothAddressWithType( self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))) as connection_event_stream: type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))) # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff Loading @@ -145,30 +155,25 @@ class LeAclManagerTest(GdBaseTestClass): return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle self.cert_hci_le_event_stream.assert_event_occurs(get_handle) self.cert_handle = handle dut_address_from_complete = address if (check_address): if check_address: assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode()) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, def send_receive_and_check(self): self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) # DUT gets a connection complete event and sends and receives handle = 0xfff connection_event_stream.assert_event_occurs(get_handle) self.dut.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData( handle=handle, payload=bytes(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT'))) cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload) self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_dut_connects(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.send_receive_and_check() def test_dut_connects_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -178,6 +183,7 @@ class LeAclManagerTest(GdBaseTestClass): maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -187,26 +193,27 @@ class LeAclManagerTest(GdBaseTestClass): maximum_rotation_time=14 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_public_address(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_public_address_cancelled(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() def test_cert_connects(self): self.set_privacy_policy_static() self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: self.dut_le_acl_manager.listen_for_incoming_connections() # DUT Advertises gap_name = hci_packets.GapData() Loading Loading @@ -264,113 +271,26 @@ class LeAclManagerTest(GdBaseTestClass): return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle self.cert_hci_le_event_stream.assert_event_occurs(get_handle) self.cert_handle = handle self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) # DUT gets a connection complete event and sends and receives handle = 0xfff incoming_connection_stream.assert_event_occurs(get_handle) self.dut.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData( handle=handle, payload=bytes(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT'))) self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload) self.send_receive_and_check() def test_recombination_l2cap_packet(self): self.set_privacy_policy_static() self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # Cert Advertises advertising_handle = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, 400, 450, 7, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, '00:00:00:00:00:00', hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 0xF8, 1, #SID hci_packets.Enable.DISABLED # Scan request notification ), True) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'), True) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]), True) gap_short_name = hci_packets.GapData() gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME gap_short_name.data = list(bytes(b'Im_A_C')) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]), True) enabled_set = hci_packets.EnabledSet() enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) with EventStream( self.dut.hci_le_acl_manager.CreateConnection( common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))) as connection_event_stream: # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff def get_handle(packet): packet_bytes = packet.event nonlocal handle if b'\x3e\x13\x01\x00' in packet_bytes: cc_view = hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) handle = cc_view.GetConnectionHandle() return True if b'\x3e\x13\x0A\x00' in packet_bytes: cc_view = hci_packets.LeEnhancedConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) handle = cc_view.GetConnectionHandle() return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle # DUT gets a connection complete event connection_event_stream.assert_event_occurs(get_handle) self.dut_connects(check_address=True) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) acl_data_stream.assert_event_occurs(lambda packet: b'Hello!' in packet.payload) assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload) system/gd/hci/facade/le_acl_manager_facade.cc +98 −42 File changed.Preview size limit exceeded, changes collapsed. Show changes system/gd/hci/facade/le_acl_manager_facade.proto +6 −1 Original line number Diff line number Diff line Loading @@ -9,8 +9,9 @@ service LeAclManagerFacade { rpc CreateConnection(bluetooth.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CancelConnection(bluetooth.facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {} rpc Disconnect(LeHandleMsg) returns (google.protobuf.Empty) {} rpc ConnectionCommand(LeConnectionCommandMsg) returns (google.protobuf.Empty) {} rpc SendAclData(LeAclData) returns (google.protobuf.Empty) {} rpc FetchAclData(google.protobuf.Empty) returns (stream LeAclData) {} rpc FetchAclData(LeHandleMsg) returns (stream LeAclData) {} rpc FetchIncomingConnection(google.protobuf.Empty) returns (stream LeConnectionEvent) {} } Loading @@ -22,6 +23,10 @@ message LeConnectionEvent { bytes event = 1; } message LeConnectionCommandMsg { bytes packet = 1; } message LeAclData { uint32 handle = 1; bytes payload = 2; Loading system/gd/l2cap/le/cert/cert_le_l2cap.py +3 −5 Original line number Diff line number Diff line Loading @@ -90,15 +90,13 @@ class CertLeL2cap(Closable): safeClose(self._le_acl) def connect_le_acl(self, remote_addr): self._le_acl = self._le_acl_manager.initiate_connection(remote_addr) self._le_acl.wait_for_connection_complete() self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr) self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def wait_for_connection(self): self._le_acl_manager.listen_for_incoming_connections() self._le_acl = self._le_acl_manager.accept_connection() self._le_acl = self._le_acl_manager.wait_for_connection() self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) Loading Loading @@ -171,7 +169,7 @@ class CertLeL2cap(Closable): return self.control_channel def _get_acl_stream(self): return self._le_acl_manager.get_le_acl_stream() return self._le_acl.acl_stream def _on_disconnection_request_default(self, request): disconnection_request = l2cap_packets.LeDisconnectionRequestView(request) Loading Loading
system/gd/cert/py_le_acl_manager.py +71 −36 Original line number Diff line number Diff line Loading @@ -28,39 +28,40 @@ from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, device, acl_stream, remote_addr, handle): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param device: The GD device :param acl_stream: The ACL stream for this connection :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param handle: Connection handle :param event_stream: The connection event stream for this connection """ self.device = device self.handle = handle self.le_acl_manager = le_acl_manager # todo enable filtering after sorting out handles # self.our_acl_stream = FilteringEventStream(acl_stream, None) self.our_acl_stream = acl_stream if remote_addr: self.connection_event_stream = EventStream(self.device.hci_le_acl_manager.CreateConnection(remote_addr)) else: self.connection_event_stream = None self.handle = handle self.connection_event_stream = event_stream self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.own_address = address self.disconnect_reason = None def close(self): safeClose(self.connection_event_stream) safeClose(self.acl_stream) def wait_for_connection_complete(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.connection_event_stream).emits(connection_complete) self.handle = connection_complete.get().GetConnectionHandle() def wait_for_disconnection_complete(self): disconnection_complete = HciCaptures.DisconnectionCompleteCapture() assertThat(self.connection_event_stream).emits(disconnection_complete) self.disconnect_reason = disconnection_complete.get().GetReason() def send(self, data): self.device.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data))) self.le_acl_manager.SendAclData(le_acl_manager_facade.LeAclData(handle=self.handle, payload=bytes(data))) def get_event_queue(self): return self.our_acl_stream.get_event_queue() return self.acl_stream.get_event_queue() class PyLeAclManager(Closable): Loading @@ -70,28 +71,62 @@ class PyLeAclManager(Closable): LE ACL Manager for GD Certification test :param device: The GD device """ self.device = device self.le_acl_manager = device.hci_le_acl_manager self.le_acl_stream = EventStream(self.device.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) self.incoming_connection_stream = None self.incoming_connection_event_stream = None self.outgoing_connection_event_streams = {} self.active_connections = [] self.next_token = 1 def close(self): safeClose(self.le_acl_stream) safeClose(self.incoming_connection_stream) # temporary, until everyone is migrated def get_le_acl_stream(self): return self.le_acl_stream safeClose(self.incoming_connection_event_stream) for v in self.outgoing_connection_event_streams.values(): safeClose(v) for connection in self.active_connections: safeClose(connection) def listen_for_incoming_connections(self): self.incoming_connection_stream = EventStream( self.device.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) assertThat(self.incoming_connection_event_stream).isNone() self.incoming_connection_event_stream = EventStream( self.le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) def initiate_connection(self, remote_addr): return PyLeAclManagerAclConnection(self.device, self.le_acl_stream, remote_addr, None) def connect_to_remote(self, remote_addr): token = self.initiate_connection(remote_addr) return self.complete_outgoing_connection(token) def wait_for_connection(self): self.listen_for_incoming_connections() return self.complete_incoming_connection() def accept_connection(self): def initiate_connection(self, remote_addr): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateConnection(remote_addr)) token = self.next_token self.next_token += 1 return token def complete_connection(self, event_stream): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.incoming_connection_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() return PyLeAclManagerAclConnection(self.device, self.le_acl_stream, None, handle) assertThat(event_stream).emits(connection_complete) complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) self.active_connections.append(connection) return connection def complete_incoming_connection(self): assertThat(self.incoming_connection_event_stream).isNotNone() event_stream = self.incoming_connection_event_stream self.incoming_connection_event_stream = None return self.complete_connection(event_stream) def complete_outgoing_connection(self, token): assertThat(self.outgoing_connection_event_streams[token]).isNotNone() event_stream = self.outgoing_connection_event_streams.pop(token) return self.complete_connection(event_stream)
system/gd/hci/cert/le_acl_manager_test.py +190 −270 Original line number Diff line number Diff line Loading @@ -14,11 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from cert.closable import safeClose from cert.gd_base_test import GdBaseTestClass from cert.event_stream import EventStream from cert.truth import assertThat from cert.py_le_acl_manager import PyLeAclManager from google.protobuf import empty_pb2 as empty_proto from facade import rootservice_pb2 as facade_rootservice from facade import common_pb2 as common from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade Loading @@ -33,6 +34,18 @@ class LeAclManagerTest(GdBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def setup_test(self): super().setup_test() self.dut_le_acl_manager = PyLeAclManager(self.dut) self.cert_hci_le_event_stream = EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) self.cert_acl_data_stream = EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) def teardown_test(self): safeClose(self.cert_hci_le_event_stream) safeClose(self.cert_acl_data_stream) safeClose(self.dut_le_acl_manager) super().teardown_test() def set_privacy_policy_static(self): self.dut_address = b'd0:05:04:03:02:01' private_policy = le_initiator_address_facade.PrivacyPolicy( Loading Loading @@ -64,9 +77,7 @@ class LeAclManagerTest(GdBaseTestClass): def dut_connects(self, check_address): self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) # Cert Advertises advertising_handle = 0 Loading Loading @@ -115,11 +126,10 @@ class LeAclManagerTest(GdBaseTestClass): self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) with EventStream( self.dut.hci_le_acl_manager.CreateConnection( common.BluetoothAddressWithType( self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote( remote_addr=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))) as connection_event_stream: type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS))) # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff Loading @@ -145,30 +155,25 @@ class LeAclManagerTest(GdBaseTestClass): return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle self.cert_hci_le_event_stream.assert_event_occurs(get_handle) self.cert_handle = handle dut_address_from_complete = address if (check_address): if check_address: assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode()) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, def send_receive_and_check(self): self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) # DUT gets a connection complete event and sends and receives handle = 0xfff connection_event_stream.assert_event_occurs(get_handle) self.dut.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData( handle=handle, payload=bytes(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT'))) cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload) self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT') self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload) def test_dut_connects(self): self.set_privacy_policy_static() self.dut_connects(check_address=True) self.send_receive_and_check() def test_dut_connects_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -178,6 +183,7 @@ class LeAclManagerTest(GdBaseTestClass): maximum_rotation_time=15 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_non_resolvable_address(self): privacy_policy = le_initiator_address_facade.PrivacyPolicy( Loading @@ -187,26 +193,27 @@ class LeAclManagerTest(GdBaseTestClass): maximum_rotation_time=14 * 60 * 1000) self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_public_address(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() def test_dut_connects_public_address_cancelled(self): self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress( le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS)) self.dut_connects(check_address=False) self.send_receive_and_check() def test_cert_connects(self): self.set_privacy_policy_static() self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: self.dut_le_acl_manager.listen_for_incoming_connections() # DUT Advertises gap_name = hci_packets.GapData() Loading Loading @@ -264,113 +271,26 @@ class LeAclManagerTest(GdBaseTestClass): return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle self.cert_hci_le_event_stream.assert_event_occurs(get_handle) self.cert_handle = handle self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x19\x00\x07\x00SomeAclData from the Cert')) # DUT gets a connection complete event and sends and receives handle = 0xfff incoming_connection_stream.assert_event_occurs(get_handle) self.dut.hci_le_acl_manager.SendAclData( le_acl_manager_facade.LeAclData( handle=handle, payload=bytes(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT'))) self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection() cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.data) acl_data_stream.assert_event_occurs(lambda packet: b'SomeAclData' in packet.payload) self.send_receive_and_check() def test_recombination_l2cap_packet(self): self.set_privacy_policy_static() self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) with EventStream(self.cert.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # Cert Advertises advertising_handle = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( advertising_handle, hci_packets.LegacyAdvertisingProperties.ADV_IND, 400, 450, 7, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, '00:00:00:00:00:00', hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 0xF8, 1, #SID hci_packets.Enable.DISABLED # Scan request notification ), True) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'), True) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(b'Im_A_Cert')) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingDataBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]), True) gap_short_name = hci_packets.GapData() gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME gap_short_name.data = list(bytes(b'Im_A_C')) self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT, hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]), True) enabled_set = hci_packets.EnabledSet() enabled_set.advertising_handle = advertising_handle enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.enqueue_hci_command( hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]), True) with EventStream( self.dut.hci_le_acl_manager.CreateConnection( common.BluetoothAddressWithType( address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')), type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))) as connection_event_stream: # Cert gets ConnectionComplete with a handle and sends ACL data handle = 0xfff def get_handle(packet): packet_bytes = packet.event nonlocal handle if b'\x3e\x13\x01\x00' in packet_bytes: cc_view = hci_packets.LeConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) handle = cc_view.GetConnectionHandle() return True if b'\x3e\x13\x0A\x00' in packet_bytes: cc_view = hci_packets.LeEnhancedConnectionCompleteView( hci_packets.LeMetaEventView( hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes))))) handle = cc_view.GetConnectionHandle() return True return False cert_hci_le_event_stream.assert_event_occurs(get_handle) cert_handle = handle # DUT gets a connection complete event connection_event_stream.assert_event_occurs(get_handle) self.dut_connects(check_address=True) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'\x06\x00\x07\x00Hello')) self.enqueue_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) acl_data_stream.assert_event_occurs(lambda packet: b'Hello!' in packet.payload) assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)
system/gd/hci/facade/le_acl_manager_facade.cc +98 −42 File changed.Preview size limit exceeded, changes collapsed. Show changes
system/gd/hci/facade/le_acl_manager_facade.proto +6 −1 Original line number Diff line number Diff line Loading @@ -9,8 +9,9 @@ service LeAclManagerFacade { rpc CreateConnection(bluetooth.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CancelConnection(bluetooth.facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {} rpc Disconnect(LeHandleMsg) returns (google.protobuf.Empty) {} rpc ConnectionCommand(LeConnectionCommandMsg) returns (google.protobuf.Empty) {} rpc SendAclData(LeAclData) returns (google.protobuf.Empty) {} rpc FetchAclData(google.protobuf.Empty) returns (stream LeAclData) {} rpc FetchAclData(LeHandleMsg) returns (stream LeAclData) {} rpc FetchIncomingConnection(google.protobuf.Empty) returns (stream LeConnectionEvent) {} } Loading @@ -22,6 +23,10 @@ message LeConnectionEvent { bytes event = 1; } message LeConnectionCommandMsg { bytes packet = 1; } message LeAclData { uint32 handle = 1; bytes payload = 2; Loading
system/gd/l2cap/le/cert/cert_le_l2cap.py +3 −5 Original line number Diff line number Diff line Loading @@ -90,15 +90,13 @@ class CertLeL2cap(Closable): safeClose(self._le_acl) def connect_le_acl(self, remote_addr): self._le_acl = self._le_acl_manager.initiate_connection(remote_addr) self._le_acl.wait_for_connection_complete() self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr) self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def wait_for_connection(self): self._le_acl_manager.listen_for_incoming_connections() self._le_acl = self._le_acl_manager.accept_connection() self._le_acl = self._le_acl_manager.wait_for_connection() self.control_channel = CertLeL2capChannel( self._device, 5, 5, self._get_acl_stream(), self._le_acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) Loading Loading @@ -171,7 +169,7 @@ class CertLeL2cap(Closable): return self.control_channel def _get_acl_stream(self): return self._le_acl_manager.get_le_acl_stream() return self._le_acl.acl_stream def _on_disconnection_request_default(self, request): disconnection_request = l2cap_packets.LeDisconnectionRequestView(request) Loading