Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4d973010 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Implementation of OOB PTS cert test cases"

parents e7097553 028b5f36
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -594,3 +594,7 @@ class SecurityMatchers(object):
    @staticmethod
    def BondMsg(type):
        return lambda event: True if event.message_type == type else False

    @staticmethod
    def HelperMsg(type):
        return lambda event: True if event.message_type == type else False
+12 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ from cert.captures import SecurityCaptures
from cert.closable import Closable
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.matchers import SecurityMatchers
from cert.truth import assertThat
from datetime import timedelta
from facade import common_pb2 as common
@@ -32,6 +33,7 @@ from security.facade_pb2 import LeAuthRequirementsMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType
from security.facade_pb2 import HelperMsgType


class PyLeSecurity(Closable):
@@ -41,6 +43,7 @@ class PyLeSecurity(Closable):

    _ui_event_stream = None
    _bond_event_stream = None
    _helper_event_stream = None

    def __init__(self, device):
        logging.info("DUT: Init")
@@ -48,6 +51,7 @@ class PyLeSecurity(Closable):
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._helper_event_stream = EventStream(self._device.security.FetchHelperEvents(empty_proto.Empty()))

    def get_ui_stream(self):
        return self._ui_event_stream
@@ -60,6 +64,9 @@ class PyLeSecurity(Closable):
        assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout)
        return display_passkey_capture.get()

    def wait_device_disconnect(self, address):
        assertThat(self._helper_event_stream).emits(SecurityMatchers.HelperMsg(HelperMsgType.DEVICE_DISCONNECTED))

    def SetLeAuthRequirements(self, *args, **kwargs):
        return self._device.security.SetLeAuthRequirements(LeAuthRequirementsMessage(*args, **kwargs))

@@ -73,3 +80,8 @@ class PyLeSecurity(Closable):
            safeClose(self._bond_event_stream)
        else:
            logging.info("DUT: Bond Event Stream is None!")

        if self._helper_event_stream is not None:
            safeClose(self._helper_event_stream)
        else:
            logging.info("DUT: Helper Event Stream is None!")
+204 −3
Original line number Diff line number Diff line
@@ -702,6 +702,207 @@ class LeSecurityTest(GdBaseTestClass):

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            #disconnection should happen after one second timeout, sleep for 2 just to be sure
            #TODO(jpawlowski): wait for disconnection event instead of sleeping.
            time.sleep(2)
            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/SLA/SCOB/BV-02-C", pts_test_name="Out of Band, IUT Responder, Secure Connections – Success")
    def test_out_of_band_iut_responder_secure_connections(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure correctly when acting as slave, responder.
        """

        oob_combinations = [(OOB_NOT_PRESENT, OOB_PRESENT), (OOB_PRESENT, OOB_NOT_PRESENT), (OOB_PRESENT, OOB_PRESENT)]

        for (dut_oob_flag, cert_oob_flag) in oob_combinations:
            print("oob flag combination dut: " + str(dut_oob_flag) + ", cert: " + str(cert_oob_flag))

            self._prepare_dut_for_connection()

            if dut_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
                self.dut.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.cert_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))

            if cert_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
                self.cert.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.dut_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(dut_oob_flag)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(cert_oob_flag)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1)

            # 1. Lower Tester transmits a Pairing Request command with OOB data flag set to either 0x00 or 0x01, and Secure Connections flag set to '1'.
            self.cert.security.CreateBondLe(self.dut_address)

            assertThat(self.dut_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. IUT responds with a Pairing Response command with Secure Connections flag set to '1' and OOB data flag set to either 0x00 or 0x01.
            self.dut.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.cert_address))

            # 3. IUT uses the 128-bit value generated by the Lower Tester as the confirm value. Similarly, the Lower Tester uses the 128-bit value generated by the IUT as the confirm value.

            # 4. IUT and Lower Tester perform phase 2 of the pairing process and establish an encrypted link with an LTK generated using the OOB data in phase 2.
            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.cert.security.RemoveBond(self.dut_address)
            self.dut.security.RemoveBond(self.cert_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/SLA/SCOB/BV-03-C",
        pts_test_name="Out of Band, IUT Responder, Secure Connections – Handle AuthReq Flag RFU Correctly")
    def test_out_of_band_iut_responder_secure_connections_auth_req_rfu(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure when receiving additional bits set in the AuthReq flag. Reserved For Future Use bits are correctly handled when acting as slave, responder.
        """

        reserved_bits_combinations = [1, 2, 3]

        for reserved_bits in reserved_bits_combinations:
            print("reserved bits in cert dut: " + str(reserved_bits))

            self._prepare_dut_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=0, secure_connections=1)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(OOB_PRESENT)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1, reserved_bits=reserved_bits)

            # 1. Lower Tester transmits Pairing Request command with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data from remote device present)
            # c. MITM set to ‘0’, Secure Connections flag is set to '1', and all reserved bits are set to a random value.
            self.cert.security.CreateBondLe(self.dut_address)

            assertThat(self.dut_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. IUT responds with a Pairing Response command, with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. Secure Connections flag is set to '1', All reserved bits are set to ‘0’
            self.dut.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.cert_address))

            # 3. IUT and Lower Tester perform phase 2 of the OOB authenticated pairing and establish an encrypted link with the generated LTK.

            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.cert.security.RemoveBond(self.dut_address)
            self.dut.security.RemoveBond(self.cert_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/MAS/SCOB/BV-04-C",
        pts_test_name="Out of Band, IUT Initiator, Secure Connections – Handle AuthReq Flag RFU Correctly")
    def test_out_of_band_iut_initiator_secure_connections_auth_req_rfu(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure when receiving additional bits set in the AuthReq flag. Reserved For Future Use bits are correctly handled when acting as master, initiator.
        """

        reserved_bits_combinations = [1, 2, 3]

        for reserved_bits in reserved_bits_combinations:
            print("reserved bits in cert dut: " + str(reserved_bits))

            self._prepare_cert_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=0, secure_connections=1, reserved_bits=0)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(OOB_PRESENT)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1, reserved_bits=reserved_bits)

            # 1. IUT transmits Pairing Request command with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. MITM set to ‘0’, Secure Connections flag is set to '1', and all reserved bits are set to ‘0’
            self.dut.security.CreateBondLe(self.cert_address)

            assertThat(self.cert_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. Lower Tester responds with a Pairing Response command, with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. Secure Connections flag is set to '1', and all reserved bits are set to a random value.
            self.cert.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.dut_address))

            # 3. IUT and Lower Tester perform phase 2 of the OOB authenticated pairing and establish an encrypted link with the generated LTK.

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.dut.security.RemoveBond(self.cert_address)
            self.cert.security.RemoveBond(self.dut_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)
+47 −3
Original line number Diff line number Diff line
@@ -19,12 +19,15 @@
#include "hci/address_with_type.h"
#include "hci/le_address_manager.h"
#include "l2cap/classic/security_policy.h"
#include "l2cap/le/l2cap_le_module.h"
#include "os/handler.h"
#include "security/facade.grpc.pb.h"
#include "security/security_manager_listener.h"
#include "security/security_module.h"
#include "security/ui.h"

using bluetooth::l2cap::le::L2capLeModule;

namespace bluetooth {
namespace security {

@@ -40,10 +43,41 @@ constexpr uint8_t AUTH_REQ_RFU_MASK = 0xC0;

class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI {
 public:
  SecurityModuleFacadeService(SecurityModule* security_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), security_handler_(security_handler) {
  SecurityModuleFacadeService(
      SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler) {
    security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_);
    security_module_->GetSecurityManager()->SetUserInterfaceHandler(this, security_handler_);

    /* In order to receive connect/disconenct event, we must register service */
    l2cap_le_module_->GetFixedChannelManager()->RegisterService(
        bluetooth::l2cap::kLastFixedChannel - 2,
        common::BindOnce(&SecurityModuleFacadeService::OnL2capRegistrationCompleteLe, common::Unretained(this)),
        common::Bind(&SecurityModuleFacadeService::OnConnectionOpenLe, common::Unretained(this)),
        security_handler_);
  }

  void OnL2capRegistrationCompleteLe(
      l2cap::le::FixedChannelManager::RegistrationResult result,
      std::unique_ptr<l2cap::le::FixedChannelService> le_smp_service) {
    ASSERT_LOG(
        result == bluetooth::l2cap::le::FixedChannelManager::RegistrationResult::SUCCESS,
        "Failed to register to LE SMP Fixed Channel Service");
  }

  void OnConnectionOpenLe(std::unique_ptr<l2cap::le::FixedChannel> channel) {
    channel->RegisterOnCloseCallback(
        security_handler_,
        common::BindOnce(
            &SecurityModuleFacadeService::OnConnectionClosedLe, common::Unretained(this), channel->GetDevice()));
  }

  void OnConnectionClosedLe(hci::AddressWithType address, hci::ErrorCode error_code) {
    SecurityHelperMsg disconnected;
    disconnected.mutable_peer()->mutable_address()->set_address(address.ToString());
    disconnected.mutable_peer()->set_type(static_cast<facade::BluetoothAddressTypeEnum>(address.GetAddressType()));
    disconnected.set_message_type(HelperMsgType::DEVICE_DISCONNECTED);
    helper_events_.OnIncomingEvent(disconnected);
  }

  ::grpc::Status CreateBond(::grpc::ServerContext* context, const facade::BluetoothAddressWithType* request,
@@ -118,6 +152,12 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
    return bond_events_.RunLoop(context, writer);
  }

  ::grpc::Status FetchHelperEvents(
      ::grpc::ServerContext* context,
      const ::google::protobuf::Empty* request,
      ::grpc::ServerWriter<SecurityHelperMsg>* writer) override {
    return helper_events_.RunLoop(context, writer);
  }
  ::grpc::Status SetIoCapability(::grpc::ServerContext* context, const IoCapabilityMessage* request,
                                 ::google::protobuf::Empty* response) override {
    security_module_->GetFacadeConfigurationApi()->SetIoCapability(
@@ -365,9 +405,11 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public

 private:
  SecurityModule* security_module_;
  L2capLeModule* l2cap_le_module_;
  ::bluetooth::os::Handler* security_handler_;
  ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"};
  ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"};
  ::bluetooth::grpc::GrpcEventQueue<SecurityHelperMsg> helper_events_{"Events that don't fit any other category"};
  ::bluetooth::grpc::GrpcEventQueue<EnforceSecurityPolicyMsg> enforce_security_policy_events_{
      "Enforce Security Policy Events"};
  uint32_t unique_id{1};
@@ -378,11 +420,13 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) {
  ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list);
  list->add<SecurityModule>();
  list->add<L2capLeModule>();
}

void SecurityModuleFacadeModule::Start() {
  ::bluetooth::grpc::GrpcFacadeModule::Start();
  service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetHandler());
  service_ =
      new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler());
}

void SecurityModuleFacadeModule::Stop() {
+8 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ service SecurityModuleFacade {
  rpc SendUiCallback(UiCallbackMsg) returns (google.protobuf.Empty) {}
  rpc FetchUiEvents(google.protobuf.Empty) returns (stream UiMsg) {}
  rpc FetchBondEvents(google.protobuf.Empty) returns (stream BondMsg) {}
  rpc FetchHelperEvents(google.protobuf.Empty) returns (stream SecurityHelperMsg) {}
  rpc EnforceSecurityPolicy(SecurityPolicyMessage) returns (google.protobuf.Empty) {}
  rpc FetchEnforceSecurityPolicyEvents(google.protobuf.Empty) returns (stream EnforceSecurityPolicyMsg) {}
}
@@ -75,6 +76,13 @@ message BondMsg {
  facade.BluetoothAddressWithType peer = 2;
}

enum HelperMsgType { DEVICE_DISCONNECTED = 0; }

message SecurityHelperMsg {
  HelperMsgType message_type = 1;
  facade.BluetoothAddressWithType peer = 2;
}

enum IoCapabilities {
  DISPLAY_ONLY = 0;
  DISPLAY_YES_NO_IO_CAP = 1;