Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b65fe697 authored by Christine Hallstrom's avatar Christine Hallstrom Committed by Automerger Merge Worker
Browse files

Merge "OobPairingSl4aTest: Create bond out of band" am: a7d99ca8 am: ddd9ca93

parents 546c201f ddd9ca93
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ service SecurityModuleFacade {
  rpc EnforceSecurityPolicy(SecurityPolicyMessage) returns (google.protobuf.Empty) {}
  rpc FetchEnforceSecurityPolicyEvents(google.protobuf.Empty) returns (stream EnforceSecurityPolicyMsg) {}
  rpc FetchDisconnectEvents(google.protobuf.Empty) returns (stream DisconnectMsg) {}
  rpc FetchAdvertisingCallbackEvents(google.protobuf.Empty) returns (stream AdvertisingCallbackMsg) {}
}

message OobDataMessage {
@@ -177,3 +178,20 @@ message EnforceSecurityPolicyMsg {
message DisconnectMsg {
  blueberry.facade.BluetoothAddressWithType address = 1;
}

enum AdvertisingCallbackMsgType {
  ADVERTISING_SET_STARTED = 0;
  OWN_ADDRESS_READ = 1;
}

enum AdvertisingSetStarted {
  NOT_STARTED = 0;
  STARTED = 1;
}

message AdvertisingCallbackMsg {
  AdvertisingCallbackMsgType message_type = 1;
  uint32 advertiser_id = 2;
  AdvertisingSetStarted advertising_started = 3;
  blueberry.facade.BluetoothAddress address = 4;
}
+10 −0
Original line number Diff line number Diff line
@@ -45,6 +45,8 @@ class PyLeSecurity(Closable):
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._helper_event_stream = EventStream(self._device.security.FetchHelperEvents(empty_proto.Empty()))
        self._advertising_callback_event_stream = EventStream(
            self._device.security.FetchAdvertisingCallbackEvents(empty_proto.Empty()))

    def get_ui_stream(self):
        return self._ui_event_stream
@@ -52,6 +54,9 @@ class PyLeSecurity(Closable):
    def get_bond_stream(self):
        return self._bond_event_stream

    def get_advertising_callback_event_stream(self):
        return self._advertising_callback_event_stream

    def wait_for_ui_event_passkey(self, timeout=timedelta(seconds=3)):
        display_passkey_capture = SecurityCaptures.DisplayPasskey()
        assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout)
@@ -79,3 +84,8 @@ class PyLeSecurity(Closable):
            safeClose(self._helper_event_stream)
        else:
            logging.info("DUT: Helper Event Stream is None!")

        if self._advertising_callback_event_stream is not None:
            safeClose(self._advertising_callback_event_stream)
        else:
            logging.info("DUT: Advertising Callback Event Stream is None!")
+163 −4
Original line number Diff line number Diff line
@@ -19,20 +19,51 @@ import logging

from google.protobuf import empty_pb2 as empty_proto

from bluetooth_packets_python3 import hci_packets

from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_phys
from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass
from blueberry.tests.gd.cert.matchers import SecurityMatchers
from blueberry.tests.gd.cert.py_le_security import PyLeSecurity
from blueberry.tests.gd.cert.truth import assertThat

from blueberry.facade import common_pb2 as common
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.security.facade_pb2 import AdvertisingCallbackMsgType
from blueberry.facade.security.facade_pb2 import BondMsgType
from blueberry.facade.security.facade_pb2 import LeAuthRequirementsMessage
from blueberry.facade.security.facade_pb2 import LeIoCapabilityMessage
from blueberry.facade.security.facade_pb2 import LeOobDataPresentMessage
from blueberry.facade.security.facade_pb2 import UiCallbackMsg
from blueberry.facade.security.facade_pb2 import UiCallbackType
from blueberry.facade.security.facade_pb2 import UiMsgType

LeIoCapabilities = LeIoCapabilityMessage.LeIoCapabilities
LeOobDataFlag = LeOobDataPresentMessage.LeOobDataFlag

DISPLAY_ONLY = LeIoCapabilityMessage(capabilities=LeIoCapabilities.DISPLAY_ONLY)

OOB_NOT_PRESENT = LeOobDataPresentMessage(data_present=LeOobDataFlag.NOT_PRESENT)


class OobData:

    def __init__(self):
        pass
    address = None
    confirmation = None
    randomizer = None

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
        self.randomizer = randomizer


class OobPairingSl4aTest(GdSl4aBaseTestClass):
    # Events sent from SL4A
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
@@ -46,8 +77,10 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):

    def setup_test(self):
        super().setup_test()
        self.cert_security = PyLeSecurity(self.cert)

    def teardown_test(self):
        self.cert_security.close()
        super().teardown_test()

    def _generate_sl4a_oob_data(self, transport):
@@ -59,15 +92,79 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
            logging.error("Failed to generate OOB data!")
            return None
        logging.info("Data received!")
        return OobData()
        logging.info(event_info["data"])
        return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"],
                       event_info["data"]["randomizer"])

    def _generate_cert_oob_data(self, transport):
        if transport == self.TRANSPORT_LE:
            return self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
            oob_data = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
            # GetLeOutOfBandData adds null terminator to string in C code
            # (length 17) before passing back to python via gRPC where it is
            # converted back to bytes. Remove the null terminator for handling
            # in python test, since length is known to be 16 for
            # confirmation_value and random_value
            oob_data.confirmation_value = oob_data.confirmation_value[:-1]
            oob_data.random_value = oob_data.random_value[:-1]
            return oob_data
        return None

    def set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
        # Random static address below, no random resolvable address at this point
        random_address_bytes = "DD:34:02:05:5C:EE".encode()
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
            rotation_irk=irk)
        self.cert.security.SetLeInitiatorAddressPolicy(private_policy)
        # Bluetooth MAC address must be upper case
        return random_address_bytes.decode('utf-8').upper()

    def wait_for_own_address(self):
        own_address = common.BluetoothAddress()

        def get_address(event):
            if event.message_type == AdvertisingCallbackMsgType.OWN_ADDRESS_READ:
                nonlocal own_address
                own_address = event.address.address
                return True
            return False

        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address)
        return own_address

    def wait_for_advertising_set_started(self):
        advertising_started = False

        def get_advertising_set_started(event):
            if event.message_type == AdvertisingCallbackMsgType.ADVERTISING_SET_STARTED:
                nonlocal advertising_started
                if event.advertising_started == 1:
                    advertising_started = True
                return True
            return False

        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started)
        return advertising_started

    def wait_for_yes_no_dialog(self):
        address_with_type = common.BluetoothAddressWithType()

        def get_address_with_type(event):
            if event.message_type == UiMsgType.DISPLAY_PAIRING_PROMPT:
                nonlocal address_with_type
                address_with_type = event.peer
                return True
            return False

        assertThat(self.cert_security.get_ui_stream()).emits(get_address_with_type)
        return address_with_type

    def test_sl4a_classic_generate_oob_data(self):
        oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_BREDR)
        logging.info("OOB data received")
        logging.info(oob_data)
        assertThat(oob_data).isNotNone()

    def test_sl4a_classic_generate_oob_data_twice(self):
@@ -81,3 +178,65 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass):
    def test_cert_ble_generate_oob_data(self):
        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

    def test_sl4a_create_bond_out_of_band(self):
        self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
        self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT)
        self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1))

        data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]
        byteArrayObject = bytearray(data)
        irk = bytes(byteArrayObject)

        DEVICE_NAME = 'Im_The_CERT!'
        self.set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=True,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)

        self.wait_for_advertising_set_started()

        get_own_address_request = le_advertising_facade.GetOwnAddressRequest(
            advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request)
        advertising_address = self.wait_for_own_address()

        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

        self.dut.sl4a.bluetoothCreateBondOutOfBand(
            advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(),
            oob_data.random_value.hex())

        address_with_type = self.wait_for_yes_no_dialog()
        self.cert.security.SendUiCallback(
            UiCallbackMsg(
                message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=address_with_type))

        assertThat(self.cert_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED))

        try:
            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout)
        except queue.Empty as error:
            logging.error("Failed to generate OOB data!")

        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
+73 −5
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include "grpc/grpc_event_queue.h"
#include "hci/address_with_type.h"
#include "hci/le_address_manager.h"
#include "hci/le_advertising_manager.h"
#include "l2cap/classic/security_policy.h"
#include "l2cap/le/l2cap_le_module.h"
#include "os/handler.h"
@@ -54,11 +55,20 @@ blueberry::facade::BluetoothAddressWithType ToFacadeAddressWithType(hci::Address

}  // namespace

class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI {
class SecurityModuleFacadeService : public SecurityModuleFacade::Service,
                                    public ISecurityManagerListener,
                                    public UI,
                                    public hci::AdvertisingCallback {
 public:
  SecurityModuleFacadeService(
      SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler) {
      SecurityModule* security_module,
      L2capLeModule* l2cap_le_module,
      ::bluetooth::os::Handler* security_handler,
      hci::LeAdvertisingManager* le_advertising_manager)
      : security_module_(security_module),
        l2cap_le_module_(l2cap_le_module),
        security_handler_(security_handler),
        le_advertising_manager_(le_advertising_manager) {
    security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_);
    security_module_->GetSecurityManager()->SetUserInterfaceHandler(this, security_handler_);

@@ -224,6 +234,58 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
      ::grpc::ServerWriter<SecurityHelperMsg>* writer) override {
    return helper_events_.RunLoop(context, writer);
  }

  ::grpc::Status FetchAdvertisingCallbackEvents(
      ::grpc::ServerContext* context,
      const ::google::protobuf::Empty* request,
      ::grpc::ServerWriter<AdvertisingCallbackMsg>* writer) override {
    le_advertising_manager_->RegisterAdvertisingCallback(this);
    return advertising_callback_events_.RunLoop(context, writer);
  }

  void OnAdvertisingSetStarted(int reg_id, uint8_t advertiser_id, int8_t tx_power, AdvertisingStatus status) {
    AdvertisingCallbackMsg advertising_set_started;
    advertising_set_started.set_message_type(AdvertisingCallbackMsgType::ADVERTISING_SET_STARTED);
    advertising_set_started.set_advertising_started(AdvertisingSetStarted::STARTED);
    advertising_set_started.set_advertiser_id(advertiser_id);
    advertising_callback_events_.OnIncomingEvent(advertising_set_started);
  }

  void OnAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) {
    // Not used yet
  }

  void OnAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) {
    // Not used yet
  }

  void OnScanResponseDataSet(uint8_t advertiser_id, uint8_t status) {
    // Not used yet
  }

  void OnAdvertisingParametersUpdated(uint8_t advertiser_id, int8_t tx_power, uint8_t status) {
    // Not used yet
  }

  void OnPeriodicAdvertisingParametersUpdated(uint8_t advertiser_id, uint8_t status) {
    // Not used yet
  }

  void OnPeriodicAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) {
    // Not used yet
  }

  void OnPeriodicAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) {
    // Not used yet
  }

  void OnOwnAddressRead(uint8_t advertiser_id, uint8_t address_type, Address address) {
    AdvertisingCallbackMsg get_own_address;
    get_own_address.set_message_type(AdvertisingCallbackMsgType::OWN_ADDRESS_READ);
    get_own_address.mutable_address()->set_address(address.ToString());
    advertising_callback_events_.OnIncomingEvent(get_own_address);
  }

  ::grpc::Status SetIoCapability(::grpc::ServerContext* context, const IoCapabilityMessage* request,
                                 ::google::protobuf::Empty* response) override {
    security_module_->GetFacadeConfigurationApi()->SetIoCapability(
@@ -532,6 +594,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
  SecurityModule* security_module_;
  L2capLeModule* l2cap_le_module_;
  ::bluetooth::os::Handler* security_handler_;
  hci::LeAdvertisingManager* le_advertising_manager_;
  ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"};
  ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"};
  ::bluetooth::grpc::GrpcEventQueue<SecurityHelperMsg> helper_events_{"Events that don't fit any other category"};
@@ -539,6 +602,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
      "Enforce Security Policy Events"};
  ::bluetooth::grpc::GrpcEventQueue<DisconnectMsg> disconnect_events_{"Disconnect events"};
  ::bluetooth::grpc::GrpcEventQueue<OobDataBondMessage> oob_events_{"OOB Data events"};
  ::bluetooth::grpc::GrpcEventQueue<AdvertisingCallbackMsg> advertising_callback_events_{"Advertising callback events"};
  uint32_t unique_id{1};
  std::map<uint32_t, common::OnceCallback<void(bool)>> user_yes_no_callbacks_;
  std::map<uint32_t, common::OnceCallback<void(uint32_t)>> user_passkey_callbacks_;
@@ -548,12 +612,16 @@ void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) const {
  ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list);
  list->add<SecurityModule>();
  list->add<L2capLeModule>();
  list->add<hci::LeAdvertisingManager>();
}

void SecurityModuleFacadeModule::Start() {
  ::bluetooth::grpc::GrpcFacadeModule::Start();
  service_ =
      new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler());
  service_ = new SecurityModuleFacadeService(
      GetDependency<SecurityModule>(),
      GetDependency<L2capLeModule>(),
      GetHandler(),
      GetDependency<hci::LeAdvertisingManager>());
}

void SecurityModuleFacadeModule::Stop() {
+3 −3
Original line number Diff line number Diff line
@@ -114,10 +114,10 @@ void SecurityRecordStorage::SaveSecurityRecords(std::set<std::shared_ptr<record:
    } else if (!record->IsClassicLinkKeyValid() && record->remote_ltk) {
      mutation.Add(device.SetDeviceType(hci::DeviceType::LE));
    } else {
      LOG_ERROR(
          "Cannot determine device type from security record for '%s'; dropping!",
      mutation.Add(device.SetDeviceType(hci::DeviceType::LE));
      LOG_WARN(
          "Cannot determine device type from security record for '%s'; defaulting to LE",
          record->GetPseudoAddress()->ToString().c_str());
      continue;
    }
    mutation.Commit();
    SetClassicData(mutation, record, device);