Loading system/gd/cert/matchers.py +26 −0 Original line number Diff line number Diff line Loading @@ -56,6 +56,32 @@ class HciMatchers(object): else: return complete @staticmethod def CommandStatus(opcode=None): return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode) @staticmethod def ExtractMatchingCommandStatus(packet_bytes, opcode=None): return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) @staticmethod def _is_matching_command_status(packet_bytes, opcode=None): return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None @staticmethod def _extract_matching_command_status(packet_bytes, opcode=None): event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS) if event is None: return None complete = hci_packets.CommandStatusView(event) if opcode is None or complete is None: return complete else: if complete.GetCommandOpCode() != opcode: return None else: return complete @staticmethod def EventWithCode(event_code): return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code) Loading system/gd/cert/py_hal.py +19 −5 Original line number Diff line number Diff line Loading @@ -83,6 +83,7 @@ class PyHalAdvertisement(object): self.py_hal.send_hci_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA) def set_scan_response(self, shortened_name): data = GapData() Loading @@ -91,6 +92,7 @@ class PyHalAdvertisement(object): self.py_hal.send_hci_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE) def start(self): enabled_set = EnabledSet() Loading @@ -98,8 +100,7 @@ class PyHalAdvertisement(object): enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) assertThat(self.py_hal.get_hci_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) def stop(self): enabled_set = EnabledSet() Loading @@ -107,8 +108,7 @@ class PyHalAdvertisement(object): enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set])) assertThat(self.py_hal.get_hci_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) class PyHal(Closable): Loading @@ -128,6 +128,12 @@ class PyHal(Closable): def get_hci_event_stream(self): return self.hci_event_stream def wait_for_complete(self, opcode): assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode)) def wait_for_status(self, opcode): assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode)) def get_acl_stream(self): return self.acl_stream Loading @@ -149,6 +155,7 @@ class PyHal(Closable): def set_random_le_address(self, addr): self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr)) self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS) def set_scan_parameters(self): phy_scan_params = hci_packets.PhyScanParameters() Loading @@ -160,20 +167,23 @@ class PyHal(Closable): hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS) def start_scanning(self): self.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) def stop_scanning(self): self.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) def reset(self): self.send_hci_command(hci_packets.ResetBuilder()) assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(OpCode.RESET)) self.wait_for_complete(OpCode.RESET) def enable_inquiry_and_page_scan(self): self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) Loading Loading @@ -218,6 +228,7 @@ class PyHal(Closable): hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION) def add_to_connect_list(self, remote_addr): self.send_hci_command( Loading Loading @@ -264,6 +275,9 @@ class PyHal(Closable): LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map, own_address_type, peer_address_type, peer_address, filter_policy, tx_power, sid, scan_request_notification)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS) self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS) return PyHalAdvertisement(handle, self) system/gd/hal/cert/simple_hal_test.py +0 −1 Original line number Diff line number Diff line Loading @@ -103,7 +103,6 @@ class SimpleHalTest(GdBaseTestClass): self.dut_hal.stop_scanning() def test_le_connection_dut_advertises(self): # Cert Connects self.cert_hal.set_random_le_address('0C:05:04:03:02:01') self.cert_hal.initiate_le_connection('0D:05:04:03:02:01') Loading Loading
system/gd/cert/matchers.py +26 −0 Original line number Diff line number Diff line Loading @@ -56,6 +56,32 @@ class HciMatchers(object): else: return complete @staticmethod def CommandStatus(opcode=None): return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode) @staticmethod def ExtractMatchingCommandStatus(packet_bytes, opcode=None): return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) @staticmethod def _is_matching_command_status(packet_bytes, opcode=None): return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None @staticmethod def _extract_matching_command_status(packet_bytes, opcode=None): event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS) if event is None: return None complete = hci_packets.CommandStatusView(event) if opcode is None or complete is None: return complete else: if complete.GetCommandOpCode() != opcode: return None else: return complete @staticmethod def EventWithCode(event_code): return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code) Loading
system/gd/cert/py_hal.py +19 −5 Original line number Diff line number Diff line Loading @@ -83,6 +83,7 @@ class PyHalAdvertisement(object): self.py_hal.send_hci_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA) def set_scan_response(self, shortened_name): data = GapData() Loading @@ -91,6 +92,7 @@ class PyHalAdvertisement(object): self.py_hal.send_hci_command( LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE) def start(self): enabled_set = EnabledSet() Loading @@ -98,8 +100,7 @@ class PyHalAdvertisement(object): enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) assertThat(self.py_hal.get_hci_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) def stop(self): enabled_set = EnabledSet() Loading @@ -107,8 +108,7 @@ class PyHalAdvertisement(object): enabled_set.duration = 0 enabled_set.max_extended_advertising_events = 0 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set])) assertThat(self.py_hal.get_hci_event_stream()).emits( HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) class PyHal(Closable): Loading @@ -128,6 +128,12 @@ class PyHal(Closable): def get_hci_event_stream(self): return self.hci_event_stream def wait_for_complete(self, opcode): assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode)) def wait_for_status(self, opcode): assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode)) def get_acl_stream(self): return self.acl_stream Loading @@ -149,6 +155,7 @@ class PyHal(Closable): def set_random_le_address(self, addr): self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr)) self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS) def set_scan_parameters(self): phy_scan_params = hci_packets.PhyScanParameters() Loading @@ -160,20 +167,23 @@ class PyHal(Closable): hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS) def start_scanning(self): self.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) def stop_scanning(self): self.send_hci_command( hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED, hci_packets.FilterDuplicates.DISABLED, 0, 0)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) def reset(self): self.send_hci_command(hci_packets.ResetBuilder()) assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(OpCode.RESET)) self.wait_for_complete(OpCode.RESET) def enable_inquiry_and_page_scan(self): self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) Loading Loading @@ -218,6 +228,7 @@ class PyHal(Closable): hci_packets.LeExtendedCreateConnectionBuilder( hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION) def add_to_connect_list(self, remote_addr): self.send_hci_command( Loading Loading @@ -264,6 +275,9 @@ class PyHal(Closable): LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map, own_address_type, peer_address_type, peer_address, filter_policy, tx_power, sid, scan_request_notification)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS) self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS) return PyHalAdvertisement(handle, self)
system/gd/hal/cert/simple_hal_test.py +0 −1 Original line number Diff line number Diff line Loading @@ -103,7 +103,6 @@ class SimpleHalTest(GdBaseTestClass): self.dut_hal.stop_scanning() def test_le_connection_dut_advertises(self): # Cert Connects self.cert_hal.set_random_le_address('0C:05:04:03:02:01') self.cert_hal.initiate_le_connection('0D:05:04:03:02:01') Loading