Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 342a1d15 authored by Henri Chataing's avatar Henri Chataing
Browse files

RootCanal: Fragment large advertising reports

LE Extended advertising lets host stacks define
advertising data as large as 1650 bytes.
Extended advertising reports can generally not be sent as one
HCI event and need to be fragmented.

Implement extended scanning test LL/DDI/SCN/BV-19-C
for validation.

Bug: 271910328
Test: atest rootcanal_ll_test
Change-Id: I2f7ea668524078a8241994814ac863ab3efc9b6f
parent e54a5b04
Loading
Loading
Loading
Loading
+41 −12
Original line number Diff line number Diff line
@@ -2716,8 +2716,7 @@ void LinkLayerController::ScanIncomingLeLegacyAdvertisingPdu(
      // shall be ignored unless either:
      //  • the TargetA field is identical to the scanner's device address, or
      //  • the TargetA field is a resolvable private address, address
      //  resolution is
      //    enabled, and the address is resolved successfully
      //  resolution is enabled, and the address is resolved successfully
      case bluetooth::hci::LeScanningFilterPolicy::ACCEPT_ALL:
      case bluetooth::hci::LeScanningFilterPolicy::FILTER_ACCEPT_LIST_ONLY:
        if (!IsLocalPublicOrRandomAddress(target_address) &&
@@ -2832,13 +2831,20 @@ void LinkLayerController::ScanIncomingLeLegacyAdvertisingPdu(
    response.address_ = resolved_advertising_address.GetAddress();
    response.primary_phy_ = bluetooth::hci::PrimaryPhyType::LE_1M;
    response.secondary_phy_ = bluetooth::hci::SecondaryPhyType::NO_PACKETS;
    response.advertising_sid_ = 0xff;  // Not ADI field provided.
    response.advertising_sid_ = 0xff;  // ADI not provided.
    response.tx_power_ = 0x7f;         // TX power information not available.
    response.rssi_ = rssi;
    response.periodic_advertising_interval_ = 0;  // No periodic advertising.
    if (directed_advertising) {
      response.direct_address_type_ =
          bluetooth::hci::DirectAdvertisingAddressType(
              target_address.GetAddressType());
      response.direct_address_ = target_address.GetAddress();
    } else {
      response.direct_address_type_ =
          bluetooth::hci::DirectAdvertisingAddressType::NO_ADDRESS_PROVIDED;
      response.direct_address_ = Address::kEmpty;
    }
    response.advertising_data_ = advertising_data;

    send_event_(bluetooth::hci::LeExtendedAdvertisingReportRawBuilder::Create(
@@ -3248,17 +3254,40 @@ void LinkLayerController::ScanIncomingLeExtendedAdvertisingPdu(
    response.address_ = resolved_advertising_address.GetAddress();
    response.primary_phy_ = bluetooth::hci::PrimaryPhyType::LE_1M;
    response.secondary_phy_ = bluetooth::hci::SecondaryPhyType::NO_PACKETS;
    response.advertising_sid_ = 0xff;  // Not ADI field provided.
    response.advertising_sid_ = pdu.GetSid();
    response.tx_power_ = 0x7f;         // TX power information not available.
    response.rssi_ = rssi;
    response.periodic_advertising_interval_ = 0;  // No periodic advertising.
    if (directed_advertising) {
      response.direct_address_type_ =
          bluetooth::hci::DirectAdvertisingAddressType(
              target_address.GetAddressType());
      response.direct_address_ = target_address.GetAddress();
    } else {
      response.direct_address_type_ =
          bluetooth::hci::DirectAdvertisingAddressType::NO_ADDRESS_PROVIDED;
      response.direct_address_ = Address::kEmpty;
    }
    response.advertising_data_ = advertising_data;

    // Each extended advertising report can only pass 229 bytes of
    // advertising data (255 - size of report fields).
    // RootCanal must fragment the report as necessary.
    const size_t max_fragment_size = 229;
    size_t offset = 0;
    do {
      size_t remaining_size = advertising_data.size() - offset;
      size_t fragment_size = std::min(max_fragment_size, remaining_size);
      response.data_status_ = remaining_size <= max_fragment_size
                                  ? bluetooth::hci::DataStatus::COMPLETE
                                  : bluetooth::hci::DataStatus::CONTINUING;
      response.advertising_data_ =
          std::vector(advertising_data.begin() + offset,
                      advertising_data.begin() + offset + fragment_size);
      offset += fragment_size;
      send_event_(bluetooth::hci::LeExtendedAdvertisingReportRawBuilder::Create(
          {response}));
    } while (offset < advertising_data.size());
  }

  // Did the user enable Active scanning ?
+2 −2
Original line number Diff line number Diff line
@@ -182,8 +182,8 @@ packet LeExtendedAdvertisingPdu : LinkLayerPacket (type = LE_EXTENDED_ADVERTISIN
  connectable: 1,
  scannable: 1,
  directed: 1,
  sid: 4,
  _reserved_: 1,
  _reserved_: 5,
  sid: 8, // 0xff when not provided
  tx_power: 8,
  primary_phy: PrimaryPhyType,
  secondary_phy: SecondaryPhyType,
+212 −0
Original line number Diff line number Diff line
import hci_packets as hci
import link_layer_packets as ll
import math
import random
from dataclasses import dataclass
from hci_packets import ErrorCode
from py.bluetooth import Address
from py.controller import ControllerTest
from typing import Optional

ADV_IND = 0x13
ADV_DIRECT_IND = 0x15
ADV_SCAN_IND = 0x12
ADV_NONCONN_IND = 0x10
ADV_EXT_IND = 0x0


@dataclass
class TestRound:
    duration: int
    advertising_event_properties: int
    target_address: Optional[Address]
    advertising_data_length: int


class Test(ControllerTest):

    # LL/DDI/SCN/BV-19-C  [Extended Scanning, Passive – LE 1M PHY]
    async def test(self):
        # Test rounds.
        # Note: some tests are skipped as no distinction is made between
        # ADV_EXT_IND, AUX_ADV_IND, AUX_CHAIN_IND.
        controller = self.controller
        invalid_address = Address("11:22:33:44:55:66")
        test_rounds = [
            TestRound(0x0, ADV_IND, None, 0),
            TestRound(0x0, ADV_IND, None, 31),
            TestRound(0x0, ADV_DIRECT_IND, controller.address, 0),
            TestRound(0x0, ADV_DIRECT_IND, invalid_address, 0),
            TestRound(0x0, ADV_NONCONN_IND, None, 0),
            TestRound(0x0, ADV_NONCONN_IND, None, 31),
            TestRound(0x0, ADV_EXT_IND, None, 0),
            TestRound(0x0, ADV_EXT_IND, controller.address, 0),
            TestRound(0x0, ADV_EXT_IND, invalid_address, 0),
            TestRound(0x0, ADV_EXT_IND, None, 191),
            TestRound(0x0, ADV_EXT_IND, None, 382),
            TestRound(0x1f4, ADV_EXT_IND, controller.address, 0),
            TestRound(0x0, ADV_EXT_IND, None, 31),
            TestRound(0x0, ADV_EXT_IND, None, 1645),
        ]

        # 7. Repeat steps 1–5 for each Round shown in Table 4.30
        for test_round in test_rounds:
            await self.steps_1_6(**vars(test_round))

    async def steps_1_6(self, duration: int, advertising_event_properties: int, target_address: Optional[Address],
                        advertising_data_length: int):

        controller = self.controller
        lower_tester_address = Address("ca:fe:ca:fe:00:01")

        # 1. The Upper Tester sends an HCI_LE_Set_Extended_Scan_Parameters_Command to the IUT.
        # The Scanning_PHYs parameter shall be set as specified in Table 4.29, Scan_Type[0] set to 0x00
        # (Passive Scanning), Scan_Interval[0] set to 0x0010, and Scan_Window[0] set to 0x0010.
        # Own_Address_Type shall be set to 0x00 (Public Device Address), and Scanning_Filter_Policy
        # shall be set to 0x00 (Accept All).
        controller.send_cmd(
            hci.LeSetExtendedScanParameters(own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
                                            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
                                            scanning_phys=0x1,
                                            parameters=[
                                                hci.PhyScanParameters(le_scan_type=hci.LeScanType.PASSIVE,
                                                                      le_scan_interval=0x0010,
                                                                      le_scan_window=0x0010)
                                            ]))

        await self.expect_evt(
            hci.LeSetExtendedScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # 2. The Upper Tester sends an HCI_LE_Set_Extended_Scan_Enable command to the IUT to enable
        # scanning. Filter_Duplicates and Period shall be set to zero. The Duration parameter shall be set
        # to the value specified in Table 4.30 for this round.
        controller.send_cmd(
            hci.LeSetExtendedScanEnable(enable=hci.Enable.ENABLED,
                                        filter_duplicates=hci.Enable.DISABLED,
                                        duration=duration,
                                        period=0))

        await self.expect_evt(hci.LeSetExtendedScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))

        # 3. The Lower Tester begins advertising using the PDU Type specified in Table 4.30 for this round. If
        # AUX_ADV_IND is included in the round, the ADV_EXT_IND shall include an AuxPtr that refers to
        # the AUX_ADV_IND, and all fields specified should be included with the AUX_ADV_IND only. If
        # AdvA is specified the appropriate PDU shall include the field, where “LT” equals the Lower Tester
        # address. If InitA/TargetA is specified the appropriate PDU shall include the field, where “IUT”
        # equals the IUT address and “Not IUT” equals a random address other than the IUT address. If
        # AdvData is specified the PDU shall include the field populated with random octets of the specified
        # count. If the AdvData is greater in length than will fit in one PDU, the Lower Tester shall include
        # an AuxPtr field and send one or more AUX_CHAIN_IND PDUs containing the remaining data.
        # Each PDU except the last shall contain as much AdvData as can fit. If Duration is set to 0x0000,
        # repeat for at least 20 advertising intervals, otherwise repeat until the end of the round.
        connectable = (advertising_event_properties & 0x1) != 0
        scannable = (advertising_event_properties & 0x2) != 0
        directed = (advertising_event_properties & 0x4) != 0
        high_duty_cycle = (advertising_event_properties & 0x8) != 0
        legacy = (advertising_event_properties & 0x10) != 0

        if legacy:
            if advertising_event_properties == ADV_IND:
                advertising_type = ll.LegacyAdvertisingType.ADV_IND
            elif advertising_event_properties == ADV_DIRECT_IND:
                advertising_type = ll.LegacyAdvertisingType.ADV_DIRECT_IND
            elif advertising_event_properties == ADV_SCAN_IND:
                advertising_type = ll.LegacyAdvertisingType.ADV_SCAN_IND
            elif advertising_event_properties == ADV_NONCONN_IND:
                advertising_type = ll.LegacyAdvertisingType.ADV_NONCONN_IND
            pdu = ll.LeLegacyAdvertisingPdu(source_address=lower_tester_address,
                                            destination_address=target_address or Address(),
                                            advertising_address_type=ll.AddressType.PUBLIC,
                                            target_address_type=ll.AddressType.PUBLIC,
                                            advertising_type=advertising_type,
                                            advertising_data=[])
        else:
            pdu = ll.LeExtendedAdvertisingPdu(source_address=lower_tester_address,
                                              destination_address=target_address or Address(),
                                              advertising_address_type=ll.AddressType.PUBLIC,
                                              target_address_type=ll.AddressType.PUBLIC,
                                              connectable=connectable,
                                              scannable=scannable,
                                              directed=not target_address is None,
                                              sid=0,
                                              tx_power=0x7f,
                                              primary_phy=ll.PrimaryPhyType.LE_1M,
                                              secondary_phy=ll.SecondaryPhyType.NO_PACKETS,
                                              advertising_data=[])

        # 4. For undirected advertisements or advertisements directed at the IUT, the Upper Tester receives
        # one or more HCI_LE_Extended_Advertising_Report events from the IUT with an advertising
        # event type matching the type sent in step 3 and the Primary_PHY set as specified in Table 4.29,
        # and if the advertisements used extended PDUs, the Secondary_PHY shall be set as specified in
        # Table 4.29. If AdvData was included in the advertisement, the Upper Tester receives the data
        # included in one or more of the advertising packets. If AdvData is sent to the Upper Tester in
        # multiple reports, the Data Status in the Event_Type field for each report except the last is set to
        # “Incomplete, more data to come”, 0b01. The Event_Type field for the last report sent with
        # advertisement data is set to “Complete”, 0b00. If the advertisement was directed at the IUT, the
        # Upper Tester receives the Direct Address Type and Direct Address used to direct the
        # advertisement at the IUT.
        for n in range(3):
            advertising_data = [random.randint(1, 254) for n in range(advertising_data_length)]
            pdu.advertising_data = advertising_data

            if not legacy:
                sid = random.randint(0, 15)
                pdu.sid = sid
            else:
                sid = 0xff

            controller.send_ll(pdu, rssi=0)

            if target_address and target_address != controller.address:
                # If the controller still emits an event, the error
                # will appear in the subsequent rounds.
                continue

            offset = 0
            max_fragment_length = 229
            num_fragments = math.ceil(advertising_data_length / max_fragment_length) or 1
            for n in range(num_fragments):
                remaining_length = advertising_data_length - offset
                fragment_length = min(max_fragment_length, remaining_length)
                data_status = hci.DataStatus.CONTINUING if remaining_length > max_fragment_length else hci.DataStatus.COMPLETE
                await self.expect_evt(
                    hci.LeExtendedAdvertisingReportRaw(responses=[
                        hci.LeExtendedAdvertisingResponseRaw(
                            connectable=connectable,
                            scannable=scannable,
                            directed=not target_address is None,
                            scan_response=False,
                            legacy=legacy,
                            data_status=data_status,
                            address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
                            address=lower_tester_address,
                            primary_phy=hci.PrimaryPhyType.LE_1M,
                            secondary_phy=hci.SecondaryPhyType.NO_PACKETS,
                            advertising_sid=sid,
                            tx_power=0x7f,
                            rssi=0,
                            periodic_advertising_interval=0,
                            direct_address_type=hci.DirectAdvertisingAddressType.NO_ADDRESS_PROVIDED
                            if not target_address else hci.DirectAdvertisingAddressType.PUBLIC_DEVICE_ADDRESS,
                            direct_address=target_address or Address(),
                            advertising_data=advertising_data[offset:offset + fragment_length])
                    ]))
                offset += fragment_length

        if duration > 0:
            # 5. If the Duration was set to 0x0000 (No Scanning Duration), repeat step 4 until a number of
            # advertising reports (10) have been generated. Each time the Upper Tester receives a report, the
            # Lower Tester shall change the AdvData, if any. If the round uses extended advertising PDUs, it
            # shall also change the DID sub-field of the ADI field to a new value.
            # Otherwise repeat step 4 until the amount of time specified for Duration has elapsed. Afterwards,
            # the Upper Tester receives an HCI_Scan_Timeout event from the IUT. Skip step 6.
            timeout = duration * 100 + 10
            await self.expect_evt(hci.LeScanTimeout(), timeout=timeout)

        else:
            # 6. Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to disable scanning and receives
            # an HCI_Command_Complete event in response.
            controller.send_cmd(hci.LeSetExtendedScanEnable(enable=hci.Enable.DISABLED))

            await self.expect_evt(
                hci.LeSetExtendedScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+1 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ tests = [
    'LL.DDI.SCN.BV_13_C',
    'LL.DDI.SCN.BV_14_C',
    'LL.DDI.SCN.BV_18_C',
    'LL.DDI.SCN.BV_19_C',
]

if __name__ == "__main__":