Loading system/blueberry/tests/gd/cert/matchers.py +40 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,46 @@ class HciMatchers(object): return None return event @staticmethod def LeAdvertisement(subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): return lambda msg: HciMatchers._extract_matching_le_advertisement(msg.payload, subevent_code, address, data) is not None @staticmethod def ExtractLeAdvertisement(packet_bytes, subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data) @staticmethod def _extract_matching_le_advertisement(packet_bytes, subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): inner_event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code) if inner_event is None: return None matched = False event = None if subevent_code == hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT: event = hci_packets.LeExtendedAdvertisingReportView(inner_event) else: event = hci_packets.LeAdvertisingReportView(inner_event) responses = event.GetResponses() for response in responses: if matched == False: matched = (address == None or response.address.lower() == address.lower()) and (data == None or data in packet_bytes) if not matched: return None return event @staticmethod def LeConnectionComplete(): return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None Loading system/blueberry/tests/gd/cert/py_hal.py +61 −22 Original line number Diff line number Diff line Loading @@ -46,6 +46,11 @@ from bluetooth_packets_python3.hci_packets import LeSetExtendedScanResponseDataB from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder from bluetooth_packets_python3.hci_packets import EnabledSet from bluetooth_packets_python3.hci_packets import OpCode from bluetooth_packets_python3.hci_packets import LeSetAdvertisingDataBuilder from bluetooth_packets_python3.hci_packets import LeSetAdvertisingEnableBuilder from bluetooth_packets_python3.hci_packets import LeSetAdvertisingParametersBuilder from bluetooth_packets_python3.hci_packets import LeSetScanResponseDataBuilder from bluetooth_packets_python3.hci_packets import AdvertisingType from blueberry.facade import common_pb2 as common Loading @@ -69,14 +74,19 @@ class PyHalAclConnection(IEventStream): class PyHalAdvertisement(object): def __init__(self, handle, py_hal): def __init__(self, handle, py_hal, is_legacy): self.handle = handle self.py_hal = py_hal self.legacy = is_legacy def set_data(self, complete_name): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingDataBuilder([data])) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_DATA) else: self.py_hal.send_hci_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -86,12 +96,20 @@ class PyHalAdvertisement(object): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) if self.legacy: self.py_hal.send_hci_command(LeSetScanResponseDataBuilder([data])) self.py_hal.wait_for_complete(OpCode.LE_SET_SCAN_RESPONSE_DATA) else: self.py_hal.send_hci_command( LeSetExtendedScanResponseDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA) def start(self): if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingEnableBuilder(Enable.ENABLED)) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_ENABLE) else: enabled_set = EnabledSet() enabled_set.advertising_handle = self.handle enabled_set.duration = 0 Loading @@ -100,6 +118,10 @@ class PyHalAdvertisement(object): self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) def stop(self): if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingEnableBuilder(Enable.DISABLED)) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_ENABLE) else: enabled_set = EnabledSet() enabled_set.advertising_handle = self.handle enabled_set.duration = 0 Loading Loading @@ -292,4 +314,21 @@ class PyHal(Closable): self.send_hci_command(LeSetAdvertisingSetRandomAddressBuilder(handle, own_address)) self.wait_for_complete(OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS) return PyHalAdvertisement(handle, self) return PyHalAdvertisement(handle, self, False) def create_legacy_advertisement(self, advertising_type=AdvertisingType.ADV_IND, min_interval=400, max_interval=450, channel_map=7, own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS, peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, peer_address='00:00:00:00:00:00', filter_policy=AdvertisingFilterPolicy.ALL_DEVICES): self.send_hci_command( LeSetAdvertisingParametersBuilder(min_interval, max_interval, advertising_type, own_address_type, peer_address_type, peer_address, channel_map, filter_policy)) self.wait_for_complete(OpCode.LE_SET_ADVERTISING_PARAMETERS) return PyHalAdvertisement(None, self, True) system/blueberry/tests/gd/hci/direct_hci_test.py +46 −0 Original line number Diff line number Diff line Loading @@ -262,6 +262,52 @@ class DirectHciTest(gd_base_test.GdBaseTestClass): # LeConnectionComplete self._verify_le_connection_complete() def test_le_filter_accept_list_connection_cert_advertises_legacy(self): # DUT Connects self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command( LeAddDeviceToFilterAcceptListBuilder(FilterAcceptListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = self._create_phy_scan_params() self.dut_hci.send_command( LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST, OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) self.cert_hal.unmask_event(EventCode.LE_META_EVENT) self.cert_hal.send_hci_command(LeSetRandomAddressBuilder('0C:05:04:03:02:01')) advertisement = self.cert_hal.create_legacy_advertisement( min_interval=512, max_interval=768, peer_address='A6:A5:A4:A3:A2:A1') advertisement.set_data(b'Im_A_Cert') advertisement.start() # LeConnectionComplete self._verify_le_connection_complete() def test_le_ad_scan_cert_advertises_legacy(self): self.dut_hci.register_for_le_events(SubeventCode.EXTENDED_ADVERTISING_REPORT, SubeventCode.ADVERTISING_REPORT) # DUT Scans self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = LeScanType.ACTIVE self.dut_hci.send_command( LeSetExtendedScanParametersBuilder(OwnAddressType.RANDOM_DEVICE_ADDRESS, LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) self.cert_hal.unmask_event(EventCode.LE_META_EVENT) self.cert_hal.send_hci_command(LeSetRandomAddressBuilder('0C:05:04:03:02:01')) advertisement = self.cert_hal.create_legacy_advertisement( min_interval=512, max_interval=768, peer_address='A6:A5:A4:A3:A2:A1') advertisement.set_data(b'Im_A_Cert') advertisement.start() assertThat(self.dut_hci.get_le_event_stream()).emits( HciMatchers.LeAdvertisement(address='0C:05:04:03:02:01', data=b'Im_A_Cert')) def test_connection_dut_connects(self): self.dut_hci.send_command(WritePageTimeoutBuilder(0x4000)) Loading Loading
system/blueberry/tests/gd/cert/matchers.py +40 −0 Original line number Diff line number Diff line Loading @@ -120,6 +120,46 @@ class HciMatchers(object): return None return event @staticmethod def LeAdvertisement(subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): return lambda msg: HciMatchers._extract_matching_le_advertisement(msg.payload, subevent_code, address, data) is not None @staticmethod def ExtractLeAdvertisement(packet_bytes, subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data) @staticmethod def _extract_matching_le_advertisement(packet_bytes, subevent_code=hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): inner_event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code) if inner_event is None: return None matched = False event = None if subevent_code == hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT: event = hci_packets.LeExtendedAdvertisingReportView(inner_event) else: event = hci_packets.LeAdvertisingReportView(inner_event) responses = event.GetResponses() for response in responses: if matched == False: matched = (address == None or response.address.lower() == address.lower()) and (data == None or data in packet_bytes) if not matched: return None return event @staticmethod def LeConnectionComplete(): return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None Loading
system/blueberry/tests/gd/cert/py_hal.py +61 −22 Original line number Diff line number Diff line Loading @@ -46,6 +46,11 @@ from bluetooth_packets_python3.hci_packets import LeSetExtendedScanResponseDataB from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder from bluetooth_packets_python3.hci_packets import EnabledSet from bluetooth_packets_python3.hci_packets import OpCode from bluetooth_packets_python3.hci_packets import LeSetAdvertisingDataBuilder from bluetooth_packets_python3.hci_packets import LeSetAdvertisingEnableBuilder from bluetooth_packets_python3.hci_packets import LeSetAdvertisingParametersBuilder from bluetooth_packets_python3.hci_packets import LeSetScanResponseDataBuilder from bluetooth_packets_python3.hci_packets import AdvertisingType from blueberry.facade import common_pb2 as common Loading @@ -69,14 +74,19 @@ class PyHalAclConnection(IEventStream): class PyHalAdvertisement(object): def __init__(self, handle, py_hal): def __init__(self, handle, py_hal, is_legacy): self.handle = handle self.py_hal = py_hal self.legacy = is_legacy def set_data(self, complete_name): data = GapData() data.data_type = GapDataType.COMPLETE_LOCAL_NAME data.data = list(bytes(complete_name)) if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingDataBuilder([data])) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_DATA) else: self.py_hal.send_hci_command( LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) Loading @@ -86,12 +96,20 @@ class PyHalAdvertisement(object): data = GapData() data.data_type = GapDataType.SHORTENED_LOCAL_NAME data.data = list(bytes(shortened_name)) if self.legacy: self.py_hal.send_hci_command(LeSetScanResponseDataBuilder([data])) self.py_hal.wait_for_complete(OpCode.LE_SET_SCAN_RESPONSE_DATA) else: self.py_hal.send_hci_command( LeSetExtendedScanResponseDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA) def start(self): if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingEnableBuilder(Enable.ENABLED)) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_ENABLE) else: enabled_set = EnabledSet() enabled_set.advertising_handle = self.handle enabled_set.duration = 0 Loading @@ -100,6 +118,10 @@ class PyHalAdvertisement(object): self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) def stop(self): if self.legacy: self.py_hal.send_hci_command(LeSetAdvertisingEnableBuilder(Enable.DISABLED)) self.py_hal.wait_for_complete(OpCode.LE_SET_ADVERTISING_ENABLE) else: enabled_set = EnabledSet() enabled_set.advertising_handle = self.handle enabled_set.duration = 0 Loading Loading @@ -292,4 +314,21 @@ class PyHal(Closable): self.send_hci_command(LeSetAdvertisingSetRandomAddressBuilder(handle, own_address)) self.wait_for_complete(OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS) return PyHalAdvertisement(handle, self) return PyHalAdvertisement(handle, self, False) def create_legacy_advertisement(self, advertising_type=AdvertisingType.ADV_IND, min_interval=400, max_interval=450, channel_map=7, own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS, peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, peer_address='00:00:00:00:00:00', filter_policy=AdvertisingFilterPolicy.ALL_DEVICES): self.send_hci_command( LeSetAdvertisingParametersBuilder(min_interval, max_interval, advertising_type, own_address_type, peer_address_type, peer_address, channel_map, filter_policy)) self.wait_for_complete(OpCode.LE_SET_ADVERTISING_PARAMETERS) return PyHalAdvertisement(None, self, True)
system/blueberry/tests/gd/hci/direct_hci_test.py +46 −0 Original line number Diff line number Diff line Loading @@ -262,6 +262,52 @@ class DirectHciTest(gd_base_test.GdBaseTestClass): # LeConnectionComplete self._verify_le_connection_complete() def test_le_filter_accept_list_connection_cert_advertises_legacy(self): # DUT Connects self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) self.dut_hci.send_command( LeAddDeviceToFilterAcceptListBuilder(FilterAcceptListAddressType.RANDOM, '0C:05:04:03:02:01')) phy_scan_params = self._create_phy_scan_params() self.dut_hci.send_command( LeExtendedCreateConnectionBuilder(InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST, OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params])) self.cert_hal.unmask_event(EventCode.LE_META_EVENT) self.cert_hal.send_hci_command(LeSetRandomAddressBuilder('0C:05:04:03:02:01')) advertisement = self.cert_hal.create_legacy_advertisement( min_interval=512, max_interval=768, peer_address='A6:A5:A4:A3:A2:A1') advertisement.set_data(b'Im_A_Cert') advertisement.start() # LeConnectionComplete self._verify_le_connection_complete() def test_le_ad_scan_cert_advertises_legacy(self): self.dut_hci.register_for_le_events(SubeventCode.EXTENDED_ADVERTISING_REPORT, SubeventCode.ADVERTISING_REPORT) # DUT Scans self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01')) phy_scan_params = PhyScanParameters() phy_scan_params.le_scan_interval = 6553 phy_scan_params.le_scan_window = 6553 phy_scan_params.le_scan_type = LeScanType.ACTIVE self.dut_hci.send_command( LeSetExtendedScanParametersBuilder(OwnAddressType.RANDOM_DEVICE_ADDRESS, LeScanningFilterPolicy.ACCEPT_ALL, 1, [phy_scan_params])) self.dut_hci.send_command(LeSetExtendedScanEnableBuilder(Enable.ENABLED, FilterDuplicates.DISABLED, 0, 0)) self.cert_hal.unmask_event(EventCode.LE_META_EVENT) self.cert_hal.send_hci_command(LeSetRandomAddressBuilder('0C:05:04:03:02:01')) advertisement = self.cert_hal.create_legacy_advertisement( min_interval=512, max_interval=768, peer_address='A6:A5:A4:A3:A2:A1') advertisement.set_data(b'Im_A_Cert') advertisement.start() assertThat(self.dut_hci.get_le_event_stream()).emits( HciMatchers.LeAdvertisement(address='0C:05:04:03:02:01', data=b'Im_A_Cert')) def test_connection_dut_connects(self): self.dut_hci.send_command(WritePageTimeoutBuilder(0x4000)) Loading