Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6729c606 authored by Henri Chataing's avatar Henri Chataing
Browse files

RootCanal: Implement LL tests CIS/CEN/*, CIS/PER/*

Implement the tests:
  - LL/CIS/CEN/BV-01-C
  - LL/CIS/CEN/BV-03-C
  - LL/CIS/CEN/BV-10-C
  - LL/CIS/CEN/BV-26-C
  - LL/CIS/PER/BV-01-C
  - LL/CIS/PER/BV-02-C

Bug: 253535400
Test: atest rootcanal_ll_test
Change-Id: I29547ac07e624bee5b07e4346f52d595f01b1599
parent aa222462
Loading
Loading
Loading
Loading
+19 −0
Original line number Diff line number Diff line
@@ -197,6 +197,22 @@ genrule {
    ],
}

// Generate the python parser+serializer backend for
// rust/llcp_packets.pdl.
genrule {
    name: "llcp_packets_python3_gen",
    defaults: ["pdl_python_generator_defaults"],
    cmd: "$(location :pdlc) $(in) |" +
        " $(location :pdl_python_generator)" +
        " --output $(out) --custom-type-location py.bluetooth",
    srcs: [
        "rust/llcp_packets.pdl",
    ],
    out: [
        "llcp_packets.py",
    ],
}

// Generate the python parser+serializer backend for
// hci_packets.pdl.
genrule {
@@ -285,8 +301,11 @@ python_test_host {
    srcs: [
        ":hci_packets_python3_gen",
        ":link_layer_packets_python3_gen",
        ":llcp_packets_python3_gen",
        "py/bluetooth.py",
        "py/controller.py",
        "test/LL/CIS/CEN/*.py",
        "test/LL/CIS/PER/*.py",
        "test/LL/CON_/CEN/*.py",
        "test/LL/CON_/PER/*.py",
        "test/LL/DDI/ADV/*.py",
+199 −1
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ import collections
import enum
import hci_packets as hci
import link_layer_packets as ll
import llcp_packets as llcp
import py.bluetooth
import sys
import typing
@@ -81,9 +82,11 @@ class Controller:
        self.address = address
        self.evt_queue = collections.deque()
        self.acl_queue = collections.deque()
        self.iso_queue = collections.deque()
        self.ll_queue = collections.deque()
        self.evt_queue_event = asyncio.Event()
        self.acl_queue_event = asyncio.Event()
        self.iso_queue_event = asyncio.Event()
        self.ll_queue_event = asyncio.Event()

    def __del__(self):
@@ -98,8 +101,12 @@ class Controller:
            print(f"<-- received HCI ACL packet data={len(packet)}[..]")
            self.acl_queue.append(packet)
            self.acl_queue_event.set()
        elif idc == Idc.Iso:
            print(f"<-- received HCI ISO packet data={len(packet)}[..]")
            self.iso_queue.append(packet)
            self.iso_queue_event.set()
        else:
            print(f"ignoring HCI packet typ={typ}")
            print(f"ignoring HCI packet typ={idc}")

    def receive_ll_(self, packet: bytes, phy: int, tx_power: int):
        print(f"<-- received LL pdu data={len(packet)}[..]")
@@ -111,12 +118,31 @@ class Controller:
        data = cmd.serialize()
        rootcanal.ffi_controller_receive_hci(c_void_p(self.instance), c_int(Idc.Cmd), c_char_p(data), c_int(len(data)))

    def send_iso(self, iso: hci.Iso):
        print(f"--> sending HCI iso pdu data={len(iso.payload)}[..]")
        data = iso.serialize()
        rootcanal.ffi_controller_receive_hci(c_void_p(self.instance), c_int(Idc.Iso), c_char_p(data), c_int(len(data)))

    def send_ll(self, pdu: ll.LinkLayerPacket, phy: Phy = Phy.LowEnergy, rssi: int = -90):
        print(f"--> sending LL pdu {pdu.__class__.__name__}")
        data = pdu.serialize()
        rootcanal.ffi_controller_receive_ll(c_void_p(self.instance), c_char_p(data), c_int(len(data)), c_int(phy),
                                            c_int(rssi))

    def send_llcp(self,
                  source_address: hci.Address,
                  destination_address: hci.Address,
                  pdu: llcp.LlcpPacket,
                  phy: Phy = Phy.LowEnergy,
                  rssi: int = -90):
        print(f"--> sending LLCP pdu {pdu.__class__.__name__}")
        ll_pdu = ll.Llcp(source_address=source_address,
                         destination_address=destination_address,
                         payload=pdu.serialize())
        data = ll_pdu.serialize()
        rootcanal.ffi_controller_receive_ll(c_void_p(self.instance), c_char_p(data), c_int(len(data)), c_int(phy),
                                            c_int(rssi))

    async def start(self):

        async def timer():
@@ -138,6 +164,13 @@ class Controller:
                evt.show()
            raise Exception("evt queue not empty at stop()")

        if self.iso_queue:
            print("iso queue not empty at stop():")
            for packet in self.iso_queue:
                iso = hci.Iso.parse_all(packet)
                iso.show()
            raise Exception("ll queue not empty at stop()")

        if self.ll_queue:
            for (packet, _) in self.ll_queue:
                pdu = ll.LinkLayerPacket.parse_all(packet)
@@ -150,6 +183,12 @@ class Controller:
            self.evt_queue_event.clear()
        return self.evt_queue.popleft()

    async def receive_iso(self):
        while not self.iso_queue:
            await self.iso_queue_event.wait()
            self.iso_queue_event.clear()
        return self.iso_queue.popleft()

    async def expect_evt(self, expected_evt: hci.Event):
        packet = await self.receive_evt()
        evt = hci.Event.parse_all(packet)
@@ -238,6 +277,18 @@ class ControllerTest(unittest.IsolatedAsyncioTestCase):
        assert evt.num_hci_command_packets == 1
        return evt

    async def expect_iso(self, expected_iso: hci.Iso, timeout: int = 3):
        packet = await asyncio.wait_for(self.controller.receive_iso(), timeout=timeout)
        iso = hci.Iso.parse_all(packet)

        if iso != expected_iso:
            print("received unexpected iso packet")
            print("expected packet:")
            expected_iso.show()
            print("received packet:")
            iso.show()
            self.assertTrue(False)

    async def expect_ll(self,
                        expected_pdus: typing.Union[list, typing.Union[ll.LinkLayerPacket, type]],
                        timeout: int = 3) -> ll.LinkLayerPacket:
@@ -265,5 +316,152 @@ class ControllerTest(unittest.IsolatedAsyncioTestCase):

        self.assertTrue(False)

    async def expect_llcp(self,
                          source_address: hci.Address,
                          destination_address: hci.Address,
                          expected_pdu: llcp.LlcpPacket,
                          timeout: int = 3) -> llcp.LlcpPacket:
        packet = await asyncio.wait_for(self.controller.receive_ll(), timeout=timeout)
        pdu = ll.LinkLayerPacket.parse_all(packet)

        if (pdu.type != ll.PacketType.LLCP or pdu.source_address != source_address or
                pdu.destination_address != destination_address):
            print("received unexpected pdu:")
            pdu.show()
            print(f"expected pdu: {source_address} -> {destination_address}")
            expected_pdu.show()
            self.assertTrue(False)

        pdu = llcp.LlcpPacket.parse_all(pdu.payload)
        if pdu != expected_pdu:
            print("received unexpected pdu:")
            pdu.show()
            print("expected pdu:")
            expected_pdu.show()
            self.assertTrue(False)

        return pdu

    async def enable_connected_isochronous_stream_host_support(self):
        """Enable Connected Isochronous Stream Host Support in the LE Feature mask."""
        self.controller.send_cmd(
            hci.LeSetHostFeature(bit_number=hci.LeHostFeatureBits.CONNECTED_ISO_STREAM_HOST_SUPPORT,
                                 bit_value=hci.Enable.ENABLED))

        await self.expect_evt(hci.LeSetHostFeatureComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

    async def establish_le_connection_central(self, peer_address: hci.Address) -> int:
        """Establish a connection with the selected peer as Central.
        Returns the ACL connection handle for the opened link."""
        self.controller.send_cmd(
            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                                           own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
                                           peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
                                           peer_address=peer_address,
                                           initiating_phys=0x1,
                                           phy_scan_parameters=[
                                               hci.LeCreateConnPhyScanParameters(
                                                   scan_interval=0x200,
                                                   scan_window=0x100,
                                                   conn_interval_min=0x200,
                                                   conn_interval_max=0x200,
                                                   conn_latency=0x6,
                                                   supervision_timeout=0xc80,
                                                   min_ce_length=0,
                                                   max_ce_length=0,
                                               )
                                           ]))

        await self.expect_evt(hci.LeExtendedCreateConnectionStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        self.controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_address,
                                                          advertising_address_type=ll.AddressType.PUBLIC,
                                                          advertising_type=ll.LegacyAdvertisingType.ADV_IND,
                                                          advertising_data=[]),
                                rssi=-16)

        await self.expect_ll(
            ll.LeConnect(source_address=self.controller.address,
                         destination_address=peer_address,
                         initiating_address_type=ll.AddressType.PUBLIC,
                         advertising_address_type=ll.AddressType.PUBLIC,
                         conn_interval=0x200,
                         conn_peripheral_latency=0x6,
                         conn_supervision_timeout=0xc80))

        self.controller.send_ll(
            ll.LeConnectComplete(source_address=peer_address,
                                 destination_address=self.controller.address,
                                 initiating_address_type=ll.AddressType.PUBLIC,
                                 advertising_address_type=ll.AddressType.PUBLIC,
                                 conn_interval=0x200,
                                 conn_peripheral_latency=0x6,
                                 conn_supervision_timeout=0xc80))

        connection_complete = await self.expect_evt(
            hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
                                             connection_handle=self.Any,
                                             role=hci.Role.CENTRAL,
                                             peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
                                             peer_address=peer_address,
                                             conn_interval=0x200,
                                             conn_latency=0x6,
                                             supervision_timeout=0xc80,
                                             central_clock_accuracy=hci.ClockAccuracy.PPM_500))

        acl_connection_handle = connection_complete.connection_handle
        await self.expect_evt(
            hci.LeChannelSelectionAlgorithm(connection_handle=acl_connection_handle,
                                            channel_selection_algorithm=hci.ChannelSelectionAlgorithm.ALGORITHM_1))

        return acl_connection_handle

    async def establish_le_connection_peripheral(self, peer_address: hci.Address) -> int:
        """Establish a connection with the selected peer as Peripheral.
        Returns the ACL connection handle for the opened link."""
        self.controller.send_cmd(
            hci.LeSetAdvertisingParameters(advertising_interval_min=0x200,
                                           advertising_interval_max=0x200,
                                           advertising_type=hci.AdvertisingType.ADV_IND,
                                           own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
                                           advertising_channel_map=0x7,
                                           advertising_filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES))

        await self.expect_evt(
            hci.LeSetAdvertisingParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        self.controller.send_cmd(hci.LeSetAdvertisingEnable(advertising_enable=True))

        await self.expect_evt(hci.LeSetAdvertisingEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        self.controller.send_ll(ll.LeConnect(source_address=peer_address,
                                             destination_address=self.controller.address,
                                             initiating_address_type=ll.AddressType.PUBLIC,
                                             advertising_address_type=ll.AddressType.PUBLIC,
                                             conn_interval=0x200,
                                             conn_peripheral_latency=0x200,
                                             conn_supervision_timeout=0x200),
                                rssi=-16)

        await self.expect_ll(
            ll.LeConnectComplete(source_address=self.controller.address,
                                 destination_address=peer_address,
                                 conn_interval=0x200,
                                 conn_peripheral_latency=0x200,
                                 conn_supervision_timeout=0x200))

        connection_complete = await self.expect_evt(
            hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
                                             connection_handle=self.Any,
                                             role=hci.Role.PERIPHERAL,
                                             peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
                                             peer_address=peer_address,
                                             conn_interval=0x200,
                                             conn_latency=0x200,
                                             supervision_timeout=0x200,
                                             central_clock_accuracy=hci.ClockAccuracy.PPM_500))

        return connection_complete.connection_handle

    def tearDown(self):
        self.controller.stop()
+192 −0

File added.

Preview size limit exceeded, changes collapsed.

+126 −0
Original line number Diff line number Diff line
import hci_packets as hci
import link_layer_packets as ll
import llcp_packets as llcp
import random
import unittest
from hci_packets import ErrorCode
from py.bluetooth import Address
from py.controller import ControllerTest


class Test(ControllerTest):

    SDU_Interval_C_TO_P = 10000  # 10ms
    SDU_Interval_P_TO_C = 10000  # 10ms
    ISO_Interval = 16  # 20ms
    Worst_Case_SCA = hci.ClockAccuracy.PPM_500
    Packing = hci.Packing.SEQUENTIAL
    Framing = hci.Enable.DISABLED
    NSE = 4
    Max_SDU_C_TO_P = 130
    Max_SDU_P_TO_C = 130
    Max_PDU_C_TO_P = 130
    Max_PDU_P_TO_C = 130
    PHY_C_TO_P = 0x1
    PHY_P_TO_C = 0x1
    FT_C_TO_P = 1
    FT_P_TO_C = 1
    BN_C_TO_P = 2
    BN_P_TO_C = 2
    Max_Transport_Latency_C_TO_P = 40000  # 40ms
    Max_Transport_Latency_P_TO_C = 40000  # 40ms
    RTN_C_TO_P = 3
    RTN_P_TO_C = 3

    # LL/CIS/CEN/BV-03-C [CIS Setup Procedure, Central Initiated, Rejected]
    async def test(self):
        # Test parameters.
        cig_id = 0x12
        cis_id = 0x42
        cis_connection_handle = 0xe00
        peer_address = Address('aa:bb:cc:dd:ee:ff')
        controller = self.controller

        # Enable Connected Isochronous Stream Host Support.
        await self.enable_connected_isochronous_stream_host_support()

        # Prelude: Establish an ACL connection as central with the IUT.
        acl_connection_handle = await self.establish_le_connection_central(peer_address)

        # 1. The Upper Tester sends an HCI_LE_Set_CIG_Parameters command to the IUT with valid
        # parameters and receives a successful HCI_Command_Complete event.
        controller.send_cmd(
            hci.LeSetCigParametersTest(cig_id=cig_id,
                                       sdu_interval_c_to_p=self.SDU_Interval_C_TO_P,
                                       sdu_interval_p_to_c=self.SDU_Interval_P_TO_C,
                                       ft_c_to_p=self.FT_C_TO_P,
                                       ft_p_to_c=self.FT_P_TO_C,
                                       iso_interval=self.ISO_Interval,
                                       worst_case_sca=self.Worst_Case_SCA,
                                       packing=self.Packing,
                                       framing=self.Framing,
                                       cis_config=[
                                           hci.LeCisParametersTestConfig(cis_id=cis_id,
                                                                         nse=self.NSE,
                                                                         max_sdu_c_to_p=self.Max_SDU_C_TO_P,
                                                                         max_sdu_p_to_c=self.Max_SDU_P_TO_C,
                                                                         max_pdu_c_to_p=self.Max_PDU_C_TO_P,
                                                                         max_pdu_p_to_c=self.Max_PDU_P_TO_C,
                                                                         phy_c_to_p=self.PHY_C_TO_P,
                                                                         phy_p_to_c=self.PHY_P_TO_C,
                                                                         bn_c_to_p=self.BN_C_TO_P,
                                                                         bn_p_to_c=self.BN_P_TO_C)
                                       ]))

        await self.expect_evt(
            hci.LeSetCigParametersTestComplete(status=ErrorCode.SUCCESS,
                                               num_hci_command_packets=1,
                                               cig_id=cig_id,
                                               connection_handle=[cis_connection_handle]))

        # 2. The Upper Tester sends an HCI_LE_Create_CIS command with the ACL_Connection_Handle of
        # the established ACL and valid Connection_Handle from the IUT received in step 1.
        controller.send_cmd(
            hci.LeCreateCis(cis_config=[
                hci.LeCreateCisConfig(cis_connection_handle=cis_connection_handle,
                                      acl_connection_handle=acl_connection_handle)
            ]))

        await self.expect_evt(hci.LeCreateCisStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # 3. The Lower Tester receives an LL_CIS_REQ PDU from the IUT with all fields set to valid values.
        cis_req = await self.expect_llcp(source_address=controller.address,
                                         destination_address=peer_address,
                                         expected_pdu=llcp.CisReq(cig_id=cig_id,
                                                                  cis_id=cis_id,
                                                                  phy_c_to_p=hci.PhyType.LE_1M,
                                                                  phy_p_to_c=hci.PhyType.LE_1M,
                                                                  framed=self.Framing == hci.Enable.ENABLED,
                                                                  max_sdu_c_to_p=self.Max_SDU_C_TO_P,
                                                                  max_sdu_p_to_c=self.Max_SDU_P_TO_C,
                                                                  sdu_interval_c_to_p=self.SDU_Interval_C_TO_P,
                                                                  sdu_interval_p_to_c=self.SDU_Interval_P_TO_C,
                                                                  max_pdu_c_to_p=self.Max_PDU_C_TO_P,
                                                                  max_pdu_p_to_c=self.Max_PDU_P_TO_C,
                                                                  nse=self.NSE,
                                                                  sub_interval=self.Any,
                                                                  bn_p_to_c=self.BN_C_TO_P,
                                                                  bn_c_to_p=self.BN_P_TO_C,
                                                                  ft_c_to_p=self.FT_C_TO_P,
                                                                  ft_p_to_c=self.FT_P_TO_C,
                                                                  iso_interval=self.ISO_Interval,
                                                                  cis_offset_min=self.Any,
                                                                  cis_offset_max=self.Any,
                                                                  conn_event_count=0))

        # 4. The Lower Tester sends an LL_REJECT_EXT_IND to the IUT with an error code not equal to 0x00.
        controller.send_llcp(source_address=peer_address,
                             destination_address=controller.address,
                             pdu=llcp.RejectExtInd(reject_opcode=llcp.Opcode.LL_CIS_REQ,
                                                   error_code=hci.ErrorCode.REMOTE_USER_TERMINATED_CONNECTION))

        # 5. The Upper Tester receives an HCI_LE_CIS_Established event from the IUT with a status failure.
        # The Status field has the same value as the LL_REJECT_EXT_IND PDU in step 4.
        await self.expect_evt(
            hci.LeCisEstablished(status=ErrorCode.REMOTE_USER_TERMINATED_CONNECTION,
                                 connection_handle=cis_connection_handle))
+270 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading