Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d6c3e1a1 authored by Henri Chataing's avatar Henri Chataing Committed by Gerrit Code Review
Browse files

Merge "Revert "RootCanal: Implement rootcanal_ll_test""

parents 1c2f61ce cf9df6a6
Loading
Loading
Loading
Loading
+0 −101
Original line number Diff line number Diff line
@@ -103,84 +103,6 @@ cc_library_static {
    ],
}

// This library implements Python bindings to the DualModeController
// class to enable scripted testing in Python.
cc_library_host_shared {
    name: "lib_rootcanal_python3",
    defaults: [
        "bluetooth_py3_native_extension_defaults",
        "rootcanal_defaults",
    ],
    srcs: [
        "model/controller/acl_connection.cc",
        "model/controller/acl_connection_handler.cc",
        "model/controller/controller_properties.cc",
        "model/controller/dual_mode_controller.cc",
        "model/controller/dual_mode_controller_python3.cc",
        "model/controller/isochronous_connection_handler.cc",
        "model/controller/le_advertiser.cc",
        "model/controller/link_layer_controller.cc",
        "model/controller/sco_connection.cc",
        "model/controller/security_manager.cc",
        "model/devices/device.cc",
        "model/setup/async_manager.cc",
        ":BluetoothPacketSources",
        ":BluetoothHciClassSources",
        ":BluetoothCryptoToolboxSources",
    ],
    export_include_dirs: [
        "include",
        ".",
    ],
    stl: "libc++_static",
    static_libs: [
        "liblog",
        "libjsoncpp",
    ],
    whole_static_libs: [
        "liblmp",
    ],
    header_libs: [
        "pybind11_headers",
    ],
    cflags: [
        "-fexceptions",
    ],
    rtti: true,
}

// Generate the python parser+serializer backend for
// packets/link_layer_packets.pdl.
genrule {
    name: "link_layer_packets_python3_gen",
    defaults: [ "pdl_python_generator_defaults" ],
    cmd: "$(location :pdl) $(in) |" +
        " $(location :pdl_python_generator)" +
        " --output $(out) --custom-type-location py.bluetooth",
    srcs: [
        "packets/link_layer_packets.pdl",
    ],
    out: [
        "link_layer_packets.py",
    ],
}

// Generate the python parser+serializer backend for
// hci_packets.pdl.
genrule {
    name: "hci_packets_python3_gen",
    defaults: [ "pdl_python_generator_defaults" ],
    cmd: "$(location :pdl) $(in) |" +
        " $(location :pdl_python_generator)" +
        " --output $(out) --custom-type-location py.bluetooth",
    srcs: [
        ":BluetoothHciPackets",
    ],
    out: [
        "hci_packets.py",
    ],
}

cc_library_static {
    name: "libscriptedbeaconpayload-protos-lite",
    host_supported: true,
@@ -238,29 +160,6 @@ cc_test_host {
    ],
}

// Implement the Bluetooth official LL test suite for root-canal.
python_test_host {
    name: "rootcanal_ll_test",
    main: "test/main.py",
    srcs: [
        "py/controller.py",
        "py/bluetooth.py",
        ":hci_packets_python3_gen",
        ":link_layer_packets_python3_gen",
        "test/main.py",
        "test/LL/DDI/SCN/BV_13_C.py",
    ],
    data: [
        ":lib_rootcanal_python3",
    ],
    libs: [
        "typing_extensions",
    ],
    test_options: {
        unit_test: true,
    },
}

// test-vendor unit tests for host
cc_test_host {
    name: "rootcanal_test_host",
+0 −225
Original line number Diff line number Diff line
/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include "dual_mode_controller.h"

using namespace std::literals;
namespace py = pybind11;

namespace rootcanal {

namespace hci {
enum Type {
  CMD,
  EVT,
  ACL,
  SCO,
  ISO,
};
}  // namespace hci

// Overload the class DualModeController to implement
// SendLinkLayerPacket as forwarding packets to a registered handler.
class BaseController : public DualModeController {
 public:
  BaseController() : DualModeController() {
    RegisterTaskScheduler(
        [this](std::chrono::milliseconds delay, TaskCallback const& task) {
          return this->async_manager_.ExecAsync(0, delay, task);
        });
    RegisterPeriodicTaskScheduler([this](std::chrono::milliseconds delay,
                                         std::chrono::milliseconds period,
                                         TaskCallback const& task) {
      return this->async_manager_.ExecAsyncPeriodically(0, delay, period, task);
    });
    RegisterTaskCancel([this](AsyncTaskId task_id) {
      this->async_manager_.CancelAsyncTask(task_id);
    });
  }
  ~BaseController() = default;

  void RegisterLLChannel(
      std::function<void(std::shared_ptr<std::vector<uint8_t>>)> const&
          send_ll) {
    send_ll_ = send_ll;
  }

  void Start() {
    if (timer_task_id_ == kInvalidTaskId) {
      timer_task_id_ = async_manager_.ExecAsyncPeriodically(
          0, 0ms, 5ms, [this]() { this->TimerTick(); });
    }
  }

  void Stop() {
    if (timer_task_id_ != kInvalidTaskId) {
      async_manager_.CancelAsyncTask(timer_task_id_);
      timer_task_id_ = kInvalidTaskId;
    }
  }

  virtual void SendLinkLayerPacket(
      std::shared_ptr<model::packets::LinkLayerPacketBuilder> packet,
      Phy::Type phy_type_) override {
    (void)phy_type_;
    auto bytes = std::make_shared<std::vector<uint8_t>>();
    bluetooth::packet::BitInserter inserter(*bytes);
    bytes->reserve(packet->size());
    packet->Serialize(inserter);
    send_ll_(bytes);
  }

 private:
  std::function<void(std::shared_ptr<std::vector<uint8_t>>)> send_ll_{};
  AsyncManager async_manager_;
  AsyncTaskId timer_task_id_;

  BaseController(BaseController const&) = delete;
  DualModeController& operator=(BaseController const&) = delete;
};

PYBIND11_MODULE(lib_rootcanal_python3, m) {
  m.doc() = "RootCanal controller plugin";

  py::enum_<hci::Type>(m, "HciType")
      .value("Cmd", hci::Type::CMD)
      .value("Evt", hci::Type::EVT)
      .value("Acl", hci::Type::ACL)
      .value("Sco", hci::Type::SCO)
      .value("Iso", hci::Type::ISO);

  m.def(
      "generate_rpa",
      [](py::bytes arg) {
        std::string irk_str = arg;
        irk_str.resize(LinkLayerController::kIrkSize);

        std::array<uint8_t, LinkLayerController::kIrkSize> irk{};
        std::copy(irk_str.begin(), irk_str.end(), irk.begin());

        bluetooth::hci::Address rpa =
            rootcanal::LinkLayerController::generate_rpa(irk);
        return rpa.address;
      },
      "Bluetooth RPA generation");

  py::class_<rootcanal::BaseController,
             std::shared_ptr<rootcanal::BaseController>>
      basic_controller(m, "BaseController");

  // Implement the constructor with two callback parameters to
  // handle emitted HCI packets and LL packets.
  basic_controller.def(py::init([](py::object hci_handler,
                                   py::object ll_handler) {
    std::shared_ptr<BaseController> controller =
        std::make_shared<BaseController>();
    controller->RegisterEventChannel(
        [=](std::shared_ptr<std::vector<uint8_t>> data) {
          pybind11::gil_scoped_acquire acquire;
          hci_handler(
              hci::Type::EVT,
              py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
        });
    controller->RegisterAclChannel(
        [=](std::shared_ptr<std::vector<uint8_t>> data) {
          pybind11::gil_scoped_acquire acquire;
          hci_handler(
              hci::Type::ACL,
              py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
        });
    controller->RegisterScoChannel(
        [=](std::shared_ptr<std::vector<uint8_t>> data) {
          pybind11::gil_scoped_acquire acquire;
          hci_handler(
              hci::Type::SCO,
              py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
        });
    controller->RegisterIsoChannel(
        [=](std::shared_ptr<std::vector<uint8_t>> data) {
          pybind11::gil_scoped_acquire acquire;
          hci_handler(
              hci::Type::ISO,
              py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
        });
    controller->RegisterLLChannel(
        [=](std::shared_ptr<std::vector<uint8_t>> data) {
          pybind11::gil_scoped_acquire acquire;
          ll_handler(
              py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
        });
    return controller;
  }));

  // Timer interface.
  basic_controller.def("start", &BaseController::Start);
  basic_controller.def("stop", &BaseController::Stop);

  // Implement method BaseController.receive_hci which
  // injects HCI packets into the controller as if sent from the host.
  basic_controller.def(
      "send_hci", [](std::shared_ptr<rootcanal::BaseController> controller,
                     hci::Type typ, py::bytes data) {
        std::string data_str = data;
        std::shared_ptr<std::vector<uint8_t>> bytes =
            std::make_shared<std::vector<uint8_t>>(data_str.begin(),
                                                   data_str.end());

        switch (typ) {
          case hci::Type::CMD:
            controller->HandleCommand(bytes);
            break;
          case hci::Type::ACL:
            controller->HandleAcl(bytes);
            break;
          case hci::Type::SCO:
            controller->HandleSco(bytes);
            break;
          case hci::Type::ISO:
            controller->HandleIso(bytes);
            break;
          default:
            std::cerr << "Dropping HCI packet with unknown type " << typ
                      << std::endl;
            break;
        }
      });

  // Implement method BaseController.receive_hci which
  // injects LL packets into the controller as if sent over the air.
  basic_controller.def(
      "send_ll", [](std::shared_ptr<rootcanal::BaseController> controller,
                    py::bytes data) {
        std::string data_str = data;
        std::shared_ptr<std::vector<uint8_t>> bytes =
            std::make_shared<std::vector<uint8_t>>(data_str.begin(),
                                                   data_str.end());

        model::packets::LinkLayerPacketView packet =
            model::packets::LinkLayerPacketView::Create(
                bluetooth::packet::PacketView<bluetooth::packet::kLittleEndian>(
                    bytes));
        if (!packet.IsValid()) {
          std::cerr << "Dropping malformed LL packet" << std::endl;
          return;
        }
        controller->IncomingPacket(std::move(packet));
      });
}

}  // namespace rootcanal
+4 −1
Original line number Diff line number Diff line
@@ -212,6 +212,9 @@ std::optional<AddressWithType> LinkLayerController::ResolvePrivateAddress(
  return {};
}

static Address generate_rpa(
    std::array<uint8_t, LinkLayerController::kIrkSize> irk);

std::optional<AddressWithType>
LinkLayerController::GenerateResolvablePrivateAddress(AddressWithType address,
                                                      IrkSelection irk) {
@@ -2610,7 +2613,7 @@ void LinkLayerController::IncomingKeypressNotificationPacket(
}
#endif /* !ROOTCANAL_LMP */

Address LinkLayerController::generate_rpa(
static Address generate_rpa(
    std::array<uint8_t, LinkLayerController::kIrkSize> irk) {
  // most significant bit, bit7, bit6 is 01 to be resolvable random
  // Bits of the random part of prand shall not be all 1 or all 0
+0 −4
Original line number Diff line number Diff line
@@ -62,10 +62,6 @@ class LinkLayerController {
 public:
  static constexpr size_t kIrkSize = 16;

  // Generate a resolvable private address using the specified IRK.
  static Address generate_rpa(
      std::array<uint8_t, LinkLayerController::kIrkSize> irk);

  LinkLayerController(const Address& address,
                      const ControllerProperties& properties);

tools/rootcanal/py/bluetooth.py

deleted100644 → 0
+0 −48
Original line number Diff line number Diff line
from dataclasses import dataclass, field
from typing import Tuple


@dataclass
class Address:
    address: bytes = field(default=bytes([0, 0, 0, 0, 0, 0]))

    def __post_init__(self):
        self.address = bytes(self.address)

    def from_str(address: str) -> 'Address':
        return Address(bytes([int(b, 16) for b in address.split(':')]))

    def parse(span: bytes) -> Tuple['Address', bytes]:
        assert len(span) > 6
        return (Address(bytes(reversed(span[:6]))), span[6:])

    def parse_all(span: bytes) -> 'Address':
        assert (len(span) == 6)
        return Address(bytes(reversed(span)))

    def serialize(self) -> bytes:
        return bytes(reversed(self.address))

    def __repr__(self) -> str:
        return ':'.join([f'{b:02x}' for b in self.address])

    @property
    def size(self) -> int:
        return 6


@dataclass
class ClassOfDevice:

    def parse(span: bytes) -> Tuple['Address', bytes]:
        assert False

    def parse_all(span: bytes) -> 'Address':
        assert False

    def serialize(self) -> bytes:
        assert False

    @property
    def size(self) -> int:
        assert False
Loading