Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fd31d9cc authored by Zach Johnson's avatar Zach Johnson Committed by Gerrit Code Review
Browse files

Merge changes Ic66d8fcd,I9ffbe874,I59e6926b,I1fb172f7,I0eb60ace, ...

* changes:
  Add some wait commands inside PyHal
  Simplify scan parameters & start/stop
  Simplify le random address setting for tests
  Simplify le connection logic for SimpleHalTest
  Simplify Le connect logic for SimpleHalTest
  Simplify advertisement in SimpleHalTest
  Add HciMatchers.Exactly, for more compact tests
  Add HAL reset helper
  Add advertisement abstraction in PyHal
  Add an advertisment abstraction to PyHci
parents 0117f793 ad7a872b
Loading
Loading
Loading
Loading
+45 −10
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import bluetooth_packets_python3 as bt_packets
import logging

@@ -55,6 +56,32 @@ class HciMatchers(object):
            else:
                return complete

    @staticmethod
    def CommandStatus(opcode=None):
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
        if event is None:
            return None
        complete = hci_packets.CommandStatusView(event)
        if opcode is None or complete is None:
            return complete
        else:
            if complete.GetCommandOpCode() != opcode:
                return None
            else:
                return complete

    @staticmethod
    def EventWithCode(event_code):
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
@@ -86,10 +113,10 @@ class HciMatchers(object):

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        event = hci_packets.LeMetaEventView(
            HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT))
        if event is None:
        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
        if inner_event is None:
            return None
        event = hci_packets.LeMetaEventView(inner_event)
        if event.GetSubeventCode() != subevent_code:
            return None
        return event
@@ -104,12 +131,16 @@ class HciMatchers(object):

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        event = hci_packets.LeConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE))
        if event is not None:
            return event
        return hci_packets.LeEnhancedConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE))
        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeConnectionCompleteView(inner_event)

        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)

        return None

    @staticmethod
    def LogEventCode():
@@ -165,7 +196,11 @@ class HciMatchers(object):

    @staticmethod
    def LoopbackOf(packet):
        data = bytes(hci_packets.LoopbackCommandBuilder(packet).Serialize())
        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))

    @staticmethod
    def Exactly(packet):
        data = bytes(packet.Serialize())
        return lambda event: data == event.payload


+167 −0
Original line number Diff line number Diff line
@@ -30,6 +30,26 @@ from bluetooth_packets_python3 import RawBuilder
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3 import hci_packets
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHalAclConnection(IEventStream):
@@ -50,6 +70,47 @@ class PyHalAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHalAdvertisement(object):

    def __init__(self, handle, py_hal):
        self.handle = handle
        self.py_hal = py_hal

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE)

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def stop(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)


class PyHal(Closable):

    def __init__(self, device):
@@ -67,6 +128,12 @@ class PyHal(Closable):
    def get_hci_event_stream(self):
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))

    def wait_for_status(self, opcode):
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))

    def get_acl_stream(self):
        return self.acl_stream

@@ -86,6 +153,38 @@ class PyHal(Closable):
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def set_random_le_address(self, addr):
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def start_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def stop_scanning(self):
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def reset(self):
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

@@ -114,3 +213,71 @@ class PyHal(Closable):

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def initiate_le_connection(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_connect_list(self, remote_addr):
        self.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, remote_addr))

    def initiate_le_connection_by_connect_list(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))

    def complete_le_connection(self):
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self)
+75 −0
Original line number Diff line number Diff line
@@ -24,6 +24,26 @@ from cert.captures import HciCaptures
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHciAclConnection(IEventStream):
@@ -51,6 +71,38 @@ class PyHciAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHciAdvertisement(object):

    def __init__(self, handle, py_hci):
        self.handle = handle
        self.py_hci = py_hci

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        assertThat(self.py_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))


class PyHci(Closable):

    event_stream = None
@@ -139,3 +191,26 @@ class PyHci(Closable):
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))

        self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        return PyHciAdvertisement(handle, self)
+52 −203

File changed.

Preview size limit exceeded, changes collapsed.

+14 −79
Original line number Diff line number Diff line
@@ -215,52 +215,10 @@ class DirectHciTest(GdBaseTestClass):
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              '0D:05:04:03:02:01', 1, [phy_scan_params]))

        # DUT Advertises
        advertising_handle = 0
        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  #SID
                Enable.DISABLED  # Scan request notification
            ))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))

        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = GapData()
        gap_short_name.data_type = GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_The_D'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.dut_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))

        # Check for success of Enable
        assertThat(self.dut_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        (dut_handle, cert_handle) = self._verify_le_connection_complete()

@@ -286,39 +244,16 @@ class DirectHciTest(GdBaseTestClass):
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

        # CERT Advertises
        advertising_handle = 1
        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                512,
                768,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                'A6:A5:A4:A3:A2:A1',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0x7F,
                0,  # SID
                Enable.DISABLED  # Scan request notification
            ))

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.cert_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = 1
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        advertisement = self.cert_hal.create_advertisement(
            1,
            '0C:05:04:03:02:01',
            min_interval=512,
            max_interval=768,
            peer_address='A6:A5:A4:A3:A2:A1',
            tx_power=0x7f,
            sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        # LeConnectionComplete
        self._verify_le_connection_complete()