Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 265a8739 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

LeL2cap: Add security check for connection request

We need to check authentication requirement for incoming connection
request. Currently security module is not integrated so we reject all
connection that has security requirement.

Test: cert/run --host
Change-Id: Ibaffc2de2ad02e01edab7f23d8ef39c2fff23893
parent b463ebf6
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ from google.protobuf import empty_pb2 as empty_proto

from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from l2cap.le.facade_pb2 import SecurityLevel
from bluetooth_packets_python3 import l2cap_packets
from cert.event_stream import FilteringEventStream
from cert.event_stream import EventStream, IEventStream
@@ -203,10 +204,13 @@ class PyLeL2cap(Closable):
    def get_fixed_channel(self, cid=4):
        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)

    def register_coc(self, cert_address, psm=0x33):
    def register_coc(self,
                     cert_address,
                     psm=0x33,
                     security_level=SecurityLevel.NO_SECURITY):
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(
                psm=psm, enable=True))
                psm=psm, enable=True, security_level=security_level))
        return PyLeL2capDynamicChannel(self._device, cert_address, psm,
                                       self._le_l2cap_stream)

+10 −0
Original line number Diff line number Diff line
@@ -158,6 +158,16 @@ class CertLeL2cap(Closable):
        self._cid_to_cert_channels[scid] = channel
        return channel

    def open_channel_with_expected_result(
            self, psm=0x33,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(
                1, psm, 0x40, 1000, 100, 6))

        response = L2capMatchers.CreditBasedConnectionResponse(result)
        assertThat(self.control_channel).emits(response)

    def verify_and_respond_open_channel_from_remote(
            self,
            psm=0x33,
+83 −16
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets, l2cap_packets
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
from l2cap.le.cert.cert_le_l2cap import CertLeL2cap
from l2cap.le.facade_pb2 import SecurityLevel

# Assemble a sample packet.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])
@@ -41,12 +42,15 @@ class LeL2capTest(GdBaseTestClass):
        self.dut_l2cap = PyLeL2cap(self.dut)
        self.cert_l2cap = CertLeL2cap(self.cert)
        self.dut_address = common.BluetoothAddressWithType(
            address = common.BluetoothAddress(address=bytes(b'0D:05:04:03:02:01')),
            address=common.BluetoothAddress(
                address=bytes(b'0D:05:04:03:02:01')),
            type=common.RANDOM_DEVICE_ADDRESS)
        self.cert_address = common.BluetoothAddressWithType(
            address = common.BluetoothAddress(address=bytes(b'55:11:FF:AA:33:22')),
            address=common.BluetoothAddress(
                address=bytes(b'55:11:FF:AA:33:22')),
            type=common.RANDOM_DEVICE_ADDRESS)
        self.cert_l2cap._device.hci_le_acl_manager.SetInitiatorAddress(self.cert_address)
        self.cert_l2cap._device.hci_le_acl_manager.SetInitiatorAddress(
            self.cert_address)

    def teardown_test(self):
        self.cert_l2cap.close()
@@ -68,9 +72,11 @@ class LeL2capTest(GdBaseTestClass):
            event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            address_type=common.RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_ADDRESS,
            peer_address=common.BluetoothAddress(address=bytes(b'00:00:00:00:00:00')),
            peer_address=common.BluetoothAddress(
                address=bytes(b'00:00:00:00:00:00')),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
            ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(
            request)
@@ -97,13 +103,16 @@ class LeL2capTest(GdBaseTestClass):
            event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            address_type=common.RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_ADDRESS,
            peer_address=common.BluetoothAddress(address=bytes(b'00:00:00:00:00:00')),
            peer_address=common.BluetoothAddress(
                address=bytes(b'00:00:00:00:00:00')),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
            ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(
            request)
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm)
        self.cert_l2cap.wait_for_connection()
        # TODO: Currently we can only connect by using Dynamic channel API. Use fixed channel instead.
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
@@ -126,7 +135,8 @@ class LeL2capTest(GdBaseTestClass):
        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33):
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm)
        dut_channel = response_future.get_channel()
@@ -358,7 +368,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request to a legacy peer and receiving a Command Reject does not establish the channel.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)
@@ -398,7 +409,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request on an unsupported LE_PSM will not establish a channel upon receiving an LE Credit Based Connection Response refusing the connection.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
@@ -493,7 +505,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0005 – Connection Refused – Insufficient Authentication".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -501,6 +514,23 @@ class LeL2capTest(GdBaseTestClass):
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-11-C",
        pts_test_name="Security - Insufficient Authentication – Responder")
    def test_security_insufficient_authentication_responder(self):
        """
        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
Request which fails to satisfy authentication requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(
            self.cert_address, psm,
            SecurityLevel.AUTHENTICATED_PAIRING_WITH_ENCRYPTION)
        self.cert_l2cap.open_channel_with_expected_result(
            psm,
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-12-C",
        pts_test_name="Security - Insufficient Authorization – Initiator")
@@ -509,7 +539,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that the IUT does not establish the channel upon receipt of an LE Credit Based Connection Response indicating the connection was refused with Result “0x0006 – Connection Refused – Insufficient Authorization”.
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -517,6 +548,22 @@ class LeL2capTest(GdBaseTestClass):
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-13-C",
        pts_test_name="Security - Insufficient Authorization – Responder")
    def test_security_insufficient_authorization_responder(self):
        """
        Verify that an IUT refuses to create a connection upon reception of an LE Credit Based Connection
        Request which fails to satisfy authentication requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(self.cert_address, psm,
                                    SecurityLevel.AUTHORIZATION)
        self.cert_l2cap.open_channel_with_expected_result(
            psm,
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-14-C",
        pts_test_name="Security - Insufficient Key Size – Initiator")
@@ -538,6 +585,23 @@ class LeL2capTest(GdBaseTestClass):
            LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_ENCRYPTION_KEY_SIZE)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-15-C",
        pts_test_name="Security - Insufficient Encryption Key Size – Responder")
    def test_security_insufficient_encryption_key_size_responder(self):
        """
        Verify that an IUT refuses to create a connection upon receipt of an LE Credit Based Connection
        Request which fails to satisfy Encryption Key Size requirements.
        """
        self._setup_link_from_cert()
        psm = 0x33
        self.dut_l2cap.register_coc(
            self.cert_address, psm,
            SecurityLevel.AUTHENTICATED_PAIRING_WITH_128_BIT_KEY)
        self.cert_l2cap.open_channel_with_expected_result(
            psm, LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_ENCRYPTION_KEY_SIZE)

    @metadata(
        pts_test_id="L2CAP/LE/CFC/BV-16-C",
        pts_test_name=
@@ -569,7 +633,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x0009 – Connection refused – Invalid Source CID".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)
@@ -587,7 +652,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000A – Connection refused – Source CID already allocated".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
@@ -625,7 +691,8 @@ class LeL2capTest(GdBaseTestClass):
        Verify that an IUT sending an LE Credit Based Connection Request does not establish the channel upon receiving an LE Credit Based Connection Response refusing the connection with result "0x000B – Connection refused – Unacceptable Parameters".
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(self.cert_address, psm=0x33)
        response_future = self.dut_l2cap.connect_coc_to_cert(
            self.cert_address, psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.UNACCEPTABLE_PARAMETERS
+1 −0
Original line number Diff line number Diff line
@@ -47,6 +47,7 @@ bool DynamicChannelManager::RegisterService(Psm psm, DynamicChannelConfiguration
                                            OnConnectionOpenCallback on_connection_open, os::Handler* handler) {
  internal::DynamicChannelServiceImpl::PendingRegistration pending_registration{
      .user_handler_ = handler,
      .security_policy_ = security_policy,
      .on_registration_complete_callback_ = std::move(on_registration_complete),
      .on_connection_open_callback_ = std::move(on_connection_open),
      .configuration_ = configuration_option,
+6 −4
Original line number Diff line number Diff line
@@ -81,8 +81,10 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
                                   const ::bluetooth::l2cap::le::SetEnableDynamicChannelRequest* request,
                                   ::google::protobuf::Empty* response) override {
    if (request->enable()) {
      dynamic_channel_helper_map_.emplace(request->psm(), std::make_unique<L2capDynamicChannelHelper>(
                                                              this, l2cap_layer_, facade_handler_, request->psm()));
      dynamic_channel_helper_map_.emplace(
          request->psm(),
          std::make_unique<L2capDynamicChannelHelper>(this, l2cap_layer_, facade_handler_, request->psm(),
                                                      static_cast<SecurityPolicy::Level>(request->security_level())));
      return ::grpc::Status::OK;
    } else {
      auto service_helper = dynamic_channel_helper_map_.find(request->psm());
@@ -111,11 +113,11 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
  class L2capDynamicChannelHelper {
   public:
    L2capDynamicChannelHelper(L2capLeModuleFacadeService* service, L2capLeModule* l2cap_layer, os::Handler* handler,
                              Psm psm)
                              Psm psm, SecurityPolicy::Level security_level)
        : facade_service_(service), l2cap_layer_(l2cap_layer), handler_(handler), psm_(psm) {
      dynamic_channel_manager_ = l2cap_layer_->GetDynamicChannelManager();
      dynamic_channel_manager_->RegisterService(
          psm, {}, {},
          psm, {}, security_level,
          common::BindOnce(&L2capDynamicChannelHelper::on_l2cap_service_registration_complete,
                           common::Unretained(this)),
          common::Bind(&L2capDynamicChannelHelper::on_connection_open, common::Unretained(this)), handler_);
Loading