Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 03d12cf9 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "LeL2cap: Add security check for connection request"

parents fd33acd1 265a8739
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ from google.protobuf import empty_pb2 as empty_proto

from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from l2cap.le.facade_pb2 import SecurityLevel
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import FilteringEventStream
from cert.event_stream import EventStream, IEventStream
@@ -203,10 +204,13 @@ class PyLeL2cap(Closable):
    def get_fixed_channel(self, cid=4):
        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)

    def register_coc(self, cert_address, psm=0x33):
    def register_coc(self,
                     cert_address,
                     psm=0x33,
                     security_level=SecurityLevel.NO_SECURITY):
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(
                psm=psm, enable=True))
                psm=psm, enable=True, security_level=security_level))
        return PyLeL2capDynamicChannel(self._device, cert_address, psm,
                                       self._le_l2cap_stream)

+10 −0
Original line number Diff line number Diff line
@@ -158,6 +158,16 @@ class CertLeL2cap(Closable):
        self._cid_to_cert_channels[scid] = channel
        return channel

    def open_channel_with_expected_result(
            self, psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(
                1, psm, 0x40, 1000, 100, 6))

        response = L2capMatchers.CreditBasedConnectionResponse(result)
        assertThat(self.control_channel).emits(response)

    def verify_and_respond_open_channel_from_remote(
            self,
            psm=0x33,
+83 −16
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets, l2cap_packets
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
from l2cap.le.cert.cert_le_l2cap import CertLeL2cap
from l2cap.le.facade_pb2 import SecurityLevel

# Assemble a sample packet.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])
@@ -41,12 +42,15 @@ class LeL2capTest(GdBaseTestClass):
        self.dut_l2cap = PyLeL2cap(self.dut)
        self.cert_l2cap = CertLeL2cap(self.cert)
        self.dut_address = common.BluetoothAddressWithType(
            address = common.BluetoothAddress(address=bytes(b'0D:05:04:03:02:01')),
            address=common.BluetoothAddress(
                address=bytes(b'0D:05:04:03:02:01')),
            type=common.RANDOM_DEVICE_ADDRESS)
        self.cert_address = common.BluetoothAddressWithType(
            address = common.BluetoothAddress(address=bytes(b'55:11:FF:AA:33:22')),
            address=common.BluetoothAddress(
                address=bytes(b'55:11:FF:AA:33:22')),
            type=common.RANDOM_DEVICE_ADDRESS)
        self.cert_l2cap._device.hci_le_acl_manager.SetInitiatorAddress(self.cert_address)
        self.cert_l2cap._device.hci_le_acl_manager.SetInitiatorAddress(
            self.cert_address)

    def teardown_test(self):
        self.cert_l2cap.close()
@@ -68,9 +72,11 @@ class LeL2capTest(GdBaseTestClass):
            event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            address_type=common.RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_ADDRESS,
            peer_address=common.BluetoothAddress(address=bytes(b'00:00:00:00:00:00')),
            peer_address=common.BluetoothAddress(
                address=bytes(b'00:00:00:00:00:00')),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
            ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(
            request)
@@ -97,13 +103,16 @@ class LeL2capTest(GdBaseTestClass):
            event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            address_type=common.RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_ADDRESS,
            peer_address=common.BluetoothAddress(address=bytes(b'00:00:00:00:00:00')),
            peer_address=common.BluetoothAddress(
                address=bytes(b'00:00:00:00:00:00')),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
            ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(
            request)
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm)
        self.cert_l2cap.wait_for_connection()
        # TODO: Currently we can only connect by using Dynamic channel API. Use fixed channel instead.
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
@@ -126,7 +135,8 @@ class LeL2capTest(GdBaseTestClass):
        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33):
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm)
        dut_channel = response_future.get_channel()
@@ -358,7 +368,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request to a legacy peer and receiving a Command Reject does not establish the channel.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)
@@ -398,7 +409,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request on an unsupported LE_PSM will not establish a channel upon receiving an LE Credit Based Connection Response refusing the connection.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
@@ -493,7 +505,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0005 – Connection Refused – Insufficient Authentication".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -501,6 +514,23 @@ class LeL2capTest(GdBaseTestClass):
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-11-C",
        pts_test_name="Security - Insufficient Authentication – Responder")
    def test_security_insufficient_authentication_responder(self):
        """
        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
Request which fails to satisfy authentication requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(
            self.cert_address, psm,
            SecurityLevel.AUTHENTICATED_PAIRING_WITH_ENCRYPTION)
        self.cert_l2cap.open_channel_with_expected_result(
            psm,
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-12-C",
        pts_test_name="Security - Insufficient Authorization – Initiator")
@@ -509,7 +539,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0006 – Connection Refused – Insufficient Authorization”.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -517,6 +548,22 @@ class LeL2capTest(GdBaseTestClass):
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-13-C",
        pts_test_name="Security - Insufficient Authorization – Responder")
    def test_security_insufficient_authorization_responder(self):
        """
        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
        Request which fails to satisfy authentication requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(self.cert_address, psm,
                                    SecurityLevel.AUTHORIZATION)
        self.cert_l2cap.open_channel_with_expected_result(
            psm,
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-14-C",
        pts_test_name="Security - Insufficient Key Size – Initiator")
@@ -538,6 +585,23 @@ class LeL2capTest(GdBaseTestClass):
            LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_ENCRYPTION_KEY_SIZE)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-15-C",
        pts_test_name="Security - Insufficient Encryption Key Size – Responder")
    def test_security_insufficient_encryption_key_size_responder(self):
        """
        Verify that an IUT refuses to create a connection upon receipt of an LE Credit Based Connection
        Request which fails to satisfy Encryption Key Size requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(
            self.cert_address, psm,
            SecurityLevel.AUTHENTICATED_PAIRING_WITH_128_BIT_KEY)
        self.cert_l2cap.open_channel_with_expected_result(
            psm, LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_ENCRYPTION_KEY_SIZE)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-16-C",
        pts_test_name=
@@ -569,7 +633,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x0009 – Connection refused – Invalid Source CID".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)
@@ -587,7 +652,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000A – Connection refused – Source CID already allocated".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -625,7 +691,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000B – Connection refused – Unacceptable Parameters".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS
+1 −0
Original line number Diff line number Diff line
@@ -47,6 +47,7 @@ bool DynamicChannelManager::RegisterService(Psm psm, DynamicChannelConfiguration
                                            OnConnectionOpenCallback on_connection_open, os::Handler* handler) {
  internal::DynamicChannelServiceImpl::PendingRegistration pending_registration{
      .user_handler_ = handler,
      .security_policy_ = security_policy,
      .on_registration_complete_callback_ = std::move(on_registration_complete),
      .on_connection_open_callback_ = std::move(on_connection_open),
      .configuration_ = configuration_option,
+6 −4
Original line number Diff line number Diff line
@@ -81,8 +81,10 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
                                   const ::bluetooth::l2cap::le::SetEnableDynamicChannelRequest* request,
                                   ::google::protobuf::Empty* response) override {
    if (request->enable()) {
      dynamic_channel_helper_map_.emplace(request->psm(), std::make_unique<L2capDynamicChannelHelper>(
                                                              this, l2cap_layer_, facade_handler_, request->psm()));
      dynamic_channel_helper_map_.emplace(
          request->psm(),
          std::make_unique<L2capDynamicChannelHelper>(this, l2cap_layer_, facade_handler_, request->psm(),
                                                      static_cast<SecurityPolicy::Level>(request->security_level())));
      return ::grpc::Status::OK;
    } else {
      auto service_helper = dynamic_channel_helper_map_.find(request->psm());
@@ -111,11 +113,11 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
  class L2capDynamicChannelHelper {
   public:
    L2capDynamicChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler,
                              Psm psm)
                              Psm psm, SecurityPolicy::Level security_level)
        : facade_service_(service), l2cap_layer_(l2cap_layer), handler_(handler), psm_(psm) {
      dynamic_channel_manager_ = l2cap_layer_->GetDynamicChannelManager();
      dynamic_channel_manager_->RegisterService(
          psm, {}, {},
          psm, {}, security_level,
          common::BindOnce(&L2capDynamicChannelHelper::on_l2cap_service_registration_complete,
                           common::Unretained(this)),
          common::Bind(&L2capDynamicChannelHelper::on_connection_open, common::Unretained(this)), handler_);
Loading