Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 482370c7 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

L2capTest: Add connect, send, and disconnect from DUT

Test: cert/run --host
Change-Id: Ica4fcfd32ea0d6fad57a1e2fa3ed84a6a4f89cca
parent 7246fe04
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -61,6 +61,18 @@ def LeConnectionCompleteCapture():

class L2capCaptures(object):

    @staticmethod
    def ConnectionRequest(psm):
        return Capture(
            L2capMatchers.ConnectionRequest(psm),
            L2capCaptures._extract_connection_request)

    @staticmethod
    def _extract_connection_request(packet):
        frame = L2capMatchers.control_frame_with_code(
            packet, CommandCode.CONNECTION_REQUEST)
        return l2cap_packets.ConnectionRequestView(frame)

    @staticmethod
    def ConnectionResponse(scid):
        return Capture(
+25 −6
Original line number Diff line number Diff line
@@ -25,12 +25,12 @@ from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionRespo
class L2capMatchers(object):

    @staticmethod
    def ConnectionResponse(scid):
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)
    def ConnectionRequest(psm):
        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)

    @staticmethod
    def ConnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
    def ConnectionResponse(scid):
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)

    @staticmethod
    def ConfigurationResponse():
@@ -41,8 +41,8 @@ class L2capMatchers(object):
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)

    @staticmethod
    def DisconnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
    def DisconnectionRequest(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)

    @staticmethod
    def DisconnectionResponse(scid, dcid):
@@ -235,6 +235,15 @@ class L2capMatchers(object):
        return L2capMatchers.le_control_frame_with_code(packet,
                                                        code) is not None

    @staticmethod
    def _is_matching_connection_request(packet, psm):
        frame = L2capMatchers.control_frame_with_code(
            packet, CommandCode.CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConnectionRequestView(frame)
        return request.GetPsm() == psm

    @staticmethod
    def _is_matching_connection_response(packet, scid):
        frame = L2capMatchers.control_frame_with_code(
@@ -246,6 +255,16 @@ class L2capMatchers(object):
        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid(
        ) != 0

    @staticmethod
    def _is_matching_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(
            packet, CommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.DisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid(
        ) == dcid

    @staticmethod
    def _is_matching_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(
+45 −9
Original line number Diff line number Diff line
@@ -35,27 +35,63 @@ class PyL2capChannel(object):

    def send(self, payload):
        self._device.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))
            l2cap_facade_pb2.DynamicChannelPacket(
                psm=self._psm, payload=payload))

    def close_channel(self):
        self._device.l2cap.CloseChannel(
            l2cap_facade_pb2.CloseChannelRequest(psm=self._psm))


class _ClassicConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyL2capDynamicChannel object later
    """

    def __init__(self, grpc_response_future, device, psm):
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._psm = psm

    def get_channel(self):
        return PyL2capChannel(self._device, self._psm)


class PyL2cap(Closable):

    def __init__(self, device):
    def __init__(self, device, cert_address):
        self._device = device
        self._cert_address = cert_address

    def close(self):
        pass

    def open_channel(self,
    def register_dynamic_channel(
            self,
            psm=0x33,
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):

        # todo, I don't understand what SetDynamicChannel means?
        self._device.l2cap.SetDynamicChannel(
            l2cap_facade_pb2.SetEnableDynamicChannelRequest(
                psm=psm, retransmission_mode=mode))
        return PyL2capChannel(self._device, psm)

    def connect_dynamic_channel_to_cert(
            self,
            psm=0x33,
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Send open Dynamic channel request to CERT.
        Get a future for connection result, to be used after CERT accepts request
        """
        self.register_dynamic_channel(psm, mode)
        response_future = self._device.l2cap.OpenChannel.future(
            l2cap_facade_pb2.OpenChannelRequest(
                psm=psm, remote=self._cert_address, mode=mode))

        return _ClassicConnectionResponseFutureWrapper(response_future,
                                                       self._device, psm)


class PyLeL2capFixedChannel(IEventStream):

@@ -108,7 +144,7 @@ class PyLeL2capDynamicChannel(IEventStream):
                psm=self._psm))


class CreditBasedConnectionResponseFutureWrapper(object):
class _CreditBasedConnectionResponseFutureWrapper(object):
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyLeL2capDynamicChannel object later
@@ -168,7 +204,7 @@ class PyLeL2cap(Closable):
                    address=common.BluetoothAddress(
                        address=b"22:33:ff:ff:11:00"))))

        return CreditBasedConnectionResponseFutureWrapper(
        return _CreditBasedConnectionResponseFutureWrapper(
            response_future, self._device, psm, self._le_l2cap_stream)

    def update_connection_parameter(self,
+37 −13
Original line number Diff line number Diff line
@@ -90,6 +90,10 @@ class CertL2capChannel(IEventStream):
        assertThat(self._control_channel).emits(
            L2capMatchers.DisconnectionResponse(self._scid, self._dcid))

    def verify_disconnect_request(self):
        assertThat(self._control_channel).emits(
            L2capMatchers.DisconnectionRequest(self._dcid, self._scid))


class CertL2cap(Closable):

@@ -153,6 +157,38 @@ class CertL2cap(Closable):
                                self._get_acl_stream(), self._acl,
                                self.control_channel)

    def verify_and_respond_open_channel_from_remote(self, psm=0x33):
        request = L2capCaptures.ConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)

        sid = request.get().GetIdentifier()
        cid = request.get().GetSourceCid()

        self.scid_to_dcid[cid] = cid

        connection_response = l2cap_packets.ConnectionResponseBuilder(
            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            l2cap_packets.ConnectionResponseStatus.
            NO_FURTHER_INFORMATION_AVAILABLE)
        self.control_channel.send(connection_response)

        config_options = []
        if self.basic_option is not None:
            config_options.append(self.basic_option)
        elif self.ertm_option is not None:
            config_options.append(self.ertm_option)
        if self.fcs_option is not None:
            config_options.append(self.fcs_option)

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, cid, l2cap_packets.Continuation.END, config_options)
        self.control_channel.send(config_request)

        channel = CertL2capChannel(self._device, cid, cid,
                                   self._get_acl_stream(), self._acl,
                                   self.control_channel)
        return channel

    # prefer to use channel abstraction instead, if at all possible
    def send_acl(self, packet):
        self._acl.send(packet.Serialize())
@@ -232,19 +268,7 @@ class CertL2cap(Closable):
            CONFIGURATION_REQUEST] = self._on_configuration_request_send_configuration_request_basic_mode

    def _on_connection_request_default(self, l2cap_control_view):
        connection_request_view = l2cap_packets.ConnectionRequestView(
            l2cap_control_view)
        sid = connection_request_view.GetIdentifier()
        cid = connection_request_view.GetSourceCid()

        self.scid_to_dcid[cid] = cid

        connection_response = l2cap_packets.ConnectionResponseBuilder(
            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            l2cap_packets.ConnectionResponseStatus.
            NO_FURTHER_INFORMATION_AVAILABLE)
        self.control_channel.send(connection_response)
        return True
        pass

    def _on_connection_response_default(self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
+44 −26
Original line number Diff line number Diff line
@@ -59,7 +59,7 @@ class L2capTest(GdBaseTestClass):
        self.cert_address = common_pb2.BluetoothAddress(
            address=self.cert.address)

        self.dut_l2cap = PyL2cap(self.dut)
        self.dut_l2cap = PyL2cap(self.dut, self.cert_address)
        self.cert_l2cap = CertL2cap(self.cert)

    def teardown_test(self):
@@ -81,7 +81,7 @@ class L2capTest(GdBaseTestClass):
                                  psm=0x33,
                                  mode=RetransmissionFlowControlMode.BASIC):

        dut_channel = self.dut_l2cap.open_channel(psm, mode)
        dut_channel = self.dut_l2cap.register_dynamic_channel(psm, mode)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid)

        return (dut_channel, cert_channel)
@@ -99,6 +99,17 @@ class L2capTest(GdBaseTestClass):

        return result

    def _open_channel_from_dut(self,
                               psm=0x33,
                               mode=RetransmissionFlowControlMode.BASIC):
        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(
            psm, mode)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(
            psm)
        dut_channel = dut_channel_future.get_channel()

        return (dut_channel, cert_channel)

    def test_connect_dynamic_channel_and_send_data(self):
        self._setup_link_from_cert()

@@ -146,14 +157,27 @@ class L2capTest(GdBaseTestClass):
        initiate the configuration procedure.
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut(psm=0x33)

        psm = 0x33
        # TODO: Use another test case
        self.dut.l2cap.OpenChannel(
            l2cap_facade_pb2.OpenChannelRequest(
                remote=self.cert_address, psm=psm))
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConnectionRequest())
    def test_send_data(self):
        """
        L2CAP/COS/CED/BV-03-C [Send data]
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel(scid=0x41, psm=0x33)
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(L2capMatchers.Data(b'hello'))

    def test_disconnect(self):
        """
        L2CAP/COS/CED/BV-04-C [Disconnect]
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel(scid=0x41, psm=0x33)
        dut_channel.close_channel()
        cert_channel.verify_disconnect_request()

    def test_accept_disconnect(self):
        """
@@ -569,8 +593,7 @@ class L2capTest(GdBaseTestClass):

        # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
        time.sleep(362)
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.DisconnectionRequest())
        cert_channel.verify_disconnect_request()

    def test_i_frame_transmissions_exceed_max_transmit(self):
        """
@@ -589,8 +612,7 @@ class L2capTest(GdBaseTestClass):
        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))

        cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.DisconnectionRequest())
        cert_channel.verify_disconnect_request()

    def test_respond_to_rej(self):
        """
@@ -900,14 +922,10 @@ class L2capTest(GdBaseTestClass):
        L2CAP/CMC/BV-12-C
        """
        self._setup_link_from_cert()

        self.dut.l2cap.OpenChannel(
            l2cap_facade_pb2.OpenChannelRequest(
                remote=self.cert_address,
                psm=0x33,
                mode=RetransmissionFlowControlMode.ERTM))
        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(
            psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
        assertThat(self.cert_l2cap.get_control_channel()).emitsNone(
            L2capMatchers.ConfigurationRequest())
            L2capMatchers.ConnectionRequest(0x33))

    def test_config_respond_basic_mode_when_using_mandatory_ertm(self):
        """
@@ -916,11 +934,11 @@ class L2capTest(GdBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.reply_with_nothing()
        self.cert_l2cap.reply_with_basic_mode()
        self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
            scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConfigurationRequest(),
            L2capMatchers.DisconnectionRequest()).inOrder()
            L2capMatchers.ConfigurationRequest())
        cert_channel.verify_disconnect_request()

    def test_config_request_basic_mode_when_using_mandatory_ertm(self):
        """
@@ -929,8 +947,8 @@ class L2capTest(GdBaseTestClass):
        self._setup_link_from_cert()
        self.cert_l2cap.reply_with_nothing()
        self.cert_l2cap.config_with_basic_mode()
        self._open_unvalidated_channel(
        (dut_channel, cert_channel) = self._open_unvalidated_channel(
            scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConfigurationRequest(),
            L2capMatchers.DisconnectionRequest()).inOrder()
            L2capMatchers.ConfigurationRequest())
        cert_channel.verify_disconnect_request()
Loading