Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c02f5595 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Security: Add classic bonding test"

parents 8fe493e9 d4e4e49b
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -92,10 +92,18 @@ class RootFacadeService : public ::bluetooth::facade::RootFacade::Service {
        modules.add<::bluetooth::neighbor::facade::NeighborFacadeModule>();
        modules.add<::bluetooth::facade::ReadOnlyPropertyServerModule>();
        modules.add<::bluetooth::l2cap::classic::L2capClassicModuleFacadeModule>();
        modules.add<::bluetooth::hci::facade::HciLayerFacadeModule>();
        break;
      case BluetoothModule::SECURITY:
        modules.add<::bluetooth::facade::ReadOnlyPropertyServerModule>();
        modules.add<::bluetooth::hci::facade::ControllerFacadeModule>();
        modules.add<::bluetooth::security::SecurityModuleFacadeModule>();
        modules.add<::bluetooth::neighbor::facade::NeighborFacadeModule>();
        modules.add<::bluetooth::l2cap::classic::L2capClassicModuleFacadeModule>();
        modules.add<::bluetooth::hci::facade::HciLayerFacadeModule>();
        modules.add<::bluetooth::hci::facade::ControllerFacadeModule>();
        modules.add<::bluetooth::hci::facade::LeAdvertisingManagerFacadeModule>();
        modules.add<::bluetooth::hci::facade::LeScanningManagerFacadeModule>();
        break;
      case BluetoothModule::SHIM:
        modules.add<::bluetooth::shim::Advertising>();
+153 −15
Original line number Diff line number Diff line
@@ -17,17 +17,22 @@
from datetime import timedelta
import os
import sys
import logging

from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts
from google.protobuf import empty_pb2
from facade import common_pb2
from google.protobuf import empty_pb2 as empty_proto
from facade import common_pb2 as common
from facade import rootservice_pb2 as facade_rootservice_pb2
from google.protobuf import empty_pb2
from security import facade_pb2 as security_facade_pb2
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from hci.facade import facade_pb2 as hci_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
from hci.facade import controller_facade_pb2 as controller_facade
from l2cap.classic import facade_pb2 as l2cap_facade
from neighbor.facade import facade_pb2 as neighbor_facade
from security import facade_pb2 as security_facade
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets


class SimpleSecurityTest(GdFacadeOnlyBaseTestClass):
@@ -46,31 +51,164 @@ class SimpleSecurityTest(GdFacadeOnlyBaseTestClass):
                    'L2CAP'),))

        self.device_under_test.address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
            empty_pb2.Empty()).address
            empty_proto.Empty()).address
        self.cert_device.address = self.cert_device.controller_read_only_property.ReadLocalAddress(
            empty_pb2.Empty()).address
            empty_proto.Empty()).address

        self.dut_address = common_pb2.BluetoothAddress(
        self.device_under_test.neighbor.EnablePageScan(
            neighbor_facade.EnableMsg(enabled=True))
        self.cert_device.neighbor.EnablePageScan(
            neighbor_facade.EnableMsg(enabled=True))

        self.dut_address = common.BluetoothAddress(
            address=self.device_under_test.address)
        self.cert_address = common_pb2.BluetoothAddress(
        self.cert_address = common.BluetoothAddress(
            address=self.cert_device.address)

        self.dut_address_with_type = common_pb2.BluetoothAddressWithType()
        self.dut_address_with_type = common.BluetoothAddressWithType()
        self.dut_address_with_type.address.CopyFrom(self.dut_address)
        self.dut_address_with_type.type = common_pb2.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
        self.dut_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS

        self.cert_address_with_type = common_pb2.BluetoothAddressWithType()
        self.cert_address_with_type = common.BluetoothAddressWithType()
        self.cert_address_with_type.address.CopyFrom(self.cert_address)
        self.cert_address_with_type.type = common_pb2.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
        self.cert_address_with_type.type = common.BluetoothPeerAddressTypeEnum.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS

        self.device_under_test.wait_channel_ready()
        self.cert_device.wait_channel_ready()

        self.cert_name = b'ImTheCert'
        self.cert_device.hci_controller.WriteLocalName(
            controller_facade.NameMsg(name=self.cert_name))
        self.dut_name = b'ImTheDUT'
        self.device_under_test.hci_controller.WriteLocalName(
            controller_facade.NameMsg(name=self.dut_name))

    def teardown_test(self):
        self.device_under_test.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest())
        self.cert_device.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest())

    def test_pass(self):
        pass
    def tmp_register_for_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.device_under_test.hci.RegisterEventHandler(msg)

    def tmp_enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.device_under_test.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.device_under_test.hci.EnqueueCommandWithStatus(cmd)

    def register_for_event(self, event_code):
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterEventHandler(msg)

    def enqueue_hci_command(self, command, expect_complete):
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if (expect_complete):
            self.cert_device.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert_device.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=acl)
        self.cert_device.hci.SendAclData(acl_msg)

    def test_dut_connects(self):
        # Cert event registration
        self.register_for_event(hci_packets.EventCode.LINK_KEY_REQUEST)
        self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_REQUEST)
        self.register_for_event(hci_packets.EventCode.IO_CAPABILITY_RESPONSE)
        self.register_for_event(hci_packets.EventCode.USER_PASSKEY_NOTIFICATION)
        self.register_for_event(hci_packets.EventCode.USER_CONFIRMATION_REQUEST)
        self.register_for_event(
            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
        self.register_for_event(hci_packets.EventCode.LINK_KEY_NOTIFICATION)
        self.register_for_event(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)
        with EventCallbackStream(self.device_under_test.security.FetchUiEvents(empty_proto.Empty())) as dut_ui_stream, \
            EventCallbackStream(self.device_under_test.security.FetchBondEvents(empty_proto.Empty())) as dut_bond_stream, \
            EventCallbackStream(self.device_under_test.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as name_event_stream, \
            EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream:

            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
            dut_ui_event_asserts = EventAsserts(dut_ui_stream)
            dut_bond_asserts = EventAsserts(dut_bond_stream)
            dut_name_asserts = EventAsserts(name_event_stream)

            dut_address = self.device_under_test.hci_controller.GetMacAddress(
                empty_proto.Empty()).address
            cert_address = self.cert_device.hci_controller.GetMacAddress(
                empty_proto.Empty()).address

            # Enable Simple Secure Pairing
            self.enqueue_hci_command(
                hci_packets.WriteSimplePairingModeBuilder(
                    hci_packets.Enable.ENABLED), True)

            cert_hci_event_asserts.assert_event_occurs(
                lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)

            # Get the name
            self.device_under_test.neighbor.ReadRemoteName(
                neighbor_facade.RemoteNameRequestMsg(
                    address=cert_address,
                    page_scan_repetition_mode=1,
                    clock_offset=0x6855))

            dut_name_asserts.assert_event_occurs(
                lambda msg: self.cert_name in msg.name)

            self.device_under_test.security.CreateBond(
                common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(address=cert_address),
                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS))

            cert_hci_event_asserts.assert_event_occurs(
                lambda event: logging.debug(event.event) or hci_packets.EventCode.IO_CAPABILITY_REQUEST in event.event
            )

            self.enqueue_hci_command(
                hci_packets.IoCapabilityRequestReplyBuilder(
                    dut_address.decode('utf8'),
                    hci_packets.IoCapability.DISPLAY_YES_NO,
                    hci_packets.OobDataPresent.NOT_PRESENT,
                    hci_packets.AuthenticationRequirements.
                    DEDICATED_BONDING_MITM_PROTECTION), True)

            cert_hci_event_asserts.assert_event_occurs(
                lambda event: logging.debug(event.event) or hci_packets.EventCode.USER_CONFIRMATION_REQUEST in event.event
            )
            self.enqueue_hci_command(
                hci_packets.UserConfirmationRequestReplyBuilder(
                    dut_address.decode('utf8')), True)

            logging.info("Waiting for UI event")
            ui_id = -1

            def get_unique_id(event):
                if (event.message_type ==
                        security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE):
                    nonlocal ui_id
                    ui_id = event.unique_id
                    return True
                return False

            dut_ui_event_asserts.assert_event_occurs(get_unique_id)

            logging.info("Sending UI response")
            self.device_under_test.security.SendUiCallback(
                security_facade.UiCallbackMsg(
                    message_type=security_facade.UiCallbackType.YES_NO,
                    boolean=True,
                    unique_id=ui_id))

            dut_bond_asserts.assert_event_occurs(
                lambda bond_event: bond_event.message_type == security_facade.BondMsgType.DEVICE_BONDED
            )
+124 −29
Original line number Diff line number Diff line
@@ -15,9 +15,8 @@
 */
#include "security/facade.h"

#include "hci/hci_layer.h"
#include "l2cap/classic/l2cap_classic_module.h"
#include "l2cap/le/l2cap_le_module.h"
#include "grpc/grpc_event_queue.h"
#include "hci/address_with_type.h"
#include "os/handler.h"
#include "security/facade.grpc.pb.h"
#include "security/security_manager_listener.h"
@@ -28,11 +27,8 @@ namespace security {

class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener {
 public:
  SecurityModuleFacadeService(SecurityModule* security_module, l2cap::le::L2capLeModule* l2cap_le_module,
                              l2cap::classic::L2capClassicModule* l2cap_classic_module, hci::HciLayer* hci_layer,
                              ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), l2cap_le_module_(l2cap_le_module),
        l2cap_classic_module_(l2cap_classic_module), security_handler_(security_handler) {
  SecurityModuleFacadeService(SecurityModule* security_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), security_handler_(security_handler) {
    security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_);
  }

@@ -63,38 +59,137 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
    return ::grpc::Status::OK;
  }

  void OnDisplayYesNoDialogWithValue(const bluetooth::hci::AddressWithType& address, uint32_t numeric_value,
                                     common::OnceCallback<void(bool)> input_callback) {}
  void OnDisplayYesNoDialog(const bluetooth::hci::AddressWithType& address,
                            common::OnceCallback<void(bool)> input_callback) {}
  void OnDisplayPasskeyDialog(const bluetooth::hci::AddressWithType& address, uint32_t passkey) {}
  void OnDisplayPasskeyInputDialog(const bluetooth::hci::AddressWithType& address,
                                   common::OnceCallback<void(uint32_t)> input_callback){};
  void OnDisplayCancelDialog(const bluetooth::hci::AddressWithType& address) {}
  void OnDeviceBonded(hci::AddressWithType device) {}
  void OnDeviceUnbonded(hci::AddressWithType device) {}
  void OnDeviceBondFailed(hci::AddressWithType device) {}
  ::grpc::Status FetchUiEvents(::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
                               ::grpc::ServerWriter<UiMsg>* writer) override {
    return ui_events_.RunLoop(context, writer);
  }

  ::grpc::Status SendUiCallback(::grpc::ServerContext* context, const UiCallbackMsg* request,
                                ::google::protobuf::Empty* response) override {
    switch (request->message_type()) {
      case UiCallbackType::PASSKEY:
        security_handler_->Post(
            common::BindOnce(std::move(user_passkey_callbacks_[request->unique_id()]), request->numeric_value()));
        break;
      case UiCallbackType::YES_NO:
        security_handler_->Post(
            common::BindOnce(std::move(user_yes_no_callbacks_[request->unique_id()]), request->boolean()));
        break;
      default:
        LOG_ERROR("Unknown UiCallbackType %d", static_cast<int>(request->message_type()));
        return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Unknown UiCallbackType");
    }
    return ::grpc::Status::OK;
  }

  ::grpc::Status FetchBondEvents(::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
                                 ::grpc::ServerWriter<BondMsg>* writer) override {
    return bond_events_.RunLoop(context, writer);
  }

  void OnDisplayYesNoDialogWithValue(const bluetooth::hci::AddressWithType& peer, uint32_t numeric_value,
                                     common::OnceCallback<void(bool)> callback) override {
    LOG_INFO("%s value = 0x%x", peer.ToString().c_str(), numeric_value);
    user_yes_no_callbacks_.emplace(unique_id, std::move(callback));
    UiMsg display_with_value;
    display_with_value.mutable_peer()->mutable_address()->set_address(peer.ToString());
    display_with_value.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    display_with_value.set_message_type(UiMsgType::DISPLAY_YES_NO_WITH_VALUE);
    display_with_value.set_numeric_value(numeric_value);
    display_with_value.set_unique_id(unique_id++);
    ui_events_.OnIncomingEvent(display_with_value);
  }

  void OnDisplayYesNoDialog(const bluetooth::hci::AddressWithType& peer,
                            common::OnceCallback<void(bool)> callback) override {
    LOG_INFO("%s", peer.ToString().c_str());
    user_yes_no_callbacks_.emplace(unique_id, std::move(callback));
    UiMsg display_yes_no;
    display_yes_no.mutable_peer()->mutable_address()->set_address(peer.ToString());
    display_yes_no.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    display_yes_no.set_message_type(UiMsgType::DISPLAY_YES_NO);
    display_yes_no.set_unique_id(unique_id++);
    ui_events_.OnIncomingEvent(display_yes_no);
  }

  void OnDisplayPasskeyDialog(const bluetooth::hci::AddressWithType& peer, uint32_t passkey) override {
    LOG_INFO("%s value = 0x%x", peer.ToString().c_str(), passkey);
    UiMsg display_passkey;
    display_passkey.mutable_peer()->mutable_address()->set_address(peer.ToString());
    display_passkey.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    display_passkey.set_message_type(UiMsgType::DISPLAY_PASSKEY);
    display_passkey.set_numeric_value(passkey);
    display_passkey.set_unique_id(unique_id++);
    ui_events_.OnIncomingEvent(display_passkey);
  }

  void OnDisplayPasskeyInputDialog(const bluetooth::hci::AddressWithType& peer,
                                   common::OnceCallback<void(uint32_t)> callback) override {
    LOG_INFO("%s", peer.ToString().c_str());
    user_passkey_callbacks_.emplace(unique_id, std::move(callback));
    UiMsg display_passkey_input;
    display_passkey_input.mutable_peer()->mutable_address()->set_address(peer.ToString());
    display_passkey_input.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    display_passkey_input.set_message_type(UiMsgType::DISPLAY_PASSKEY_ENTRY);
    display_passkey_input.set_unique_id(unique_id++);
    ui_events_.OnIncomingEvent(display_passkey_input);
  }

  void OnDisplayCancelDialog(const bluetooth::hci::AddressWithType& peer) override {
    LOG_INFO("%s", peer.ToString().c_str());
    UiMsg display_cancel;
    display_cancel.mutable_peer()->mutable_address()->set_address(peer.ToString());
    display_cancel.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    display_cancel.set_message_type(UiMsgType::DISPLAY_CANCEL);
    display_cancel.set_unique_id(unique_id++);
    ui_events_.OnIncomingEvent(display_cancel);
  }

  void OnDeviceBonded(hci::AddressWithType peer) override {
    LOG_INFO("%s", peer.ToString().c_str());
    BondMsg bonded;
    bonded.mutable_peer()->mutable_address()->set_address(peer.ToString());
    bonded.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    bonded.set_message_type(BondMsgType::DEVICE_BONDED);
    bond_events_.OnIncomingEvent(bonded);
  }

  void OnDeviceUnbonded(hci::AddressWithType peer) override {
    LOG_INFO("%s", peer.ToString().c_str());
    BondMsg unbonded;
    unbonded.mutable_peer()->mutable_address()->set_address(peer.ToString());
    unbonded.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    unbonded.set_message_type(BondMsgType::DEVICE_UNBONDED);
    bond_events_.OnIncomingEvent(unbonded);
  }

  void OnDeviceBondFailed(hci::AddressWithType peer) override {
    LOG_INFO("%s", peer.ToString().c_str());
    BondMsg bond_failed;
    bond_failed.mutable_peer()->mutable_address()->set_address(peer.ToString());
    bond_failed.mutable_peer()->set_type(facade::BluetoothAddressTypeEnum::PUBLIC_DEVICE_ADDRESS);
    bond_failed.set_message_type(BondMsgType::DEVICE_BOND_FAILED);
    bond_events_.OnIncomingEvent(bond_failed);
  }

 private:
  SecurityModule* security_module_ __attribute__((unused));
  l2cap::le::L2capLeModule* l2cap_le_module_ __attribute__((unused));
  l2cap::classic::L2capClassicModule* l2cap_classic_module_ __attribute__((unused));
  ::bluetooth::os::Handler* security_handler_ __attribute__((unused));
  SecurityModule* security_module_;
  ::bluetooth::os::Handler* security_handler_;
  ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"};
  ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"};
  uint32_t unique_id{1};
  std::map<uint32_t, common::OnceCallback<void(bool)>> user_yes_no_callbacks_;
  std::map<uint32_t, common::OnceCallback<void(uint32_t)>> user_passkey_callbacks_;
};

void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) {
  ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list);
  list->add<SecurityModule>();
  list->add<l2cap::le::L2capLeModule>();
  list->add<l2cap::classic::L2capClassicModule>();
  list->add<hci::HciLayer>();
}

void SecurityModuleFacadeModule::Start() {
  ::bluetooth::grpc::GrpcFacadeModule::Start();
  service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<l2cap::le::L2capLeModule>(),
                                             GetDependency<l2cap::classic::L2capClassicModule>(),
                                             GetDependency<hci::HciLayer>(), GetHandler());
  service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetHandler());
}

void SecurityModuleFacadeModule::Stop() {
+0 −1
Original line number Diff line number Diff line
@@ -18,7 +18,6 @@
#include <grpc++/grpc++.h>

#include "grpc/grpc_module.h"
#include "hci/address_with_type.h"

namespace bluetooth {
namespace security {
+41 −0
Original line number Diff line number Diff line
@@ -9,4 +9,45 @@ service SecurityModuleFacade {
  rpc CreateBond(facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {}
  rpc CancelBond(facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {}
  rpc RemoveBond(facade.BluetoothAddressWithType) returns (google.protobuf.Empty) {}
  rpc SendUiCallback(UiCallbackMsg) returns (google.protobuf.Empty) {}
  rpc FetchUiEvents(google.protobuf.Empty) returns (stream UiMsg) {}
  rpc FetchBondEvents(google.protobuf.Empty) returns (stream BondMsg) {}
}

enum UiMsgType {
  DISPLAY_YES_NO_WITH_VALUE = 0;
  DISPLAY_YES_NO = 1;
  DISPLAY_PASSKEY = 2;
  DISPLAY_PASSKEY_ENTRY = 3;
  DISPLAY_CANCEL = 4;
}

message UiMsg {
  UiMsgType message_type = 1;
  facade.BluetoothAddressWithType peer = 2;
  uint32 numeric_value = 3;
  uint32 unique_id = 4;
}

enum UiCallbackType {
  YES_NO = 0;
  PASSKEY = 1;
}

message UiCallbackMsg {
  UiCallbackType message_type = 1;
  bool boolean = 2;
  uint32 numeric_value = 3;
  uint32 unique_id = 4;
}

enum BondMsgType {
  DEVICE_BONDED = 0;
  DEVICE_UNBONDED = 1;
  DEVICE_BOND_FAILED = 2;
}

message BondMsg {
  BondMsgType message_type = 1;
  facade.BluetoothAddressWithType peer = 2;
}