Loading system/audio_a2dp_hw/Android.bp +4 −1 Original line number Diff line number Diff line Loading @@ -52,7 +52,10 @@ cc_library_static { cc_test { name: "net_test_audio_a2dp_hw", test_suites: ["device-tests"], defaults: ["audio_a2dp_hw_defaults"], defaults: [ "audio_a2dp_hw_defaults", "mts_defaults", ], srcs: [ "test/audio_a2dp_hw_test.cc", ], Loading system/audio_hearing_aid_hw/Android.bp +4 −1 Original line number Diff line number Diff line Loading @@ -40,7 +40,10 @@ cc_library { cc_test { name: "net_test_audio_hearing_aid_hw", test_suites: ["device-tests"], defaults: ["audio_hearing_aid_hw_defaults"], defaults: [ "audio_hearing_aid_hw_defaults", "mts_defaults", ], srcs: [ "test/audio_hearing_aid_hw_test.cc", ], Loading system/blueberry/facade/hci/le_acl_manager_facade.proto +13 −3 Original line number Diff line number Diff line Loading @@ -6,15 +6,14 @@ import "google/protobuf/empty.proto"; import "blueberry/facade/common.proto"; service LeAclManagerFacade { rpc CreateConnection(blueberry.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CreateBackgroundAndDirectConnection(blueberry.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CreateConnection(CreateConnectionMsg) returns (stream LeConnectionEvent) {} rpc CancelConnection(blueberry.facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {} rpc Disconnect(LeHandleMsg) returns (google.protobuf.Empty) {} rpc ConnectionCommand(LeConnectionCommandMsg) returns (google.protobuf.Empty) {} rpc SendAclData(LeAclData) returns (google.protobuf.Empty) {} rpc FetchAclData(LeHandleMsg) returns (stream LeAclData) {} rpc FetchIncomingConnection(google.protobuf.Empty) returns (stream LeConnectionEvent) {} rpc AddDeviceToResolvingList(IrkMsg) returns (google.protobuf.Empty) {} } message LeHandleMsg { Loading @@ -34,3 +33,14 @@ message LeAclData { bytes payload = 2; } message CreateConnectionMsg { blueberry.facade.BluetoothAddressWithType peer_address = 1; bool is_direct = 2; } message IrkMsg { blueberry.facade.BluetoothAddressWithType peer = 1; bytes peer_irk = 2; bytes local_irk = 3; } system/blueberry/tests/gd/cert/py_hci.py +83 −2 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import timedelta from google.protobuf import empty_pb2 as empty_proto from blueberry.tests.gd.cert.event_stream import EventStream Loading Loading @@ -70,6 +71,43 @@ class PyHciAclConnection(IEventStream): return self.our_acl_stream.get_event_queue() class PyHciLeAclConnection(IEventStream): def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable): self.handle = int(handle) self.device = device self.peer = peer self.peer_type = peer_type self.peer_resolvable = peer_resolvable self.local_resolvable = local_resolvable # todo, handle we got is 0, so doesn't match - fix before enabling filtering self.our_acl_stream = FilteringEventStream(acl_stream, None) def send(self, pb_flag, b_flag, data): acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data)) self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize()))) def send_first(self, data): self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def send_continuing(self, data): self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def get_event_queue(self): return self.our_acl_stream.get_event_queue() def local_resolvable_address(self): return self.local_resolvable def peer_resolvable_address(self): return self.peer_resolvable def peer_address(self): return self.peer class PyHciAdvertisement(object): def __init__(self, handle, py_hci): Loading @@ -79,7 +117,7 @@ class PyHciAdvertisement(object): def set_data(self, complete_name): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) data.data = list(complete_name) self.py_hci.send_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -87,7 +125,7 @@ class PyHciAdvertisement(object): def set_scan_response(self, shortened_name): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) data.data = list(shortened_name) self.py_hci.send_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading Loading @@ -121,6 +159,7 @@ class PyHci(Closable): self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) self.register_for_le_events(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty())) def close(self): Loading Loading @@ -187,6 +226,48 @@ class PyHci(Closable): raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciAclConnection(handle, self.acl_stream, self.device) def set_random_le_address(self, addr): self.send_command(hci_packets.LeSetRandomAddressBuilder(addr)) assertThat(self.event_stream).emits(HciMatchers.CommandComplete(OpCode.LE_SET_RANDOM_ADDRESS)) def initiate_le_connection(self, remote_addr): phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 phy_scan_params.conn_interval_min = 0x18 phy_scan_params.conn_interval_max = 0x28 phy_scan_params.conn_latency = 0 phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) assertThat(self.event_stream).emits(HciMatchers.CommandStatus(OpCode.LE_EXTENDED_CREATE_CONNECTION)) def incoming_le_connection(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() peer = connection_complete.get().GetPeerAddress() peer_type = connection_complete.get().GetPeerAddressType() local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress() peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress() if self.acl_stream is None: raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_type, peer_resolvable, local_resolvable) def incoming_le_connection_fails(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5)) def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk): self.send_command( hci_packets.LeAddDeviceToResolvingListBuilder(peer_address_type, peer_address, peer_irk, local_irk)) def create_advertisement(self, handle, own_address, Loading system/blueberry/tests/gd/cert/py_le_acl_manager.py +9 −12 Original line number Diff line number Diff line Loading @@ -29,12 +29,13 @@ from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_fac class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param remote_addr_type: Remote device address type :param handle: Connection handle :param event_stream: The connection event stream for this connection """ Loading @@ -46,6 +47,7 @@ class PyLeAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.remote_address_type = remote_addr_type self.own_address = address self.disconnect_reason = None Loading Loading @@ -113,18 +115,11 @@ class PyLeAclManager(Closable): safeClose(pair[0]) self.le_acl_manager.CancelConnection(pair[1]) def initiate_connection(self, remote_addr): def initiate_connection(self, remote_addr, is_direct=True): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() create_connection_msg = le_acl_manager_facade.CreateConnectionMsg(peer_address=remote_addr, is_direct=is_direct) self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateConnection(remote_addr)), remote_addr token = self.next_token self.next_token += 1 return token def initiate_background_and_direct_connection(self, remote_addr): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateBackgroundAndDirectConnection(remote_addr)), remote_addr self.le_acl_manager.CreateConnection(create_connection_msg)), remote_addr token = self.next_token self.next_token += 1 return token Loading @@ -135,11 +130,13 @@ class PyLeAclManager(Closable): complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() remote_address_type = complete.GetPeerAddressType() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle, event_stream) self.active_connections.append(connection) return connection Loading Loading
system/audio_a2dp_hw/Android.bp +4 −1 Original line number Diff line number Diff line Loading @@ -52,7 +52,10 @@ cc_library_static { cc_test { name: "net_test_audio_a2dp_hw", test_suites: ["device-tests"], defaults: ["audio_a2dp_hw_defaults"], defaults: [ "audio_a2dp_hw_defaults", "mts_defaults", ], srcs: [ "test/audio_a2dp_hw_test.cc", ], Loading
system/audio_hearing_aid_hw/Android.bp +4 −1 Original line number Diff line number Diff line Loading @@ -40,7 +40,10 @@ cc_library { cc_test { name: "net_test_audio_hearing_aid_hw", test_suites: ["device-tests"], defaults: ["audio_hearing_aid_hw_defaults"], defaults: [ "audio_hearing_aid_hw_defaults", "mts_defaults", ], srcs: [ "test/audio_hearing_aid_hw_test.cc", ], Loading
system/blueberry/facade/hci/le_acl_manager_facade.proto +13 −3 Original line number Diff line number Diff line Loading @@ -6,15 +6,14 @@ import "google/protobuf/empty.proto"; import "blueberry/facade/common.proto"; service LeAclManagerFacade { rpc CreateConnection(blueberry.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CreateBackgroundAndDirectConnection(blueberry.facade.BluetoothAddressWithType) returns (stream LeConnectionEvent) {} rpc CreateConnection(CreateConnectionMsg) returns (stream LeConnectionEvent) {} rpc CancelConnection(blueberry.facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {} rpc Disconnect(LeHandleMsg) returns (google.protobuf.Empty) {} rpc ConnectionCommand(LeConnectionCommandMsg) returns (google.protobuf.Empty) {} rpc SendAclData(LeAclData) returns (google.protobuf.Empty) {} rpc FetchAclData(LeHandleMsg) returns (stream LeAclData) {} rpc FetchIncomingConnection(google.protobuf.Empty) returns (stream LeConnectionEvent) {} rpc AddDeviceToResolvingList(IrkMsg) returns (google.protobuf.Empty) {} } message LeHandleMsg { Loading @@ -34,3 +33,14 @@ message LeAclData { bytes payload = 2; } message CreateConnectionMsg { blueberry.facade.BluetoothAddressWithType peer_address = 1; bool is_direct = 2; } message IrkMsg { blueberry.facade.BluetoothAddressWithType peer = 1; bytes peer_irk = 2; bytes local_irk = 3; }
system/blueberry/tests/gd/cert/py_hci.py +83 −2 Original line number Diff line number Diff line Loading @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import timedelta from google.protobuf import empty_pb2 as empty_proto from blueberry.tests.gd.cert.event_stream import EventStream Loading Loading @@ -70,6 +71,43 @@ class PyHciAclConnection(IEventStream): return self.our_acl_stream.get_event_queue() class PyHciLeAclConnection(IEventStream): def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable): self.handle = int(handle) self.device = device self.peer = peer self.peer_type = peer_type self.peer_resolvable = peer_resolvable self.local_resolvable = local_resolvable # todo, handle we got is 0, so doesn't match - fix before enabling filtering self.our_acl_stream = FilteringEventStream(acl_stream, None) def send(self, pb_flag, b_flag, data): acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data)) self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize()))) def send_first(self, data): self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def send_continuing(self, data): self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) def get_event_queue(self): return self.our_acl_stream.get_event_queue() def local_resolvable_address(self): return self.local_resolvable def peer_resolvable_address(self): return self.peer_resolvable def peer_address(self): return self.peer class PyHciAdvertisement(object): def __init__(self, handle, py_hci): Loading @@ -79,7 +117,7 @@ class PyHciAdvertisement(object): def set_data(self, complete_name): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) data.data = list(complete_name) self.py_hci.send_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -87,7 +125,7 @@ class PyHciAdvertisement(object): def set_scan_response(self, shortened_name): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) data.data = list(shortened_name) self.py_hci.send_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading Loading @@ -121,6 +159,7 @@ class PyHci(Closable): self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) self.register_for_le_events(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty())) def close(self): Loading Loading @@ -187,6 +226,48 @@ class PyHci(Closable): raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciAclConnection(handle, self.acl_stream, self.device) def set_random_le_address(self, addr): self.send_command(hci_packets.LeSetRandomAddressBuilder(addr)) assertThat(self.event_stream).emits(HciMatchers.CommandComplete(OpCode.LE_SET_RANDOM_ADDRESS)) def initiate_le_connection(self, remote_addr): phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() phy_scan_params.scan_interval = 0x60 phy_scan_params.scan_window = 0x30 phy_scan_params.conn_interval_min = 0x18 phy_scan_params.conn_interval_max = 0x28 phy_scan_params.conn_latency = 0 phy_scan_params.supervision_timeout = 0x1f4 phy_scan_params.min_ce_length = 0 phy_scan_params.max_ce_length = 0 self.send_command( hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) assertThat(self.event_stream).emits(HciMatchers.CommandStatus(OpCode.LE_EXTENDED_CREATE_CONNECTION)) def incoming_le_connection(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emits(connection_complete) handle = connection_complete.get().GetConnectionHandle() peer = connection_complete.get().GetPeerAddress() peer_type = connection_complete.get().GetPeerAddressType() local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress() peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress() if self.acl_stream is None: raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_type, peer_resolvable, local_resolvable) def incoming_le_connection_fails(self): connection_complete = HciCaptures.LeConnectionCompleteCapture() assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5)) def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk): self.send_command( hci_packets.LeAddDeviceToResolvingListBuilder(peer_address_type, peer_address, peer_irk, local_irk)) def create_advertisement(self, handle, own_address, Loading
system/blueberry/tests/gd/cert/py_le_acl_manager.py +9 −12 Original line number Diff line number Diff line Loading @@ -29,12 +29,13 @@ from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_fac class PyLeAclManagerAclConnection(IEventStream, Closable): def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): def __init__(self, le_acl_manager, address, remote_addr, remote_addr_type, handle, event_stream): """ An abstract representation for an LE ACL connection in GD certification test :param le_acl_manager: The LeAclManager from this GD device :param address: The local device address :param remote_addr: Remote device address :param remote_addr_type: Remote device address type :param handle: Connection handle :param event_stream: The connection event stream for this connection """ Loading @@ -46,6 +47,7 @@ class PyLeAclManagerAclConnection(IEventStream, Closable): self.acl_stream = EventStream( self.le_acl_manager.FetchAclData(le_acl_manager_facade.LeHandleMsg(handle=self.handle))) self.remote_address = remote_addr self.remote_address_type = remote_addr_type self.own_address = address self.disconnect_reason = None Loading Loading @@ -113,18 +115,11 @@ class PyLeAclManager(Closable): safeClose(pair[0]) self.le_acl_manager.CancelConnection(pair[1]) def initiate_connection(self, remote_addr): def initiate_connection(self, remote_addr, is_direct=True): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() create_connection_msg = le_acl_manager_facade.CreateConnectionMsg(peer_address=remote_addr, is_direct=is_direct) self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateConnection(remote_addr)), remote_addr token = self.next_token self.next_token += 1 return token def initiate_background_and_direct_connection(self, remote_addr): assertThat(self.next_token in self.outgoing_connection_event_streams).isFalse() self.outgoing_connection_event_streams[self.next_token] = EventStream( self.le_acl_manager.CreateBackgroundAndDirectConnection(remote_addr)), remote_addr self.le_acl_manager.CreateConnection(create_connection_msg)), remote_addr token = self.next_token self.next_token += 1 return token Loading @@ -135,11 +130,13 @@ class PyLeAclManager(Closable): complete = connection_complete.get() handle = complete.GetConnectionHandle() remote = complete.GetPeerAddress() remote_address_type = complete.GetPeerAddressType() if complete.GetSubeventCode() == hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE: address = complete.GetLocalResolvablePrivateAddress() else: address = None connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) connection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, remote_address_type, handle, event_stream) self.active_connections.append(connection) return connection Loading