Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 028b5f36 authored by Jakub Pawlowski's avatar Jakub Pawlowski
Browse files

Implementation of OOB PTS cert test cases

SM/SLA/SCOB/BV-02-C
SM/SLA/SCOB/BV-03-C
SM/MAS/SCOB/BV-04-C

This patch also fixes how we wait for disconnection event. Instead of
using fixed timeout, wait for actual disconnection.

Tag: #gd-refactor
Bug: 155399771
Test: gd/cert/run --host LeSecurityTest
Change-Id: I6d132334a8f435f2fbc10a1fc010ace06134abae
parent 8e294a46
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -594,3 +594,7 @@ class SecurityMatchers(object):
    @staticmethod
    def BondMsg(type):
        return lambda event: True if event.message_type == type else False

    @staticmethod
    def HelperMsg(type):
        return lambda event: True if event.message_type == type else False
+12 −0
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ from cert.captures import SecurityCaptures
from cert.closable import Closable
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.matchers import SecurityMatchers
from cert.truth import assertThat
from datetime import timedelta
from facade import common_pb2 as common
@@ -32,6 +33,7 @@ from security.facade_pb2 import LeAuthRequirementsMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType
from security.facade_pb2 import HelperMsgType


class PyLeSecurity(Closable):
@@ -41,6 +43,7 @@ class PyLeSecurity(Closable):

    _ui_event_stream = None
    _bond_event_stream = None
    _helper_event_stream = None

    def __init__(self, device):
        logging.info("DUT: Init")
@@ -48,6 +51,7 @@ class PyLeSecurity(Closable):
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._helper_event_stream = EventStream(self._device.security.FetchHelperEvents(empty_proto.Empty()))

    def get_ui_stream(self):
        return self._ui_event_stream
@@ -60,6 +64,9 @@ class PyLeSecurity(Closable):
        assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout)
        return display_passkey_capture.get()

    def wait_device_disconnect(self, address):
        assertThat(self._helper_event_stream).emits(SecurityMatchers.HelperMsg(HelperMsgType.DEVICE_DISCONNECTED))

    def SetLeAuthRequirements(self, *args, **kwargs):
        return self._device.security.SetLeAuthRequirements(LeAuthRequirementsMessage(*args, **kwargs))

@@ -73,3 +80,8 @@ class PyLeSecurity(Closable):
            safeClose(self._bond_event_stream)
        else:
            logging.info("DUT: Bond Event Stream is None!")

        if self._helper_event_stream is not None:
            safeClose(self._helper_event_stream)
        else:
            logging.info("DUT: Helper Event Stream is None!")
+204 −3
Original line number Diff line number Diff line
@@ -702,6 +702,207 @@ class LeSecurityTest(GdBaseTestClass):

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            #disconnection should happen after one second timeout, sleep for 2 just to be sure
            #TODO(jpawlowski): wait for disconnection event instead of sleeping.
            time.sleep(2)
            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/SLA/SCOB/BV-02-C", pts_test_name="Out of Band, IUT Responder, Secure Connections – Success")
    def test_out_of_band_iut_responder_secure_connections(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure correctly when acting as slave, responder.
        """

        oob_combinations = [(OOB_NOT_PRESENT, OOB_PRESENT), (OOB_PRESENT, OOB_NOT_PRESENT), (OOB_PRESENT, OOB_PRESENT)]

        for (dut_oob_flag, cert_oob_flag) in oob_combinations:
            print("oob flag combination dut: " + str(dut_oob_flag) + ", cert: " + str(cert_oob_flag))

            self._prepare_dut_for_connection()

            if dut_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
                self.dut.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.cert_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))

            if cert_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
                self.cert.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.dut_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(dut_oob_flag)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(cert_oob_flag)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1)

            # 1. Lower Tester transmits a Pairing Request command with OOB data flag set to either 0x00 or 0x01, and Secure Connections flag set to '1'.
            self.cert.security.CreateBondLe(self.dut_address)

            assertThat(self.dut_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. IUT responds with a Pairing Response command with Secure Connections flag set to '1' and OOB data flag set to either 0x00 or 0x01.
            self.dut.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.cert_address))

            # 3. IUT uses the 128-bit value generated by the Lower Tester as the confirm value. Similarly, the Lower Tester uses the 128-bit value generated by the IUT as the confirm value.

            # 4. IUT and Lower Tester perform phase 2 of the pairing process and establish an encrypted link with an LTK generated using the OOB data in phase 2.
            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.cert.security.RemoveBond(self.dut_address)
            self.dut.security.RemoveBond(self.cert_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/SLA/SCOB/BV-03-C",
        pts_test_name="Out of Band, IUT Responder, Secure Connections – Handle AuthReq Flag RFU Correctly")
    def test_out_of_band_iut_responder_secure_connections_auth_req_rfu(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure when receiving additional bits set in the AuthReq flag. Reserved For Future Use bits are correctly handled when acting as slave, responder.
        """

        reserved_bits_combinations = [1, 2, 3]

        for reserved_bits in reserved_bits_combinations:
            print("reserved bits in cert dut: " + str(reserved_bits))

            self._prepare_dut_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=0, secure_connections=1)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(OOB_PRESENT)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1, reserved_bits=reserved_bits)

            # 1. Lower Tester transmits Pairing Request command with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data from remote device present)
            # c. MITM set to ‘0’, Secure Connections flag is set to '1', and all reserved bits are set to a random value.
            self.cert.security.CreateBondLe(self.dut_address)

            assertThat(self.dut_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. IUT responds with a Pairing Response command, with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. Secure Connections flag is set to '1', All reserved bits are set to ‘0’
            self.dut.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.cert_address))

            # 3. IUT and Lower Tester perform phase 2 of the OOB authenticated pairing and establish an encrypted link with the generated LTK.

            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.cert.security.RemoveBond(self.dut_address)
            self.dut.security.RemoveBond(self.cert_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)

    @metadata(
        pts_test_id="SM/MAS/SCOB/BV-04-C",
        pts_test_name="Out of Band, IUT Initiator, Secure Connections – Handle AuthReq Flag RFU Correctly")
    def test_out_of_band_iut_initiator_secure_connections_auth_req_rfu(self):
        """
            Verify that the IUT supporting LE Secure Connections is able to perform the Out-of-Band pairing procedure when receiving additional bits set in the AuthReq flag. Reserved For Future Use bits are correctly handled when acting as master, initiator.
        """

        reserved_bits_combinations = [1, 2, 3]

        for reserved_bits in reserved_bits_combinations:
            print("reserved bits in cert dut: " + str(reserved_bits))

            self._prepare_cert_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
            self.dut_security.SetLeAuthRequirements(bond=1, mitm=0, secure_connections=1, reserved_bits=0)

            self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
            self.cert.security.SetLeOobDataPresent(OOB_PRESENT)
            self.cert_security.SetLeAuthRequirements(bond=1, mitm=1, secure_connections=1, reserved_bits=reserved_bits)

            # 1. IUT transmits Pairing Request command with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. MITM set to ‘0’, Secure Connections flag is set to '1', and all reserved bits are set to ‘0’
            self.dut.security.CreateBondLe(self.cert_address)

            assertThat(self.cert_security.get_ui_stream()).emits(
                SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PAIRING_PROMPT))

            # 2. Lower Tester responds with a Pairing Response command, with:
            # a. IO Capability set to any IO capability
            # b. OOB data flag set to 0x01 (OOB Authentication data present)
            # c. Secure Connections flag is set to '1', and all reserved bits are set to a random value.
            self.cert.security.SendUiCallback(
                UiCallbackMsg(
                    message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=self.dut_address))

            # 3. IUT and Lower Tester perform phase 2 of the OOB authenticated pairing and establish an encrypted link with the generated LTK.

            assertThat(self.dut_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            assertThat(self.cert_security.get_bond_stream()).emits(
                SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED), timeout=timedelta(seconds=10))

            self.dut.security.RemoveBond(self.cert_address)
            self.cert.security.RemoveBond(self.dut_address)

            assertThat(self.dut_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_UNBONDED))

            self.dut_security.wait_device_disconnect(self.cert_address)
+47 −3
Original line number Diff line number Diff line
@@ -19,12 +19,15 @@
#include "hci/address_with_type.h"
#include "hci/le_address_manager.h"
#include "l2cap/classic/security_policy.h"
#include "l2cap/le/l2cap_le_module.h"
#include "os/handler.h"
#include "security/facade.grpc.pb.h"
#include "security/security_manager_listener.h"
#include "security/security_module.h"
#include "security/ui.h"

using bluetooth::l2cap::le::L2capLeModule;

namespace bluetooth {
namespace security {

@@ -40,10 +43,41 @@ constexpr uint8_t AUTH_REQ_RFU_MASK = 0xC0;

class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI {
 public:
  SecurityModuleFacadeService(SecurityModule* security_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), security_handler_(security_handler) {
  SecurityModuleFacadeService(
      SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler)
      : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler) {
    security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_);
    security_module_->GetSecurityManager()->SetUserInterfaceHandler(this, security_handler_);

    /* In order to receive connect/disconenct event, we must register service */
    l2cap_le_module_->GetFixedChannelManager()->RegisterService(
        bluetooth::l2cap::kLastFixedChannel - 2,
        common::BindOnce(&SecurityModuleFacadeService::OnL2capRegistrationCompleteLe, common::Unretained(this)),
        common::Bind(&SecurityModuleFacadeService::OnConnectionOpenLe, common::Unretained(this)),
        security_handler_);
  }

  void OnL2capRegistrationCompleteLe(
      l2cap::le::FixedChannelManager::RegistrationResult result,
      std::unique_ptr<l2cap::le::FixedChannelService> le_smp_service) {
    ASSERT_LOG(
        result == bluetooth::l2cap::le::FixedChannelManager::RegistrationResult::SUCCESS,
        "Failed to register to LE SMP Fixed Channel Service");
  }

  void OnConnectionOpenLe(std::unique_ptr<l2cap::le::FixedChannel> channel) {
    channel->RegisterOnCloseCallback(
        security_handler_,
        common::BindOnce(
            &SecurityModuleFacadeService::OnConnectionClosedLe, common::Unretained(this), channel->GetDevice()));
  }

  void OnConnectionClosedLe(hci::AddressWithType address, hci::ErrorCode error_code) {
    SecurityHelperMsg disconnected;
    disconnected.mutable_peer()->mutable_address()->set_address(address.ToString());
    disconnected.mutable_peer()->set_type(static_cast<facade::BluetoothAddressTypeEnum>(address.GetAddressType()));
    disconnected.set_message_type(HelperMsgType::DEVICE_DISCONNECTED);
    helper_events_.OnIncomingEvent(disconnected);
  }

  ::grpc::Status CreateBond(::grpc::ServerContext* context, const facade::BluetoothAddressWithType* request,
@@ -118,6 +152,12 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
    return bond_events_.RunLoop(context, writer);
  }

  ::grpc::Status FetchHelperEvents(
      ::grpc::ServerContext* context,
      const ::google::protobuf::Empty* request,
      ::grpc::ServerWriter<SecurityHelperMsg>* writer) override {
    return helper_events_.RunLoop(context, writer);
  }
  ::grpc::Status SetIoCapability(::grpc::ServerContext* context, const IoCapabilityMessage* request,
                                 ::google::protobuf::Empty* response) override {
    security_module_->GetFacadeConfigurationApi()->SetIoCapability(
@@ -365,9 +405,11 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public

 private:
  SecurityModule* security_module_;
  L2capLeModule* l2cap_le_module_;
  ::bluetooth::os::Handler* security_handler_;
  ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"};
  ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"};
  ::bluetooth::grpc::GrpcEventQueue<SecurityHelperMsg> helper_events_{"Events that don't fit any other category"};
  ::bluetooth::grpc::GrpcEventQueue<EnforceSecurityPolicyMsg> enforce_security_policy_events_{
      "Enforce Security Policy Events"};
  uint32_t unique_id{1};
@@ -378,11 +420,13 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public
void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) {
  ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list);
  list->add<SecurityModule>();
  list->add<L2capLeModule>();
}

void SecurityModuleFacadeModule::Start() {
  ::bluetooth::grpc::GrpcFacadeModule::Start();
  service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetHandler());
  service_ =
      new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler());
}

void SecurityModuleFacadeModule::Stop() {
+8 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ service SecurityModuleFacade {
  rpc SendUiCallback(UiCallbackMsg) returns (google.protobuf.Empty) {}
  rpc FetchUiEvents(google.protobuf.Empty) returns (stream UiMsg) {}
  rpc FetchBondEvents(google.protobuf.Empty) returns (stream BondMsg) {}
  rpc FetchHelperEvents(google.protobuf.Empty) returns (stream SecurityHelperMsg) {}
  rpc EnforceSecurityPolicy(SecurityPolicyMessage) returns (google.protobuf.Empty) {}
  rpc FetchEnforceSecurityPolicyEvents(google.protobuf.Empty) returns (stream EnforceSecurityPolicyMsg) {}
}
@@ -75,6 +76,13 @@ message BondMsg {
  facade.BluetoothAddressWithType peer = 2;
}

enum HelperMsgType { DEVICE_DISCONNECTED = 0; }

message SecurityHelperMsg {
  HelperMsgType message_type = 1;
  facade.BluetoothAddressWithType peer = 2;
}

enum IoCapabilities {
  DISPLAY_ONLY = 0;
  DISPLAY_YES_NO_IO_CAP = 1;