Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6b1dcfc5 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

L2capTest stability improvement

Manually configure channel.
Remove hacks. Make things more controlled and flexible.

Test: cert/run --host
Bug: 153275282
Change-Id: I01bc20bba116c29e23ee50fbc5bf1c6ea291e386
parent 53724407
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -124,6 +124,18 @@ class L2capCaptures(object):
            packet, CommandCode.CONNECTION_RESPONSE)
        return l2cap_packets.ConnectionResponseView(frame)

    @staticmethod
    def ConfigurationRequest(cid=None):
        return Capture(
            L2capMatchers.ConfigurationRequest(cid),
            L2capCaptures._extract_configuration_request)

    @staticmethod
    def _extract_configuration_request(packet):
        frame = L2capMatchers.control_frame_with_code(
            packet, CommandCode.CONFIGURATION_REQUEST)
        return l2cap_packets.ConfigurationRequestView(frame)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return Capture(
+12 −2
Original line number Diff line number Diff line
@@ -122,8 +122,8 @@ class L2capMatchers(object):
        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)

    @staticmethod
    def ConfigurationRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
    def ConfigurationRequest(cid=None):
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)

    @staticmethod
    def ConfigurationRequestWithErtm():
@@ -424,6 +424,16 @@ class L2capMatchers(object):
        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid(
        ) != 0

    @staticmethod
    def _is_matching_configuration_request_with_cid(packet, cid=None):
        frame = L2capMatchers.control_frame_with_code(
            packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        dcid = request.GetDestinationCid()
        return cid is None or cid == dcid

    @staticmethod
    def _is_matching_configuration_request_with_ertm(packet):
        frame = L2capMatchers.control_frame_with_code(
+115 −240
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage
from cert.closable import Closable
from cert.closable import safeClose
from cert.py_acl_manager import PyAclManager
@@ -26,13 +27,36 @@ from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
from bluetooth_packets_python3.l2cap_packets import Poll
from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.matchers import L2capMatchers
from cert.captures import L2capCaptures


class CertL2capChannel(IEventStream):
class CertL2capControlChannelBehaviors(object):

    def __init__(self, parent):
        self.on_packet_behavior = SingleArgumentBehavior(
            lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent))

    def on_packet(self, matcher):
        return self.on_packet_behavior.begin(matcher)

    class CertReplyStage(ReplyStage):

        def __init__(self, parent):
            self.parent = parent

        def send_packet(self):
            self._commit(lambda packet: self._send_packet(packet))
            return self

        def _send_packet(self, packet):
            self.parent._control_channel.send(packet)


class CertL2capChannel(IEventStream, IHasBehaviors):

    def __init__(self,
                 device,
@@ -41,21 +65,24 @@ class CertL2capChannel(IEventStream):
                 acl_stream,
                 acl,
                 control_channel,
                 fcs_enabled=False):
                 fcs=None):
        self._device = device
        self._scid = scid
        self._dcid = dcid
        self._acl_stream = acl_stream
        self._acl = acl
        self._control_channel = control_channel
        self.fcs_enabled = fcs_enabled
        if fcs_enabled:
        self.control_behaviors = CertL2capControlChannelBehaviors(self)
        if fcs == l2cap_packets.FcsType.DEFAULT:
            self._our_acl_view = FilteringEventStream(
                acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid))
        else:
            self._our_acl_view = FilteringEventStream(
                acl_stream, L2capMatchers.ExtractBasicFrame(scid))

    def get_behaviors(self):
        return self.control_behaviors

    def get_event_queue(self):
        return self._our_acl_view.get_event_queue()

@@ -87,11 +114,14 @@ class CertL2capChannel(IEventStream):
            self._dcid, s, p, f, req_seq)
        self._acl.send(frame.Serialize())

    def send_configure_request(self, cert_channel, options):
        assertThat(self._scid).isEqualTo(1)
    def send_configure_request(self,
                               options,
                               sid=2,
                               continuation=l2cap_packets.Continuation.END):
        assertThat(self._scid).isNotEqualTo(1)
        request = l2cap_packets.ConfigurationRequestBuilder(
            2, cert_channel._dcid, l2cap_packets.Continuation.END, options)
        self.send(request)
            2, self._dcid, l2cap_packets.Continuation.END, options)
        self._control_channel.send(request)

    def send_information_request(self, type):
        assertThat(self._scid).isEqualTo(1)
@@ -104,6 +134,22 @@ class CertL2capChannel(IEventStream):
        self.send_information_request(
            InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)

    def verify_configuration_request_and_respond(
            self, result=ConfigurationResponseResult.SUCCESS, options=None):
        request_capture = L2capCaptures.ConfigurationRequest(self._scid)
        assertThat(self._control_channel).emits(request_capture)
        request = request_capture.get()
        sid = request.GetIdentifier()
        if options is None:
            options = []
        config_response = l2cap_packets.ConfigurationResponseBuilder(
            sid, self._dcid, l2cap_packets.Continuation.END, result, options)
        self._control_channel.send(config_response)

    def verify_configuration_response(self):
        assertThat(self._control_channel).emits(
            L2capMatchers.ConfigurationResponse())

    def disconnect_and_verify(self):
        assertThat(self._scid).isNotEqualTo(1)
        self._control_channel.send(
@@ -126,8 +172,6 @@ class CertL2cap(Closable):
        self._acl = None

        self.control_table = {
            CommandCode.CONNECTION_REQUEST:
            self._on_connection_request_default,
            CommandCode.CONNECTION_RESPONSE:
            self._on_connection_response_default,
            CommandCode.CONFIGURATION_REQUEST:
@@ -145,19 +189,11 @@ class CertL2cap(Closable):
        }

        self.scid_to_dcid = {}
        self.scid_to_channel = {}

        self.support_ertm = True
        self.support_fcs = True

        self.basic_option = None
        self.ertm_option = None
        self.fcs_option = None
        self.fcs_enabled = False
        self.mtu_option = None

        self.config_response_result = l2cap_packets.ConfigurationResponseResult.SUCCESS
        self.config_response_options = []

    def close(self):
        self._acl_manager.close()
        safeClose(self._acl)
@@ -174,18 +210,25 @@ class CertL2cap(Closable):
            control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def open_channel(self, signal_id, psm, scid):
    def open_channel(self, signal_id, psm, scid, fcs=None):
        self.control_channel.send(
            l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))

        response = L2capCaptures.ConnectionResponse(scid)
        assertThat(self.control_channel).emits(response)
        return CertL2capChannel(self._device, scid,
        channel = CertL2capChannel(self._device, scid,
                                   response.get().GetDestinationCid(),
                                   self._get_acl_stream(), self._acl,
                                self.control_channel, self.fcs_enabled)
                                   self.control_channel, fcs)
        self.scid_to_channel[scid] = channel

        return channel

    def verify_and_respond_open_channel_from_remote(self,
                                                    psm=0x33,
                                                    scid=None,
                                                    fcs=None):

    def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None):
        request = L2capCaptures.ConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)

@@ -196,27 +239,17 @@ class CertL2cap(Closable):

        self.scid_to_dcid[scid] = dcid

        channel = CertL2capChannel(self._device, scid, dcid,
                                   self._get_acl_stream(), self._acl,
                                   self.control_channel, fcs)
        self.scid_to_channel[scid] = channel

        connection_response = l2cap_packets.ConnectionResponseBuilder(
            sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS,
            l2cap_packets.ConnectionResponseStatus.
            NO_FURTHER_INFORMATION_AVAILABLE)
        self.control_channel.send(connection_response)

        config_options = []
        if self.basic_option is not None:
            config_options.append(self.basic_option)
        elif self.ertm_option is not None:
            config_options.append(self.ertm_option)
        if self.fcs_option is not None:
            config_options.append(self.fcs_option)

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.END, config_options)
        self.control_channel.send(config_request)

        channel = CertL2capChannel(self._device, scid, dcid,
                                   self._get_acl_stream(), self._acl,
                                   self.control_channel, self.fcs_enabled)
        return channel

    # prefer to use channel abstraction instead, if at all possible
@@ -230,49 +263,11 @@ class CertL2cap(Closable):
        return self._acl_manager.get_acl_stream()

    # Disable ERTM when exchange extened feature
    def disable_ertm(self):
    def claim_ertm_unsupported(self):
        self.support_ertm = False

    # Disable FCS when exchange extened feature
    def disable_fcs(self):
        self.support_fcs = False

    def set_mtu(self, mtu=672):
        self.mtu_option = l2cap_packets.MtuConfigurationOption()
        self.mtu_option.mtu = mtu

    def turn_on_ertm(self, tx_window_size=10, max_transmit=20, mps=1010):
        self.ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        self.ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION
        self.ertm_option.tx_window_size = tx_window_size
        self.ertm_option.max_transmit = max_transmit
        self.ertm_option.retransmission_time_out = 2000
        self.ertm_option.monitor_time_out = 12000
        self.ertm_option.maximum_pdu_size = mps

    def turn_on_fcs(self):
        self.fcs_option = l2cap_packets.FrameCheckSequenceOption()
        self.fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT
        self.fcs_enabled = True

    # more of a hack for the moment
    def dont_send_configuration_req(self):
        self.control_table[CommandCode.CONNECTION_RESPONSE] =\
            self.on_connection_response_dont_send_configuration_req

    def on_connection_response_dont_send_configuration_req(
            self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

    # more of a hack for the moment
    def ignore_config_request(self):
        self.control_table[CommandCode.CONFIGURATION_REQUEST] = lambda _: True
        pass

    # more of a hack for the moment
    def reply_with_unknown_options_and_hint(self):
@@ -280,62 +275,8 @@ class CertL2cap(Closable):
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_unknown_options_and_hint

    # more of a hack for the moment
    def reply_with_continuation_flag(self):
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_continuation_flag

    # more of a hack for the moment
    def reply_with_nothing(self):
        self.control_table[
            CommandCode.
            CONNECTION_RESPONSE] = self._on_connection_response_do_nothing

    # more of a hack for the moment
    # Send configuration request after receive configuration request
    def config_with_basic_mode(self):
        self.control_table[
            CommandCode.
            CONFIGURATION_REQUEST] = self._on_configuration_request_send_configuration_request_basic_mode

    def set_config_response_result(self, result):
        """
        Set the result for config response packet
        """
        self.config_response_result = result

    def set_config_response_options(self, options):
        """
        Set the options (list) for config response packet
        """
        self.config_response_options = options

    def _on_connection_request_default(self, l2cap_control_view):
        pass

    def _on_connection_response_default(self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        options = []
        if self.basic_option is not None:
            options.append(self.basic_option)
        elif self.ertm_option is not None:
            options.append(self.ertm_option)
        if self.fcs_option is not None:
            options.append(self.fcs_option)
        if self.mtu_option is not None:
            options.append(self.mtu_option)

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.END, options)
        self.control_channel.send(config_request)
        return True
        pass

    def _on_connection_response_configuration_request_with_unknown_options_and_hint(
            self, l2cap_control_view):
@@ -362,124 +303,60 @@ class CertL2cap(Closable):
        self._acl.send(bytes(byte_array))
        return True

    def _on_connection_response_configuration_request_with_continuation_flag(
            self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 0x1234

        options = [mtu_opt]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, dcid, l2cap_packets.Continuation.CONTINUE, options)

        self.control_channel.send(config_request)

        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
        flush_timeout_option.flush_timeout = 65535
        option = [flush_timeout_option]
        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 2, dcid, l2cap_packets.Continuation.END, option)
        self.get_control_channel().send(config_request)
        return True

    def _on_connection_response_do_nothing(self, l2cap_control_view):
        connection_response_view = l2cap_packets.ConnectionResponseView(
            l2cap_control_view)
        sid = connection_response_view.GetIdentifier()
        scid = connection_response_view.GetSourceCid()
        dcid = connection_response_view.GetDestinationCid()
        self.scid_to_dcid[scid] = dcid

    def _on_configuration_request_default(self, l2cap_control_view):
        configuration_request = l2cap_packets.ConfigurationRequestView(
            l2cap_control_view)
        sid = configuration_request.GetIdentifier()
        dcid = configuration_request.GetDestinationCid()
        config_response = l2cap_packets.ConfigurationResponseBuilder(
            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
            self.config_response_result, self.config_response_options)
        self.control_channel.send(config_response)
        pass

    def reply_with_unacceptable_parameters(self):
    @staticmethod
    def config_option_basic_explicit(mtu=642):
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
        mtu_opt.mtu = mtu
        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
        return [mtu_opt, rfc_opt]

        self.set_config_response_result(
            l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS)
        self.set_config_response_options([mtu_opt, fcs_opt, rfc_opt])

    def reply_with_basic_mode(self):
        basic_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        basic_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC

        self.set_config_response_result(
            l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS)
        self.set_config_response_options([basic_option])

    def reply_ertm_with_max_transmit_one(self):
    @staticmethod
    def config_option_mtu_explicit(mtu=642):
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION
        rfc_opt.tx_window_size = 10
        rfc_opt.max_transmit = 1
        rfc_opt.retransmission_time_out = 10
        rfc_opt.monitor_time_out = 10
        rfc_opt.maximum_pdu_size = 1010

        self.set_config_response_options([mtu_opt, fcs_opt, rfc_opt])

    def reply_ertm_with_specified_mps(self, mps=1010):
        mtu_opt.mtu = mtu
        return [mtu_opt]

    @staticmethod
    def config_option_ertm(mtu=642,
                           fcs=None,
                           max_transmit=10,
                           mps=1010,
                           tx_window_size=10,
                           monitor_time_out=2000):
        result = []
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        mtu_opt.mtu = mtu
        result.append(mtu_opt)
        if fcs is not None:
            fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.NO_FCS
            fcs_opt.fcs_type = fcs
            result.append(fcs_opt)
        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION
        rfc_opt.tx_window_size = 10
        rfc_opt.max_transmit = 3
        rfc_opt.retransmission_time_out = 2000
        rfc_opt.monitor_time_out = 2000
        rfc_opt.tx_window_size = tx_window_size
        rfc_opt.max_transmit = max_transmit
        rfc_opt.retransmission_time_out = 1000
        rfc_opt.monitor_time_out = monitor_time_out
        rfc_opt.maximum_pdu_size = mps
        result.append(rfc_opt)
        return result

        self.set_config_response_options([mtu_opt, fcs_opt, rfc_opt])

    def _on_configuration_request_send_configuration_request_basic_mode(
            self, l2cap_control_view):
        configuration_request = l2cap_packets.ConfigurationRequestView(
            l2cap_control_view)
        sid = configuration_request.GetIdentifier()
        dcid = configuration_request.GetDestinationCid()
    @staticmethod
    def config_option_ertm_with_max_transmit_one():
        return CertL2cap.config_option_ertm(max_transmit=1)

        basic_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
        )
        basic_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC

        config_request = l2cap_packets.ConfigurationRequestBuilder(
            sid + 1, self.scid_to_dcid.get(dcid, 0),
            l2cap_packets.Continuation.END, [basic_option])
        self.control_channel.send(config_request)
    @staticmethod
    def config_option_ertm_with_mps(mps=1010):
        return CertL2cap.config_option_ertm(mps=mps)

    def _on_configuration_response_default(self, l2cap_control_view):
        configuration_response = l2cap_packets.ConfigurationResponseView(
            l2cap_control_view)
        sid = configuration_response.GetIdentifier()
        pass

    def _on_disconnection_request_default(self, l2cap_control_view):
        disconnection_request = l2cap_packets.DisconnectionRequestView(
@@ -492,8 +369,7 @@ class CertL2cap(Closable):
        self.control_channel.send(disconnection_response)

    def _on_disconnection_response_default(self, l2cap_control_view):
        disconnection_response = l2cap_packets.DisconnectionResponseView(
            l2cap_control_view)
        pass

    def _on_information_request_default(self, l2cap_control_view):
        information_request = l2cap_packets.InformationRequestView(
@@ -518,8 +394,7 @@ class CertL2cap(Closable):
            return

    def _on_information_response_default(self, l2cap_control_view):
        information_response = l2cap_packets.InformationResponseView(
            l2cap_control_view)
        pass

    def _handle_control_packet(self, l2cap_packet):
        packet_bytes = l2cap_packet.payload