Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a4b4e27e authored by Zach Johnson's avatar Zach Johnson
Browse files

Start migrating l2cap packet matchers to their own file

Test: cert/run --host --test_filter=L2capTest
Change-Id: Ia3aab45455042b10273a0741702faa7ea743c7b4
parent 4d4a4a08
Loading
Loading
Loading
Loading
+63 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode


class L2capMatchers(object):

    @staticmethod
    def ConnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)

    @staticmethod
    def ConfigurationResponse():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)

    @staticmethod
    def ConfigurationRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)

    @staticmethod
    def DisconnectionRequest():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)

    @staticmethod
    def _basic_frame(packet):
        if packet is None:
            return None
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _control_frame(packet):
        frame = L2capMatchers._basic_frame(packet)
        if frame is None or frame.GetChannelId() != 1:
            return None
        return l2cap_packets.ControlView(frame.GetPayload())

    @staticmethod
    def _control_frame_with_code(packet, code):
        frame = L2capMatchers._control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def _is_control_frame_with_code(packet, code):
        return L2capMatchers._control_frame_with_code(packet, code) is not None
+50 −83
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ from cert.truth import assertThat
from cert.closable import safeClose
from cert.py_l2cap import PyL2cap
from cert.py_acl_manager import PyAclManager
from cert.matchers import L2capMatchers
from facade import common_pb2
from facade import rootservice_pb2 as facade_rootservice
from google.protobuf import empty_pb2 as empty_proto
@@ -283,40 +284,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        return l2cap_packets.BasicFrameView(
            bt_packets.PacketViewLittleEndian(list(l2cap_packet.payload)))

    def get_control_frame(self, l2cap_packet, code):
        l2cap_view = self.get_basic_frame(l2cap_packet)
        if l2cap_view.GetChannelId() != 1:
            return None
        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
        if l2cap_control_view.GetCode() != code:
            return None
        return l2cap_control_view

    def is_correct_connection_request(self, l2cap_packet):
        return self.get_control_frame(
            l2cap_packet,
            l2cap_packets.CommandCode.CONNECTION_REQUEST) is not None

    def is_correct_configuration_response(self, l2cap_packet):
        control_frame = self.get_control_frame(
            l2cap_packet, l2cap_packets.CommandCode.CONFIGURATION_RESPONSE)
        if control_frame is None:
            return False
        configuration_response_view = l2cap_packets.ConfigurationResponseView(
            control_frame)
        return configuration_response_view.GetResult(
        ) == l2cap_packets.ConfigurationResponseResult.SUCCESS

    def is_correct_configuration_request(self, l2cap_packet):
        return self.get_control_frame(
            l2cap_packet,
            l2cap_packets.CommandCode.CONFIGURATION_REQUEST) is not None

    def is_correct_disconnection_request(self, l2cap_packet):
        return self.get_control_frame(
            l2cap_packet,
            l2cap_packets.CommandCode.DISCONNECTION_REQUEST) is not None

    def cert_send_b_frame(self, b_frame):
        self.cert_acl.send(b_frame.Serialize())

@@ -473,8 +440,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -501,7 +468,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.dut.l2cap.OpenChannel(
            l2cap_facade_pb2.OpenChannelRequest(
                remote=self.cert_address, psm=psm))
        assertThat(self.cert_acl).emits(self.is_correct_connection_request)
        assertThat(self.cert_acl).emits(L2capMatchers.ConnectionRequest())

    def test_accept_disconnect(self):
        """
@@ -583,9 +550,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            psm,
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)

        assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_request, at_least_times=2)
            L2capMatchers.ConfigurationRequest(), at_least_times=2)

    def test_respond_to_echo_request(self):
        """
@@ -753,8 +720,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -781,8 +748,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -809,8 +776,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -838,8 +805,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        dcid = self.scid_to_dcid[scid]

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -892,8 +859,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        dcid = self.scid_to_dcid[scid]

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        for i in range(3):
            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
@@ -952,8 +919,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        dcid = self.scid_to_dcid[scid]

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        for i in range(3):
            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
@@ -994,8 +961,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        dcid = self.scid_to_dcid[scid]

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1040,8 +1007,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        dcid = self.scid_to_dcid[scid]

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1087,8 +1054,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        self.dut.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1118,8 +1085,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1152,8 +1119,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1162,7 +1129,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
        time.sleep(362)
        assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
        assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())

    def test_i_frame_transmissions_exceed_max_transmit(self):
        """
@@ -1184,8 +1151,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1200,7 +1167,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
        self.cert_send_b_frame(s_frame)

        assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
        assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())

    def test_respond_to_rej(self):
        """
@@ -1223,8 +1190,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1267,8 +1234,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1309,8 +1276,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1352,8 +1319,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1395,8 +1362,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1453,8 +1420,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1510,8 +1477,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1569,8 +1536,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)

        assertThat(self.cert_acl).emits(
            self.is_correct_configuration_response,
            self.is_correct_configuration_request).inAnyOrder()
            L2capMatchers.ConfigurationResponse(),
            L2capMatchers.ConfigurationRequest()).inAnyOrder()

        dcid = self.scid_to_dcid[scid]

@@ -1628,7 +1595,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):

        # TODO: Fix this test. It doesn't work so far with PDL struct

        assertThat(self.cert_acl).emits(self.is_correct_configuration_request)
        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationRequest())
        asserts.skip("Struct not working")

    def test_respond_configuration_request_ertm(self):
@@ -1651,4 +1618,4 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
        self.cert_send_b_frame(open_channel_l2cap)

        # TODO: Verify that the type should be ERTM
        assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())