Loading system/gd/cert/gd_device.py +7 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ import logging from facade import rootservice_pb2_grpc as facade_rootservice_pb2_grpc from cert.gd_device_base import GdDeviceBase, replace_vars from hal import facade_pb2_grpc as hal_facade_pb2_grpc from hci.facade import facade_pb2 as hci_facade from hci.facade import facade_pb2_grpc as hci_facade_pb2_grpc from hci.facade import acl_manager_facade_pb2_grpc from hci.facade import controller_facade_pb2_grpc Loading Loading @@ -83,6 +84,7 @@ class GdDevice(GdDeviceBase): self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub( self.grpc_channel) self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel) self.hci.register_for_events = self.__register_for_hci_events self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub( self.grpc_channel) self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub( Loading @@ -101,3 +103,8 @@ class GdDevice(GdDeviceBase): self.grpc_channel) self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) def __register_for_hci_events(self, *event_codes): for event_code in event_codes: msg = hci_facade.EventCodeMsg(code=int(event_code)) self.hci.RegisterEventHandler(msg) system/gd/hci/cert/acl_manager_test.py +12 −13 Original line number Diff line number Diff line Loading @@ -38,10 +38,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def register_for_event(self, event_code): msg = hci_facade.EventCodeMsg(code=int(event_code)) self.cert.hci.RegisterEventHandler(msg) def enqueue_hci_command(self, command, expect_complete): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) Loading @@ -59,10 +55,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.cert.hci.SendAclData(acl_msg) def test_dut_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading Loading @@ -127,10 +124,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event(hci_packets.EventCode.ROLE_CHANGE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ Loading Loading @@ -183,10 +181,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_recombination_l2cap_packet(self): self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading Loading
system/gd/cert/gd_device.py +7 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ import logging from facade import rootservice_pb2_grpc as facade_rootservice_pb2_grpc from cert.gd_device_base import GdDeviceBase, replace_vars from hal import facade_pb2_grpc as hal_facade_pb2_grpc from hci.facade import facade_pb2 as hci_facade from hci.facade import facade_pb2_grpc as hci_facade_pb2_grpc from hci.facade import acl_manager_facade_pb2_grpc from hci.facade import controller_facade_pb2_grpc Loading Loading @@ -83,6 +84,7 @@ class GdDevice(GdDeviceBase): self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub( self.grpc_channel) self.hci = hci_facade_pb2_grpc.HciLayerFacadeStub(self.grpc_channel) self.hci.register_for_events = self.__register_for_hci_events self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub( self.grpc_channel) self.hci_acl_manager = acl_manager_facade_pb2_grpc.AclManagerFacadeStub( Loading @@ -101,3 +103,8 @@ class GdDevice(GdDeviceBase): self.grpc_channel) self.security = security_facade_pb2_grpc.SecurityModuleFacadeStub( self.grpc_channel) def __register_for_hci_events(self, *event_codes): for event_code in event_codes: msg = hci_facade.EventCodeMsg(code=int(event_code)) self.hci.RegisterEventHandler(msg)
system/gd/hci/cert/acl_manager_test.py +12 −13 Original line number Diff line number Diff line Loading @@ -38,10 +38,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') def register_for_event(self, event_code): msg = hci_facade.EventCodeMsg(code=int(event_code)) self.cert.hci.RegisterEventHandler(msg) def enqueue_hci_command(self, command, expect_complete): cmd_bytes = bytes(command.Serialize()) cmd = hci_facade.CommandMsg(command=cmd_bytes) Loading @@ -59,10 +55,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.cert.hci.SendAclData(acl_msg) def test_dut_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading Loading @@ -127,10 +124,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event(hci_packets.EventCode.ROLE_CHANGE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ Loading Loading @@ -183,10 +181,11 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_recombination_l2cap_packet(self): self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) self.register_for_event( self.cert.hci.register_for_events( hci_packets.EventCode.CONNECTION_REQUEST, hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) with EventStream(self.cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: Loading