Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 73484f0b authored by bidsharma's avatar bidsharma Committed by Automerger Merge Worker
Browse files

LeAclManagerTest: Use PyHci abstractions am: ab406e18

parents 99861a4f ab406e18
Loading
Loading
Loading
Loading
+71 −2
Original line number Diff line number Diff line
@@ -70,6 +70,42 @@ class PyHciAclConnection(IEventStream):
        return self.our_acl_stream.get_event_queue()


class PyHciLeAclConnection(IEventStream):

    def __init__(self, handle, acl_stream, device, peer, peer_resolvable, local_resolvable):
        self.handle = int(handle)
        self.device = device
        self.peer = peer
        self.peer_resolvable = peer_resolvable
        self.local_resolvable = local_resolvable
        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def send_first(self, data):
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))

    def send_continuing(self, data):
        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT,
                  bytes(data))

    def get_event_queue(self):
        return self.our_acl_stream.get_event_queue()

    def local_resolvable_address(self):
        return self.local_resolvable

    def peer_resolvable_address(self):
        return self.peer_resolvable

    def peer_address(self):
        return self.peer


class PyHciAdvertisement(object):

    def __init__(self, handle, py_hci):
@@ -79,7 +115,7 @@ class PyHciAdvertisement(object):
    def set_data(self, complete_name):
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = list(bytes(complete_name))
        data.data = list(complete_name)
        self.py_hci.send_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -87,7 +123,7 @@ class PyHciAdvertisement(object):
    def set_scan_response(self, shortened_name):
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = list(bytes(shortened_name))
        data.data = list(shortened_name)
        self.py_hci.send_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
@@ -121,6 +157,7 @@ class PyHci(Closable):
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self.register_for_le_events(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
            self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))

    def close(self):
@@ -187,6 +224,38 @@ class PyHci(Closable):
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciAclConnection(handle, self.acl_stream, self.device)

    def set_random_le_address(self, addr):
        self.send_command(hci_packets.LeSetRandomAddressBuilder(addr))
        assertThat(self.event_stream).emits(HciMatchers.CommandComplete(OpCode.LE_SET_RANDOM_ADDRESS))

    def initiate_le_connection(self, remote_addr):
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.send_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
        assertThat(self.event_stream).emits(HciMatchers.CommandStatus(OpCode.LE_EXTENDED_CREATE_CONNECTION))

    def incoming_le_connection(self):
        connection_complete = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.le_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        peer = connection_complete.get().GetPeerAddress()
        local_resolvable = connection_complete.get().GetLocalResolvablePrivateAddress()
        peer_resolvable = connection_complete.get().GetPeerResolvablePrivateAddress()
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return PyHciLeAclConnection(handle, self.acl_stream, self.device, peer, peer_resolvable, local_resolvable)

    def create_advertisement(self,
                             handle,
                             own_address,
+0 −2
Original line number Diff line number Diff line
@@ -205,7 +205,6 @@ class DirectHciTest(gd_base_test.GdBaseTestClass):

    def test_le_connection_dut_advertises(self):
        self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ADVERTISING_SET_TERMINATED,
                                            SubeventCode.ENHANCED_CONNECTION_COMPLETE,
                                            SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
        # Cert Connects
        self.cert_hal.unmask_event(EventCode.LE_META_EVENT)
@@ -239,7 +238,6 @@ class DirectHciTest(gd_base_test.GdBaseTestClass):
            lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)

    def test_le_filter_accept_list_connection_cert_advertises(self):
        self.dut_hci.register_for_le_events(SubeventCode.CONNECTION_COMPLETE, SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        # DUT Connects
        self.dut_hci.send_command(LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        self.dut_hci.send_command(
+34 −169
Original line number Diff line number Diff line
@@ -16,16 +16,13 @@

from blueberry.tests.gd.cert import gd_base_test
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
from google.protobuf import empty_pb2 as empty_proto
from blueberry.facade import common_pb2 as common
from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.hci import hci_facade_pb2 as hci_facade
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3 import RawBuilder
from mobly import test_runner
@@ -38,14 +35,12 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):

    def setup_test(self):
        gd_base_test.GdBaseTestClass.setup_test(self)
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.dut_le_acl_manager = PyLeAclManager(self.dut)
        self.cert_hci_le_event_stream = EventStream(self.cert.hci.StreamLeSubevents(empty_proto.Empty()))
        self.cert_acl_data_stream = EventStream(self.cert.hci.StreamAcl(empty_proto.Empty()))

    def teardown_test(self):
        safeClose(self.cert_hci_le_event_stream)
        safeClose(self.cert_acl_data_stream)
        safeClose(self.dut_le_acl_manager)
        self.cert_hci.close()
        gd_base_test.GdBaseTestClass.teardown_test(self)

    def set_privacy_policy_static(self):
@@ -74,89 +69,32 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        self.cert.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def dut_connects(self, check_address):
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)

        # Cert Advertises
        advertising_handle = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)

        self.cert_hci.create_advertisement(
            advertising_handle,
            '0C:05:04:03:02:01',
            hci_packets.LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  #SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))
        )

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_A_C'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = hci_packets.EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
        py_hci_adv.set_data(b'Im_A_Cert')
        py_hci_adv.set_scan_response(b'Im_A_C')
        py_hci_adv.start()

        self.dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
            remote_addr=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes('0C:05:04:03:02:01', 'utf8')),
                type=int(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS)))

        # Cert gets ConnectionComplete with a handle and sends ACL data
        handle = 0xfff
        address = hci_packets.Address()

        def get_handle(packet):
            packet_bytes = packet.payload
            nonlocal handle
            nonlocal address
            if b'\x3e\x13\x01\x00' in packet_bytes:
                cc_view = hci_packets.LeConnectionCompleteView(
                    hci_packets.LeMetaEventView(
                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                handle = cc_view.GetConnectionHandle()
                address = cc_view.GetPeerAddress()
                return True
            if b'\x3e\x1f\x0A\x00' in packet_bytes:
                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
                    hci_packets.LeMetaEventView(
                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                handle = cc_view.GetConnectionHandle()
                address = cc_view.GetPeerAddress()
                return True
            return False

        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
        self.cert_handle = handle
        dut_address_from_complete = address
        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()
        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
        assertThat(py_hci_le_acl_connection.handle).isNotNone()
        if check_address:
            assertThat(dut_address_from_complete).isEqualTo(self.dut_address.decode())
            assertThat(py_hci_le_acl_connection.peer).isEqualTo(self.dut_address.decode())

        self.cert_handle = py_hci_le_acl_connection.handle

    def send_receive_and_check(self):
        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
@@ -164,7 +102,7 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))

        self.dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
        self.cert_acl_data_stream.assert_event_occurs(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(self.cert_acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(self.dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
@@ -208,9 +146,6 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):

    def test_cert_connects(self):
        self.set_privacy_policy_static()
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)

        self.dut_le_acl_manager.listen_for_incoming_connections()

        # DUT Advertises
@@ -233,53 +168,21 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)

        # Cert Connects
        self.enqueue_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        self.enqueue_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                                          hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                          hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                                                          self.dut_address.decode(), 1, [phy_scan_params]))
        self.cert_hci.set_random_le_address('0C:05:04:03:02:01')
        self.cert_hci.initiate_le_connection(self.dut_address.decode())

        # Cert gets ConnectionComplete with a handle and sends ACL data
        handle = 0xfff

        def get_handle(packet):
            packet_bytes = packet.payload
            nonlocal handle
            if b'\x3e\x13\x01\x00' in packet_bytes:
                cc_view = hci_packets.LeConnectionCompleteView(
                    hci_packets.LeMetaEventView(
                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                handle = cc_view.GetConnectionHandle()
                return True
            if b'\x3e\x1f\x0A\x00' in packet_bytes:
                cc_view = hci_packets.LeEnhancedConnectionCompleteView(
                    hci_packets.LeMetaEventView(
                        hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
                handle = cc_view.GetConnectionHandle()
                return True
            return False

        self.cert_hci_le_event_stream.assert_event_occurs(get_handle)
        self.cert_handle = handle
        py_hci_le_acl_connection = self.cert_hci.incoming_le_connection()

        self.enqueue_acl_data(self.cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
        py_hci_le_acl_connection.send(hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                      hci_packets.BroadcastFlag.POINT_TO_POINT,
                              bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))
                                      b'\x19\x00\x07\x00SomeAclData from the Cert')
        self.cert_acl_data_stream = py_hci_le_acl_connection.our_acl_stream
        assertThat(py_hci_le_acl_connection.handle).isNotNone()
        self.cert_handle = py_hci_le_acl_connection.handle

        # DUT gets a connection complete event and sends and receives
        handle = 0xfff
        self.dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()

        self.send_receive_and_check()

    def test_recombination_l2cap_packet(self):
@@ -294,8 +197,6 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):
        assertThat(self.dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)

    def test_background_connection(self):
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        self.register_for_le_event(hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        self.set_privacy_policy_static()

        # Start background and direct connection
@@ -309,49 +210,13 @@ class LeAclManagerTest(gd_base_test.GdBaseTestClass):

        # Cert Advertises
        advertising_handle = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                155,  # 100ms = 160 * .625ms "LOW_LATENCY"
                165,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  #SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_A_C'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = hci_packets.EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:01',
                                                        hci_packets.LegacyAdvertisingProperties.ADV_IND, 155, 165)

        py_hci_adv.set_data(b'Im_A_Cert')
        py_hci_adv.set_scan_response(b'Im_A_C')
        py_hci_adv.start()

        # Check background connection complete
        self.dut_le_acl_manager.complete_outgoing_connection(token)