Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ac365b20 authored by Zach Johnson's avatar Zach Johnson
Browse files

remove invalid test, restucture simple_hal_test

use PyHal abstractions

test_none_event only passed on C++ gd because the reset event
was dropped because the test had not yet asked for the event
stream.

doesn't happen with rust, so the test was failing. by using the
abstractions, the stream gets registered before anthing happens
so the behavior is consistent.

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost SimpleHalTest
Change-Id: I88e7cc45d194030fdff10a247ff8bc09e6473bb5
parent f0240603
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ class PyHal(Closable):
        return self.acl_stream

    def send_hci_command(self, command):
        self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command)))
        self.device.hal.SendCommand(hal_facade.Command(payload=bytes(command.Serialize())))

    def send_acl(self, acl):
        self.device.hal.SendAcl(hal_facade.AclPacket(payload=bytes(acl)))
+238 −257
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ from datetime import timedelta
from cert.gd_base_test import GdBaseTestClass
from cert.event_stream import EventStream
from cert.truth import assertThat
from cert.py_hal import PyHal
from google.protobuf import empty_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from hal import facade_pb2 as hal_facade_pb2
@@ -36,11 +37,16 @@ class SimpleHalTest(GdBaseTestClass):
    def setup_test(self):
        super().setup_test()

        self.send_dut_hci_command(hci_packets.ResetBuilder())
        self.send_cert_hci_command(hci_packets.ResetBuilder())
        self.dut_hal = PyHal(self.dut)
        self.cert_hal = PyHal(self.cert)

    def send_cert_hci_command(self, command):
        self.cert.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT)
        self.dut_hal.send_hci_command(hci_packets.ResetBuilder())
        self.cert_hal.send_hci_command(hci_packets.ResetBuilder())

    def teardown_test(self):
        self.dut_hal.close()
        self.cert_hal.close()
        super().teardown_test()

    def send_cert_acl_data(self, handle, pb_flag, b_flag, acl):
        lower = handle & 0xff
@@ -50,10 +56,7 @@ class SimpleHalTest(GdBaseTestClass):
        lower_length = len(acl) & 0xff
        upper_length = (len(acl) & 0xff00) >> 8
        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
        self.cert.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated))

    def send_dut_hci_command(self, command):
        self.dut.hal.SendCommand(hal_facade_pb2.Command(payload=bytes(command.Serialize())), timeout=_GRPC_TIMEOUT)
        self.cert_hal.send_acl(concatenated)

    def send_dut_acl_data(self, handle, pb_flag, b_flag, acl):
        lower = handle & 0xff
@@ -63,66 +66,55 @@ class SimpleHalTest(GdBaseTestClass):
        lower_length = len(acl) & 0xff
        upper_length = (len(acl) & 0xff00) >> 8
        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
        self.dut.hal.SendAcl(hal_facade_pb2.AclPacket(payload=concatenated), timeout=_GRPC_TIMEOUT)

    def test_none_event(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
            assertThat(hci_event_stream).emitsNone(timeout=timedelta(seconds=1))
        self.dut_hal.send_acl(concatenated)

    def test_fetch_hci_event(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:

            self.send_dut_hci_command(
                hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
                                                            '0C:05:04:03:02:01'))
        self.dut_hal.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
        event = hci_packets.LeAddDeviceToConnectListCompleteBuilder(1, hci_packets.ErrorCode.SUCCESS)

            assertThat(hci_event_stream).emits(lambda packet: bytes(event.Serialize()) in packet.payload)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(event.Serialize()) in packet.payload)

    def test_loopback_hci_command(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:

            self.send_dut_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))
        self.dut_hal.send_hci_command(hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))

        command = hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
                                                              '0C:05:04:03:02:01')
            self.send_dut_hci_command(command)
        self.dut_hal.send_hci_command(command)

            assertThat(hci_event_stream).emits(lambda packet: bytes(command.Serialize()) in packet.payload)
        assertThat(
            self.dut_hal.get_hci_event_stream()).emits(lambda packet: bytes(command.Serialize()) in packet.payload)

    def test_inquiry_from_dut(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:
        self.cert_hal.send_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

            self.send_cert_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        lap = hci_packets.Lap()
        lap.lap = 0x33
            self.send_dut_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))
        self.dut_hal.send_hci_command(hci_packets.InquiryBuilder(lap, 0x30, 0xff))

            assertThat(hci_event_stream).emits(lambda packet: b'\x02\x0f' in packet.payload
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload
                                                              # Expecting an HCI Event (code 0x02, length 0x0f)
                                                             )

    def test_le_ad_scan_cert_advertises(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream:

        # DUT scans
            self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))

        # CERT Advertises
        advertising_handle = 0
            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
@@ -138,14 +130,14 @@ class SimpleHalTest(GdBaseTestClass):
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
@@ -153,28 +145,23 @@ class SimpleHalTest(GdBaseTestClass):
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

            assertThat(hci_event_stream).emits(lambda packet: b'Im_A_Cert' in packet.payload)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        # Disable Advertising
            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))

        # Disable Scanning
            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))

    def test_le_connection_dut_advertises(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \
                EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream, \
                EventStream(self.dut.hal.StreamAcl(empty_pb2.Empty())) as acl_data_stream, \
                EventStream(self.cert.hal.StreamAcl(empty_pb2.Empty())) as cert_acl_data_stream:

        # Cert Connects
            self.send_cert_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
        self.cert_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
@@ -184,15 +171,14 @@ class SimpleHalTest(GdBaseTestClass):
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
            self.send_cert_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                                              hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                              hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                                                              '0D:05:04:03:02:01', 1, [phy_scan_params]))
        self.cert_hal.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params]))

        # DUT Advertises
        advertising_handle = 0
            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
@@ -208,14 +194,14 @@ class SimpleHalTest(GdBaseTestClass):
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT'))

            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
@@ -224,7 +210,7 @@ class SimpleHalTest(GdBaseTestClass):
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_The_D'))

            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
@@ -233,7 +219,7 @@ class SimpleHalTest(GdBaseTestClass):
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
            self.send_dut_hci_command(
        self.dut_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        conn_handle = 0xfff
@@ -249,10 +235,10 @@ class SimpleHalTest(GdBaseTestClass):
                return True
            return False

            assertThat(cert_hci_event_stream).emits(payload_handle)
        assertThat(self.cert_hal.get_hci_event_stream()).emits(payload_handle)
        cert_handle = conn_handle
        conn_handle = 0xfff
            assertThat(hci_event_stream).emits(payload_handle)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(payload_handle)
        dut_handle = conn_handle

        # Send ACL Data
@@ -261,18 +247,14 @@ class SimpleHalTest(GdBaseTestClass):
        self.send_cert_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                                hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))

            assertThat(cert_acl_data_stream).emits(lambda packet: b'SomeAclData' in packet.payload)
            assertThat(acl_data_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_le_connect_list_connection_cert_advertises(self):
        with EventStream(self.dut.hal.StreamEvents(empty_pb2.Empty())) as hci_event_stream, \
            EventStream(self.cert.hal.StreamEvents(empty_pb2.Empty())) as cert_hci_event_stream:

        # DUT Connects
            self.send_dut_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
            self.send_dut_hci_command(
                hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM,
                                                            '0C:05:04:03:02:01'))
        self.dut_hal.send_hci_command(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        self.dut_hal.send_hci_command(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
@@ -282,15 +264,14 @@ class SimpleHalTest(GdBaseTestClass):
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
            self.send_dut_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST,
                                                              hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                              hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                                                              'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
        self.dut_hal.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

        # CERT Advertises
        advertising_handle = 1
            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
@@ -306,14 +287,14 @@ class SimpleHalTest(GdBaseTestClass):
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
@@ -321,11 +302,11 @@ class SimpleHalTest(GdBaseTestClass):
        enabled_set.advertising_handle = 1
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
            self.send_cert_hci_command(
        self.cert_hal.send_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        # LeConnectionComplete
            cert_hci_event_stream.assert_event_occurs(
        assertThat(self.cert_hal.get_hci_event_stream()).emits(
            lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
            hci_event_stream.assert_event_occurs(
        assertThat(self.dut_hal.get_hci_event_stream()).emits(
            lambda packet: b'\x3e\x13\x01\x00' in packet.payload, timeout=timedelta(seconds=20))
+2 −2
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ class DirectHciTest(GdBaseTestClass):
        super().setup_test()
        self.dut_hci = PyHci(self.dut, acl_streaming=True)
        self.cert_hal = PyHal(self.cert)
        self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize())
        self.cert_hal.send_hci_command(hci_packets.ResetBuilder())

    def teardown_test(self):
        self.dut_hci.close()
@@ -44,7 +44,7 @@ class DirectHciTest(GdBaseTestClass):
        super().teardown_test()

    def send_hal_hci_command(self, command):
        self.cert_hal.send_hci_command(bytes(command.Serialize()))
        self.cert_hal.send_hci_command(command)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(