Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 55bcb0d2 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

LeL2capTest: Add disconnect request and credit

Verify that DUT can send disconnect request, and send credits.
Also add the first test case for connection reject due to security
requirement.

Test: cert/run --host
Change-Id: I9e3443b31d7f54868eeb4df238a4da26fe9f4c66
parent 0afd3e7a
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -83,6 +83,10 @@ class L2capMatchers(object):
    def LeDisconnectionResponse(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)

    @staticmethod
    def LeFlowControlCredit(cid):
        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)

    @staticmethod
    def SFrame(req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
@@ -263,6 +267,15 @@ class L2capMatchers(object):
        return request.GetSourceCid() == scid and request.GetDestinationCid(
        ) == dcid

    @staticmethod
    def _is_matching_le_flow_control_credit(packet, cid):
        frame = L2capMatchers.le_control_frame_with_code(
            packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
        if frame is None:
            return False
        request = l2cap_packets.LeFlowControlCreditView(frame)
        return request.GetCid() == cid

    @staticmethod
    def _information_response_with_type(packet, info_type):
        frame = L2capMatchers.control_frame_with_code(
+8 −0
Original line number Diff line number Diff line
@@ -74,6 +74,14 @@ class PyLeL2capChannel(IEventStream):
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))

    def close_channel(self):
        self._device.l2cap_le.CloseDynamicChannel(
            l2cap_le_facade_pb2.CloseDynamicChannelRequest(
                remote=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(
                        address=b"22:33:ff:ff:11:00")),
                psm=self._psm))


class CreditBasedConnectionResponseFutureWrapper(object):
    """
+46 −15
Original line number Diff line number Diff line
@@ -26,11 +26,19 @@ from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.matchers import L2capMatchers
from cert.captures import L2capCaptures
from mobly import asserts


class CertLeL2capChannel(IEventStream):

    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel):
    def __init__(self,
                 device,
                 scid,
                 dcid,
                 acl_stream,
                 acl,
                 control_channel,
                 initial_credits=0):
        self._device = device
        self._scid = scid
        self._dcid = dcid
@@ -39,6 +47,7 @@ class CertLeL2capChannel(IEventStream):
        self._control_channel = control_channel
        self._our_acl_view = FilteringEventStream(
            acl_stream, L2capMatchers.ExtractBasicFrame(scid))
        self._credits_left = initial_credits

    def get_event_queue(self):
        return self._our_acl_view.get_event_queue()
@@ -46,16 +55,18 @@ class CertLeL2capChannel(IEventStream):
    def send(self, packet):
        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def send_first_le_i_frame(self, sdu_size, packet):
        frame = l2cap_packets.FirstLeInformationFrameBuilder(
            self._dcid, sdu_size, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def disconnect_and_verify(self):
        assertThat(self._scid).isNotEqualTo(1)
        self._control_channel.send(
            l2cap_packets.DisconnectionRequestBuilder(1, self._dcid,
            l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid,
                                                        self._scid))

        assertThat(self._control_channel).emits(
@@ -70,6 +81,9 @@ class CertLeL2capChannel(IEventStream):
            l2cap_packets.LeFlowControlCreditBuilder(2, self._scid,
                                                     num_credits))

    def credits_left(self):
        return self._credits_left


class CertLeL2cap(Closable):

@@ -83,9 +97,11 @@ class CertLeL2cap(Closable):
            self._on_disconnection_request_default,
            LeCommandCode.DISCONNECTION_RESPONSE:
            self._on_disconnection_response_default,
            LeCommandCode.LE_FLOW_CONTROL_CREDIT:
            self._on_credit,
        }

        self.scid_to_dcid = {}
        self._cid_to_cert_channels = {}

    def close(self):
        self._le_acl_manager.close()
@@ -128,10 +144,13 @@ class CertLeL2cap(Closable):

        response = L2capCaptures.CreditBasedConnectionResponse(scid)
        assertThat(self.control_channel).emits(response)
        return CertLeL2capChannel(self._device, scid,
        channel = CertLeL2capChannel(self._device, scid,
                                     response.get().GetDestinationCid(),
                                     self._get_acl_stream(), self._le_acl,
                                  self.control_channel)
                                     self.control_channel,
                                     response.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def verify_and_respond_open_channel_from_remote(
            self, psm=0x33,
@@ -140,9 +159,12 @@ class CertLeL2cap(Closable):
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(
            request.get(), result)
        return CertLeL2capChannel(self._device, scid, dcid,
        channel = CertLeL2capChannel(self._device, scid, dcid,
                                     self._get_acl_stream(), self._le_acl,
                                  self.control_channel)
                                     self.control_channel,
                                     request.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
@@ -151,6 +173,10 @@ class CertLeL2cap(Closable):
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def verify_le_flow_control_credit(self, channel):
        assertThat(self.control_channel).emits(
            L2capMatchers.LeFlowControlCredit(channel._dcid))

    def _respond_connection_request_default(
            self, request,
            result=LeCreditBasedConnectionResponseResult.SUCCESS):
@@ -167,10 +193,6 @@ class CertLeL2cap(Closable):
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    # prefer to use channel abstraction instead, if at all possible
    def send_acl(self, packet):
        self._acl.send(packet.Serialize())

    def get_control_channel(self):
        return self.control_channel

@@ -190,6 +212,15 @@ class CertLeL2cap(Closable):
        disconnection_response = l2cap_packets.LeDisconnectionResponseView(
            request)

    def _on_credit(self, l2cap_le_control_view):
        credit_view = l2cap_packets.LeFlowControlCreditView(
            l2cap_le_control_view)
        cid = credit_view.GetCid()
        if cid not in self._cid_to_cert_channels:
            return
        self._cid_to_cert_channels[
            cid]._credits_left += credit_view.GetCredits()

    def _handle_control_packet(self, l2cap_packet):
        packet_bytes = l2cap_packet.payload
        l2cap_view = l2cap_packets.BasicFrameView(
+101 −29
Original line number Diff line number Diff line
@@ -305,6 +305,58 @@ class LeL2capTest(GdBaseTestClass):
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeCommandReject())

    def test_command_reject_reserved_pdu_codes(self):
        """
        L2CAP/LE/REJ/BI-02-C
        """
        self._setup_link_from_cert()
        self.cert_l2cap.get_control_channel().send(
            l2cap_packets.MoveChannelRequestBuilder(2, 0, 0))
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.LeCommandReject())

    def test_le_credit_based_connection_request_legacy_peer(self):
        """
        L2CAP/LE/CFC/BV-01-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)

    def test_le_credit_based_connection_request_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-02-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-03-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_credit_based_connection_request_on_an_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-04-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)

    def test_credit_based_connection_request_unsupported_le_psm(self):
        """
        L2CAP/LE/CFC/BV-05-C
@@ -339,47 +391,75 @@ class LeL2capTest(GdBaseTestClass):
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5),
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))

    def test_le_credit_based_connection_request_legacy_peer(self):
    def test_credit_exchange_sending_credits(self):
        """
        L2CAP/LE/CFC/BV-01-C
        L2CAP/LE/CFC/BV-07-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_reject_open_channel_from_remote(psm=0x33)
        assertThat(response_future.get_status()).isNotEqualTo(
            LeCreditBasedConnectionResponseResult.SUCCESS)
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        credits = cert_channel.credits_left()
        # Note: DUT only needs to send credit when ALL credits are consumed.
        # Here we enforce that DUT sends credit after receiving 3 packets, to
        # test without sending too many packets (may take too long).
        # This behavior is not expected for all Bluetooth stacks.
        for _ in range(min(credits + 1, 3)):
            cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        self.cert_l2cap.verify_le_flow_control_credit(cert_channel)

    def test_le_credit_based_connection_request_on_supported_le_psm(self):
    def test_disconnection_request(self):
        """
        L2CAP/LE/CFC/BV-02-C
        L2CAP/LE/CFC/BV-08-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.close_channel()
        cert_channel.verify_disconnect_request()

    def test_credit_based_connection_response_on_supported_le_psm(self):
    def test_disconnection_response(self):
        """
        L2CAP/LE/CFC/BV-03-C
        L2CAP/LE/CFC/BV-09-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(
            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
        cert_channel.disconnect_and_verify()

    def test_credit_based_connection_request_on_an_unsupported_le_psm(self):
    def test_security_insufficient_authentication_initiator(self):
        """
        L2CAP/LE/CFC/BV-04-C
        L2CAP/LE/CFC/BV-10-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
            result=LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_AUTHENTICATION)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.LE_PSM_NOT_SUPPORTED)
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHENTICATION)

    def test_security_insufficient_authorization_initiator(self):
        """
        L2CAP/LE/CFC/BV-12-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.
            INSUFFICIENT_AUTHORIZATION)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INSUFFICIENT_AUTHORIZATION)

    def test_request_refused_due_to_invalid_source_cid_initiator(self):
        """
        L2CAP/LE/CFC/BV-18-C
        """
        self._setup_link_from_cert()
        response_future = self.dut_l2cap.connect_coc_to_cert(psm=0x33)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm=0x33,
            result=LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)
        assertThat(response_future.get_status()).isEqualTo(
            LeCreditBasedConnectionResponseResult.INVALID_SOURCE_CID)

    def test_request_refused_due_to_unacceptable_parameters_initiator(self):
        """
@@ -402,11 +482,3 @@ class LeL2capTest(GdBaseTestClass):
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_credits(65535)
        cert_channel.verify_disconnect_request()

    def test_disconnection_response(self):
        """
        L2CAP/LE/CFC/BV-09-C
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.disconnect_and_verify()
+6 −0
Original line number Diff line number Diff line
@@ -67,6 +67,12 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
    if (service_helper->second->channel_ == nullptr) {
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open");
    }
    auto address = service_helper->second->channel_->GetDevice().GetAddress();
    hci::Address peer_address;
    ASSERT(hci::Address::FromString(request->remote().address().address(), peer_address));
    if (address != peer_address) {
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Remote address doesn't match");
    }
    service_helper->second->channel_->Close();
    return ::grpc::Status::OK;
  }