Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f964df08 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

Fix L2capTest:test_continuation_flag

Test: cert/run --host
Change-Id: I8bab6d3d6f49ca5c27991e6d140b5e2710e25569
parent 10ae707b
Loading
Loading
Loading
Loading
+32 −0
Original line number Diff line number Diff line
@@ -190,6 +190,12 @@ class CertL2cap(Closable):
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_unknown_options_and_hint

    # more of a hack for the moment
    def reply_with_continuation_flag(self):
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_continuation_flag

    def _on_connection_request_default(self, l2cap_control_view):
        connection_request_view = l2cap_packets.ConnectionRequestView(
            l2cap_control_view)
@@ -249,6 +255,32 @@ class CertL2cap(Closable):
        self._acl.send(bytes(byte_array))
        return True

    def _on_connection_response_configuration_request_with_continuation_flag(
            self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 0x1234

        options = [mtu_opt]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.CONTINUE, options)

        self.control_channel.send(config_request)

        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
        flush_timeout_option.flush_timeout = 65535
        option = [flush_timeout_option]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 2, dcid, l2cap_packets.Continuation.END, option)
        self.get_control_channel().send(config_request)
        return True

    def _on_configuration_request_default(self, l2cap_control_view):
        configuration_request = l2cap_packets.ConfigurationRequestView(
            l2cap_control_view)
+9 −45
Original line number Diff line number Diff line
@@ -68,28 +68,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
    def cert_send_b_frame(self, b_frame):
        self.cert_l2cap.send_acl(b_frame)

    def _send_configuration_request(self,
                                    sid,
                                    dcid,
                                    continuation=l2cap_packets.Continuation.END,
                                    options=[],
                                    payload=[]):

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid, dcid, continuation, options)

        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
            1, config_request)

        config_request_l2cap = config_request_l2cap.Serialize()
        config_request_l2cap.extend(payload)
        config_request_l2cap[0] += len(payload)
        config_request_l2cap[6] += len(payload)
        self.cert_device.hci_acl_manager.SendAclData(
            acl_manager_facade.AclData(
                handle=self.cert_acl_handle,
                payload=bytes(config_request_l2cap)))

    def _setup_link_from_cert(self):
        self.dut.neighbor.EnablePageScan(
            neighbor_facade.EnableMsg(enabled=True))
@@ -213,29 +191,15 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        Verify the IUT is able to receive configuration requests that have the continuation flag set.
        """
        cert_acl_handle = self._setup_link_from_cert()
        with EventCallbackStream(
                self.cert_device.hci_acl_manager.FetchAclData(
                    empty_proto.Empty())) as cert_acl_data_stream:
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
            scid = 0x41
            psm = 0x33
            cert_acl_data_stream.register_callback(self._handle_control_packet)

        # Send configuration request with CONTINUE
            self.on_connection_response = lambda log: self._on_connection_response_use_mtu(log, continuation=l2cap_packets.Continuation.CONTINUE, mtu_value=48)

            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
                               psm)
            cert_acl_data_asserts.assert_event_occurs(
                self.is_correct_configuration_response)
            flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption(
            )
            flush_timeout_option.flush_timeout = 65535
            option = [flush_timeout_option]
            self._send_configuration_request(
                3, self.scid_to_dcid[scid], options=option)
            cert_acl_data_asserts.assert_event_occurs(
                self.is_correct_configuration_response)
        self.cert_l2cap.reply_with_continuation_flag()

        (dut_channel, cert_channel) = self._open_unvalidated_channel(
            scid=0x41, psm=0x33)

        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.ConfigurationResponse(), at_least_times=2)

    def test_retry_config_after_rejection(self):
        """