Loading tools/rootcanal/Android.bp +101 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,84 @@ cc_library_static { ], } // This library implements Python bindings to the DualModeController // class to enable scripted testing in Python. cc_library_host_shared { name: "lib_rootcanal_python3", defaults: [ "bluetooth_py3_native_extension_defaults", "rootcanal_defaults", ], srcs: [ "model/controller/acl_connection.cc", "model/controller/acl_connection_handler.cc", "model/controller/controller_properties.cc", "model/controller/dual_mode_controller.cc", "model/controller/dual_mode_controller_python3.cc", "model/controller/isochronous_connection_handler.cc", "model/controller/le_advertiser.cc", "model/controller/link_layer_controller.cc", "model/controller/sco_connection.cc", "model/controller/security_manager.cc", "model/devices/device.cc", "model/setup/async_manager.cc", ":BluetoothPacketSources", ":BluetoothHciClassSources", ":BluetoothCryptoToolboxSources", ], export_include_dirs: [ "include", ".", ], stl: "libc++_static", static_libs: [ "liblog", "libjsoncpp", ], whole_static_libs: [ "liblmp", ], header_libs: [ "pybind11_headers", ], cflags: [ "-fexceptions", ], rtti: true, } // Generate the python parser+serializer backend for // packets/link_layer_packets.pdl. genrule { name: "link_layer_packets_python3_gen", defaults: [ "pdl_python_generator_defaults" ], cmd: "$(location :pdl) $(in) |" + " $(location :pdl_python_generator)" + " --output $(out) --custom-type-location py.bluetooth", srcs: [ "packets/link_layer_packets.pdl", ], out: [ "link_layer_packets.py", ], } // Generate the python parser+serializer backend for // hci_packets.pdl. genrule { name: "hci_packets_python3_gen", defaults: [ "pdl_python_generator_defaults" ], cmd: "$(location :pdl) $(in) |" + " $(location :pdl_python_generator)" + " --output $(out) --custom-type-location py.bluetooth", srcs: [ ":BluetoothHciPackets", ], out: [ "hci_packets.py", ], } cc_library_static { name: "libscriptedbeaconpayload-protos-lite", host_supported: true, Loading Loading @@ -160,6 +238,29 @@ cc_test_host { ], } // Implement the Bluetooth official LL test suite for root-canal. python_test_host { name: "rootcanal_ll_test", main: "test/main.py", srcs: [ "py/controller.py", "py/bluetooth.py", ":hci_packets_python3_gen", ":link_layer_packets_python3_gen", "test/main.py", "test/LL/DDI/SCN/BV_13_C.py", ], data: [ ":lib_rootcanal_python3", ], libs: [ "typing_extensions", ], test_options: { unit_test: true, }, } // test-vendor unit tests for host cc_test_host { name: "rootcanal_test_host", Loading tools/rootcanal/model/controller/dual_mode_controller_python3.cc 0 → 100644 +225 −0 Original line number Diff line number Diff line /* * Copyright 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <pybind11/pybind11.h> #include <pybind11/stl.h> #include "dual_mode_controller.h" using namespace std::literals; namespace py = pybind11; namespace rootcanal { namespace hci { enum Type { CMD, EVT, ACL, SCO, ISO, }; } // namespace hci // Overload the class DualModeController to implement // SendLinkLayerPacket as forwarding packets to a registered handler. class BaseController : public DualModeController { public: BaseController() : DualModeController() { RegisterTaskScheduler( [this](std::chrono::milliseconds delay, TaskCallback const& task) { return this->async_manager_.ExecAsync(0, delay, task); }); RegisterPeriodicTaskScheduler([this](std::chrono::milliseconds delay, std::chrono::milliseconds period, TaskCallback const& task) { return this->async_manager_.ExecAsyncPeriodically(0, delay, period, task); }); RegisterTaskCancel([this](AsyncTaskId task_id) { this->async_manager_.CancelAsyncTask(task_id); }); } ~BaseController() = default; void RegisterLLChannel( std::function<void(std::shared_ptr<std::vector<uint8_t>>)> const& send_ll) { send_ll_ = send_ll; } void Start() { if (timer_task_id_ == kInvalidTaskId) { timer_task_id_ = async_manager_.ExecAsyncPeriodically( 0, 0ms, 5ms, [this]() { this->TimerTick(); }); } } void Stop() { if (timer_task_id_ != kInvalidTaskId) { async_manager_.CancelAsyncTask(timer_task_id_); timer_task_id_ = kInvalidTaskId; } } virtual void SendLinkLayerPacket( std::shared_ptr<model::packets::LinkLayerPacketBuilder> packet, Phy::Type phy_type_) override { (void)phy_type_; auto bytes = std::make_shared<std::vector<uint8_t>>(); bluetooth::packet::BitInserter inserter(*bytes); bytes->reserve(packet->size()); packet->Serialize(inserter); send_ll_(bytes); } private: std::function<void(std::shared_ptr<std::vector<uint8_t>>)> send_ll_{}; AsyncManager async_manager_; AsyncTaskId timer_task_id_; BaseController(BaseController const&) = delete; DualModeController& operator=(BaseController const&) = delete; }; PYBIND11_MODULE(lib_rootcanal_python3, m) { m.doc() = "RootCanal controller plugin"; py::enum_<hci::Type>(m, "HciType") .value("Cmd", hci::Type::CMD) .value("Evt", hci::Type::EVT) .value("Acl", hci::Type::ACL) .value("Sco", hci::Type::SCO) .value("Iso", hci::Type::ISO); m.def( "generate_rpa", [](py::bytes arg) { std::string irk_str = arg; irk_str.resize(LinkLayerController::kIrkSize); std::array<uint8_t, LinkLayerController::kIrkSize> irk{}; std::copy(irk_str.begin(), irk_str.end(), irk.begin()); bluetooth::hci::Address rpa = rootcanal::LinkLayerController::generate_rpa(irk); return rpa.address; }, "Bluetooth RPA generation"); py::class_<rootcanal::BaseController, std::shared_ptr<rootcanal::BaseController>> basic_controller(m, "BaseController"); // Implement the constructor with two callback parameters to // handle emitted HCI packets and LL packets. basic_controller.def(py::init([](py::object hci_handler, py::object ll_handler) { std::shared_ptr<BaseController> controller = std::make_shared<BaseController>(); controller->RegisterEventChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::EVT, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterAclChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::ACL, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterScoChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::SCO, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterIsoChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::ISO, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterLLChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; ll_handler( py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); return controller; })); // Timer interface. basic_controller.def("start", &BaseController::Start); basic_controller.def("stop", &BaseController::Stop); // Implement method BaseController.receive_hci which // injects HCI packets into the controller as if sent from the host. basic_controller.def( "send_hci", [](std::shared_ptr<rootcanal::BaseController> controller, hci::Type typ, py::bytes data) { std::string data_str = data; std::shared_ptr<std::vector<uint8_t>> bytes = std::make_shared<std::vector<uint8_t>>(data_str.begin(), data_str.end()); switch (typ) { case hci::Type::CMD: controller->HandleCommand(bytes); break; case hci::Type::ACL: controller->HandleAcl(bytes); break; case hci::Type::SCO: controller->HandleSco(bytes); break; case hci::Type::ISO: controller->HandleIso(bytes); break; default: std::cerr << "Dropping HCI packet with unknown type " << typ << std::endl; break; } }); // Implement method BaseController.receive_hci which // injects LL packets into the controller as if sent over the air. basic_controller.def( "send_ll", [](std::shared_ptr<rootcanal::BaseController> controller, py::bytes data) { std::string data_str = data; std::shared_ptr<std::vector<uint8_t>> bytes = std::make_shared<std::vector<uint8_t>>(data_str.begin(), data_str.end()); model::packets::LinkLayerPacketView packet = model::packets::LinkLayerPacketView::Create( bluetooth::packet::PacketView<bluetooth::packet::kLittleEndian>( bytes)); if (!packet.IsValid()) { std::cerr << "Dropping malformed LL packet" << std::endl; return; } controller->IncomingPacket(std::move(packet)); }); } } // namespace rootcanal tools/rootcanal/model/controller/link_layer_controller.cc +1 −4 Original line number Diff line number Diff line Loading @@ -212,9 +212,6 @@ std::optional<AddressWithType> LinkLayerController::ResolvePrivateAddress( return {}; } static Address generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk); std::optional<AddressWithType> LinkLayerController::GenerateResolvablePrivateAddress(AddressWithType address, IrkSelection irk) { Loading Loading @@ -2613,7 +2610,7 @@ void LinkLayerController::IncomingKeypressNotificationPacket( } #endif /* !ROOTCANAL_LMP */ static Address generate_rpa( Address LinkLayerController::generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk) { // most significant bit, bit7, bit6 is 01 to be resolvable random // Bits of the random part of prand shall not be all 1 or all 0 Loading tools/rootcanal/model/controller/link_layer_controller.h +4 −0 Original line number Diff line number Diff line Loading @@ -62,6 +62,10 @@ class LinkLayerController { public: static constexpr size_t kIrkSize = 16; // Generate a resolvable private address using the specified IRK. static Address generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk); LinkLayerController(const Address& address, const ControllerProperties& properties); Loading tools/rootcanal/py/bluetooth.py 0 → 100644 +48 −0 Original line number Diff line number Diff line from dataclasses import dataclass, field from typing import Tuple @dataclass class Address: address: bytes = field(default=bytes([0, 0, 0, 0, 0, 0])) def __post_init__(self): self.address = bytes(self.address) def from_str(address: str) -> 'Address': return Address(bytes([int(b, 16) for b in address.split(':')])) def parse(span: bytes) -> Tuple['Address', bytes]: assert len(span) > 6 return (Address(bytes(reversed(span[:6]))), span[6:]) def parse_all(span: bytes) -> 'Address': assert (len(span) == 6) return Address(bytes(reversed(span))) def serialize(self) -> bytes: return bytes(reversed(self.address)) def __repr__(self) -> str: return ':'.join([f'{b:02x}' for b in self.address]) @property def size(self) -> int: return 6 @dataclass class ClassOfDevice: def parse(span: bytes) -> Tuple['Address', bytes]: assert False def parse_all(span: bytes) -> 'Address': assert False def serialize(self) -> bytes: assert False @property def size(self) -> int: assert False Loading
tools/rootcanal/Android.bp +101 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,84 @@ cc_library_static { ], } // This library implements Python bindings to the DualModeController // class to enable scripted testing in Python. cc_library_host_shared { name: "lib_rootcanal_python3", defaults: [ "bluetooth_py3_native_extension_defaults", "rootcanal_defaults", ], srcs: [ "model/controller/acl_connection.cc", "model/controller/acl_connection_handler.cc", "model/controller/controller_properties.cc", "model/controller/dual_mode_controller.cc", "model/controller/dual_mode_controller_python3.cc", "model/controller/isochronous_connection_handler.cc", "model/controller/le_advertiser.cc", "model/controller/link_layer_controller.cc", "model/controller/sco_connection.cc", "model/controller/security_manager.cc", "model/devices/device.cc", "model/setup/async_manager.cc", ":BluetoothPacketSources", ":BluetoothHciClassSources", ":BluetoothCryptoToolboxSources", ], export_include_dirs: [ "include", ".", ], stl: "libc++_static", static_libs: [ "liblog", "libjsoncpp", ], whole_static_libs: [ "liblmp", ], header_libs: [ "pybind11_headers", ], cflags: [ "-fexceptions", ], rtti: true, } // Generate the python parser+serializer backend for // packets/link_layer_packets.pdl. genrule { name: "link_layer_packets_python3_gen", defaults: [ "pdl_python_generator_defaults" ], cmd: "$(location :pdl) $(in) |" + " $(location :pdl_python_generator)" + " --output $(out) --custom-type-location py.bluetooth", srcs: [ "packets/link_layer_packets.pdl", ], out: [ "link_layer_packets.py", ], } // Generate the python parser+serializer backend for // hci_packets.pdl. genrule { name: "hci_packets_python3_gen", defaults: [ "pdl_python_generator_defaults" ], cmd: "$(location :pdl) $(in) |" + " $(location :pdl_python_generator)" + " --output $(out) --custom-type-location py.bluetooth", srcs: [ ":BluetoothHciPackets", ], out: [ "hci_packets.py", ], } cc_library_static { name: "libscriptedbeaconpayload-protos-lite", host_supported: true, Loading Loading @@ -160,6 +238,29 @@ cc_test_host { ], } // Implement the Bluetooth official LL test suite for root-canal. python_test_host { name: "rootcanal_ll_test", main: "test/main.py", srcs: [ "py/controller.py", "py/bluetooth.py", ":hci_packets_python3_gen", ":link_layer_packets_python3_gen", "test/main.py", "test/LL/DDI/SCN/BV_13_C.py", ], data: [ ":lib_rootcanal_python3", ], libs: [ "typing_extensions", ], test_options: { unit_test: true, }, } // test-vendor unit tests for host cc_test_host { name: "rootcanal_test_host", Loading
tools/rootcanal/model/controller/dual_mode_controller_python3.cc 0 → 100644 +225 −0 Original line number Diff line number Diff line /* * Copyright 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <pybind11/pybind11.h> #include <pybind11/stl.h> #include "dual_mode_controller.h" using namespace std::literals; namespace py = pybind11; namespace rootcanal { namespace hci { enum Type { CMD, EVT, ACL, SCO, ISO, }; } // namespace hci // Overload the class DualModeController to implement // SendLinkLayerPacket as forwarding packets to a registered handler. class BaseController : public DualModeController { public: BaseController() : DualModeController() { RegisterTaskScheduler( [this](std::chrono::milliseconds delay, TaskCallback const& task) { return this->async_manager_.ExecAsync(0, delay, task); }); RegisterPeriodicTaskScheduler([this](std::chrono::milliseconds delay, std::chrono::milliseconds period, TaskCallback const& task) { return this->async_manager_.ExecAsyncPeriodically(0, delay, period, task); }); RegisterTaskCancel([this](AsyncTaskId task_id) { this->async_manager_.CancelAsyncTask(task_id); }); } ~BaseController() = default; void RegisterLLChannel( std::function<void(std::shared_ptr<std::vector<uint8_t>>)> const& send_ll) { send_ll_ = send_ll; } void Start() { if (timer_task_id_ == kInvalidTaskId) { timer_task_id_ = async_manager_.ExecAsyncPeriodically( 0, 0ms, 5ms, [this]() { this->TimerTick(); }); } } void Stop() { if (timer_task_id_ != kInvalidTaskId) { async_manager_.CancelAsyncTask(timer_task_id_); timer_task_id_ = kInvalidTaskId; } } virtual void SendLinkLayerPacket( std::shared_ptr<model::packets::LinkLayerPacketBuilder> packet, Phy::Type phy_type_) override { (void)phy_type_; auto bytes = std::make_shared<std::vector<uint8_t>>(); bluetooth::packet::BitInserter inserter(*bytes); bytes->reserve(packet->size()); packet->Serialize(inserter); send_ll_(bytes); } private: std::function<void(std::shared_ptr<std::vector<uint8_t>>)> send_ll_{}; AsyncManager async_manager_; AsyncTaskId timer_task_id_; BaseController(BaseController const&) = delete; DualModeController& operator=(BaseController const&) = delete; }; PYBIND11_MODULE(lib_rootcanal_python3, m) { m.doc() = "RootCanal controller plugin"; py::enum_<hci::Type>(m, "HciType") .value("Cmd", hci::Type::CMD) .value("Evt", hci::Type::EVT) .value("Acl", hci::Type::ACL) .value("Sco", hci::Type::SCO) .value("Iso", hci::Type::ISO); m.def( "generate_rpa", [](py::bytes arg) { std::string irk_str = arg; irk_str.resize(LinkLayerController::kIrkSize); std::array<uint8_t, LinkLayerController::kIrkSize> irk{}; std::copy(irk_str.begin(), irk_str.end(), irk.begin()); bluetooth::hci::Address rpa = rootcanal::LinkLayerController::generate_rpa(irk); return rpa.address; }, "Bluetooth RPA generation"); py::class_<rootcanal::BaseController, std::shared_ptr<rootcanal::BaseController>> basic_controller(m, "BaseController"); // Implement the constructor with two callback parameters to // handle emitted HCI packets and LL packets. basic_controller.def(py::init([](py::object hci_handler, py::object ll_handler) { std::shared_ptr<BaseController> controller = std::make_shared<BaseController>(); controller->RegisterEventChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::EVT, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterAclChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::ACL, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterScoChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::SCO, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterIsoChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; hci_handler( hci::Type::ISO, py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); controller->RegisterLLChannel( [=](std::shared_ptr<std::vector<uint8_t>> data) { pybind11::gil_scoped_acquire acquire; ll_handler( py::bytes(reinterpret_cast<char*>(data->data()), data->size())); }); return controller; })); // Timer interface. basic_controller.def("start", &BaseController::Start); basic_controller.def("stop", &BaseController::Stop); // Implement method BaseController.receive_hci which // injects HCI packets into the controller as if sent from the host. basic_controller.def( "send_hci", [](std::shared_ptr<rootcanal::BaseController> controller, hci::Type typ, py::bytes data) { std::string data_str = data; std::shared_ptr<std::vector<uint8_t>> bytes = std::make_shared<std::vector<uint8_t>>(data_str.begin(), data_str.end()); switch (typ) { case hci::Type::CMD: controller->HandleCommand(bytes); break; case hci::Type::ACL: controller->HandleAcl(bytes); break; case hci::Type::SCO: controller->HandleSco(bytes); break; case hci::Type::ISO: controller->HandleIso(bytes); break; default: std::cerr << "Dropping HCI packet with unknown type " << typ << std::endl; break; } }); // Implement method BaseController.receive_hci which // injects LL packets into the controller as if sent over the air. basic_controller.def( "send_ll", [](std::shared_ptr<rootcanal::BaseController> controller, py::bytes data) { std::string data_str = data; std::shared_ptr<std::vector<uint8_t>> bytes = std::make_shared<std::vector<uint8_t>>(data_str.begin(), data_str.end()); model::packets::LinkLayerPacketView packet = model::packets::LinkLayerPacketView::Create( bluetooth::packet::PacketView<bluetooth::packet::kLittleEndian>( bytes)); if (!packet.IsValid()) { std::cerr << "Dropping malformed LL packet" << std::endl; return; } controller->IncomingPacket(std::move(packet)); }); } } // namespace rootcanal
tools/rootcanal/model/controller/link_layer_controller.cc +1 −4 Original line number Diff line number Diff line Loading @@ -212,9 +212,6 @@ std::optional<AddressWithType> LinkLayerController::ResolvePrivateAddress( return {}; } static Address generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk); std::optional<AddressWithType> LinkLayerController::GenerateResolvablePrivateAddress(AddressWithType address, IrkSelection irk) { Loading Loading @@ -2613,7 +2610,7 @@ void LinkLayerController::IncomingKeypressNotificationPacket( } #endif /* !ROOTCANAL_LMP */ static Address generate_rpa( Address LinkLayerController::generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk) { // most significant bit, bit7, bit6 is 01 to be resolvable random // Bits of the random part of prand shall not be all 1 or all 0 Loading
tools/rootcanal/model/controller/link_layer_controller.h +4 −0 Original line number Diff line number Diff line Loading @@ -62,6 +62,10 @@ class LinkLayerController { public: static constexpr size_t kIrkSize = 16; // Generate a resolvable private address using the specified IRK. static Address generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk); LinkLayerController(const Address& address, const ControllerProperties& properties); Loading
tools/rootcanal/py/bluetooth.py 0 → 100644 +48 −0 Original line number Diff line number Diff line from dataclasses import dataclass, field from typing import Tuple @dataclass class Address: address: bytes = field(default=bytes([0, 0, 0, 0, 0, 0])) def __post_init__(self): self.address = bytes(self.address) def from_str(address: str) -> 'Address': return Address(bytes([int(b, 16) for b in address.split(':')])) def parse(span: bytes) -> Tuple['Address', bytes]: assert len(span) > 6 return (Address(bytes(reversed(span[:6]))), span[6:]) def parse_all(span: bytes) -> 'Address': assert (len(span) == 6) return Address(bytes(reversed(span))) def serialize(self) -> bytes: return bytes(reversed(self.address)) def __repr__(self) -> str: return ':'.join([f'{b:02x}' for b in self.address]) @property def size(self) -> int: return 6 @dataclass class ClassOfDevice: def parse(span: bytes) -> Tuple['Address', bytes]: assert False def parse_all(span: bytes) -> 'Address': assert False def serialize(self) -> bytes: assert False @property def size(self) -> int: assert False