Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 856677df authored by Zach Johnson's avatar Zach Johnson
Browse files

Add an advertisment abstraction to PyHci

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: I919e9769023f53ac645313a151fdff8e70417e26
parent 3f918a19
Loading
Loading
Loading
Loading
+75 −0
Original line number Diff line number Diff line
@@ -24,6 +24,26 @@ from cert.captures import HciCaptures
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode


class PyHciAclConnection(IEventStream):
@@ -51,6 +71,38 @@ class PyHciAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHciAdvertisement(object):

    def __init__(self, handle, py_hci):
        self.handle = handle
        self.py_hci = py_hci

    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        self.py_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def start(self):
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        assertThat(self.py_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))


class PyHci(Closable):

    event_stream = None
@@ -139,3 +191,26 @@ class PyHci(Closable):
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciAclConnection(handle, self.acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):

        self.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))

        self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        return PyHciAdvertisement(handle, self)
+4 −46
Original line number Diff line number Diff line
@@ -215,52 +215,10 @@ class DirectHciTest(GdBaseTestClass):
                                              OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                              '0D:05:04:03:02:01', 1, [phy_scan_params]))

        # DUT Advertises
        advertising_handle = 0
        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                OwnAddressType.RANDOM_DEVICE_ADDRESS,
                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  #SID
                Enable.DISABLED  # Scan request notification
            ))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))

        gap_name = GapData()
        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = GapData()
        gap_short_name.data_type = GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_The_D'))

        self.dut_hci.send_command_with_complete(
            LeSetExtendedAdvertisingScanResponseBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.dut_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))

        # Check for success of Enable
        assertThat(self.dut_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        (dut_handle, cert_handle) = self._verify_le_connection_complete()