Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 923e47ee authored by Henri Chataing's avatar Henri Chataing Committed by Automerger Merge Worker
Browse files

Merge changes I7c4faacd,I09564981,I14bb0091,I06fca2a3,I8f6b68e1 into main am:...

Merge changes I7c4faacd,I09564981,I14bb0091,I06fca2a3,I8f6b68e1 into main am: 67969b2e am: c008fcb8

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Bluetooth/+/3001135



Change-Id: I74b24abcef1cd716d3801036773392dd2cdfec84
Signed-off-by: default avatarAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents 21a88365 c008fcb8
Loading
Loading
Loading
Loading
+0 −61
Original line number Diff line number Diff line
@@ -15,11 +15,8 @@
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from blueberry.tests.gd.cert.capture import Capture
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.matchers import SecurityMatchers
from blueberry.facade.security.facade_pb2 import UiMsgType
import hci_packets as hci
@@ -99,64 +96,6 @@ class HciCaptures(object):
                       lambda packet: hci.Event.parse_all(packet.payload))


class L2capCaptures(object):

    @staticmethod
    def ConnectionRequest(psm):
        return Capture(L2capMatchers.ConnectionRequest(psm), L2capCaptures._extract_connection_request)

    @staticmethod
    def _extract_connection_request(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
        return l2cap_packets.ConnectionRequestView(frame)

    @staticmethod
    def ConnectionResponse(scid):
        return Capture(L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response)

    @staticmethod
    def _extract_connection_response(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
        return l2cap_packets.ConnectionResponseView(frame)

    @staticmethod
    def ConfigurationRequest(cid=None):
        return Capture(L2capMatchers.ConfigurationRequest(cid), L2capCaptures._extract_configuration_request)

    @staticmethod
    def _extract_configuration_request(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        return l2cap_packets.ConfigurationRequestView(frame)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return Capture(L2capMatchers.CreditBasedConnectionRequest(psm),
                       L2capCaptures._extract_credit_based_connection_request)

    @staticmethod
    def _extract_credit_based_connection_request(packet):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        return l2cap_packets.LeCreditBasedConnectionRequestView(frame)

    @staticmethod
    def CreditBasedConnectionResponse():
        return Capture(L2capMatchers.CreditBasedConnectionResponse(),
                       L2capCaptures._extract_credit_based_connection_response)

    @staticmethod
    def _extract_credit_based_connection_response(packet):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        return l2cap_packets.LeCreditBasedConnectionResponseView(frame)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        return Capture(L2capMatchers.LinkSecurityInterfaceCallbackEvent(type), L2capCaptures._extract_address)

    @staticmethod
    def _extract_address(packet):
        return packet.address


class SecurityCaptures(object):

    @staticmethod
+0 −20
Original line number Diff line number Diff line
@@ -28,7 +28,6 @@ from blueberry.tests.gd.cert.behavior import ReplyStage
from blueberry.tests.gd.cert.event_stream import EventStream, FilteringEventStream
from blueberry.tests.gd.cert.metadata import metadata
from blueberry.tests.gd.cert.truth import assertThat
from bluetooth_packets_python3 import l2cap_packets
import hci_packets as hci

from mobly import asserts
@@ -192,25 +191,6 @@ class CertSelfTest(base_test.BaseTestClass):
        logging.debug(outside.serialize())
        logging.debug("Done!")

    def test_l2cap_config_options(self):
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 123
        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
        request = l2cap_packets.ConfigurationRequestBuilder(
            0x1d,  # Command ID
            0xc1d,  # Channel ID
            l2cap_packets.Continuation.END,
            [mtu_opt, fcs_opt])
        request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request)
        handle = 123
        wrapped = hci.Acl(handle=handle,
                          packet_boundary_flag=hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                          broadcast_flag=hci.BroadcastFlag.POINT_TO_POINT,
                          payload=bytes(request_b_frame.Serialize()))
        # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3)
        asserts.assert_true(len(wrapped.serialize()) == 23, "Packet serialized incorrectly")

    def test_assertThat_boolean_success(self):
        assertThat(True).isTrue()
        assertThat(False).isFalse()
+0 −455
Original line number Diff line number Diff line
@@ -18,13 +18,6 @@ import bluetooth_packets_python3 as bt_packets
import logging
import sys

from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult

from blueberry.utils import bluetooth
import hci_packets as hci

@@ -274,454 +267,6 @@ class NeighborMatchers(object):
        return bluetooth.Address(address) == event.address


class L2capMatchers(object):

    @staticmethod
    def ConnectionRequest(psm):
        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)

    @staticmethod
    def ConnectionResponse(scid):
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)

    @staticmethod
    def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS):
        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)

    @staticmethod
    def ConfigurationRequest(cid=None):
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)

    @staticmethod
    def ConfigurationRequestWithErtm():
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet)

    @staticmethod
    def ConfigurationRequestView(dcid):
        return lambda request_view: request_view.GetDestinationCid() == dcid

    @staticmethod
    def DisconnectionRequest(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)

    @staticmethod
    def DisconnectionResponse(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)

    @staticmethod
    def EchoResponse():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE)

    @staticmethod
    def CommandReject():
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def LeCommandReject():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeConnectionParameterUpdateRequest():
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(
            packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST)

    @staticmethod
    def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED):
        return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)

    @staticmethod
    def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS):
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result)

    @staticmethod
    def CreditBasedConnectionResponseUsedCid():
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(
            packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED
        ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeDisconnectionRequest(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)

    @staticmethod
    def LeDisconnectionResponse(scid, dcid):
        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)

    @staticmethod
    def LeFlowControlCredit(cid):
        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)

    @staticmethod
    def SFrame(req_seq=None, f=None, s=None, p=None):
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)

    @staticmethod
    def IFrame(tx_seq=None, payload=None, f=None):
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def IFrameWithFcs(tx_seq=None, payload=None, f=None):
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True)

    @staticmethod
    def IFrameStart(tx_seq=None, payload=None, f=None):
        return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def Data(payload):
        return lambda packet: packet.GetPayload().GetBytes() == payload

    @staticmethod
    def FirstLeIFrame(payload, sdu_size):
        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)

    # this is a hack - should be removed
    @staticmethod
    def PartialData(payload):
        return lambda packet: payload in packet.GetPayload().GetBytes()

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadRawData(payload):
        return lambda packet: payload in packet.payload

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingPsm(psm):
        return lambda packet: None if psm != packet.psm else packet

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingCid(cid):
        return lambda packet: None if cid != packet.fixed_cid else packet

    @staticmethod
    def ExtractBasicFrame(scid):
        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)

    @staticmethod
    def ExtractBasicFrameWithFcs(scid):
        return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid)

    @staticmethod
    def InformationRequestWithType(info_type):
        return lambda packet: L2capMatchers._information_request_with_type(packet, info_type)

    @staticmethod
    def InformationResponseExtendedFeatures(supports_ertm=None,
                                            supports_streaming=None,
                                            supports_fcs=None,
                                            supports_fixed_channels=None):
        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(
            packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)

    @staticmethod
    def _basic_frame(packet):
        if packet is None:
            return None
        return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_with_fcs(packet):
        if packet is None:
            return None
        return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_for(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        return frame

    @staticmethod
    def _basic_frame_with_fcs_for(packet, scid):
        frame = L2capMatchers._basic_frame(packet)
        if frame.GetChannelId() != scid:
            return None
        frame = L2capMatchers._basic_frame_with_fcs(packet)
        if frame is None:
            return None
        return frame

    @staticmethod
    def _information_frame(packet):
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_frame)

    @staticmethod
    def _information_frame_with_fcs(packet):
        standard_frame = l2cap_packets.StandardFrameWithFcsView(packet)
        if standard_frame is None:
            return None
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame)

    @staticmethod
    def _information_start_frame(packet):
        start_frame = L2capMatchers._information_frame(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameView(start_frame)

    @staticmethod
    def _information_start_frame_with_fcs(packet):
        start_frame = L2capMatchers._information_frame_with_fcs(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame)

    @staticmethod
    def _supervisory_frame(packet):
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False):
        if fcs:
            frame = L2capMatchers._information_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_frame(packet)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload().GetBytes() != payload:
            return False
        if f is not None and frame.GetF() != f:
            return False
        return True

    @staticmethod
    def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False):
        if fcs:
            frame = L2capMatchers._information_start_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_start_frame(packet)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload().GetBytes() != payload:
            return False
        if f is not None and frame.GetF() != f:
            return False
        return True

    @staticmethod
    def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
        frame = L2capMatchers._supervisory_frame(packet)
        if frame is None:
            return False
        if req_seq is not None and frame.GetReqSeq() != req_seq:
            return False
        if f is not None and frame.GetF() != f:
            return False
        if s is not None and frame.GetS() != s:
            return False
        if p is not None and frame.GetP() != p:
            return False
        return True

    @staticmethod
    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
        return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size

    @staticmethod
    def _control_frame(packet):
        if packet.GetChannelId() != 1:
            return None
        return l2cap_packets.ControlView(packet.GetPayload())

    @staticmethod
    def _le_control_frame(packet):
        if packet.GetChannelId() != 5:
            return None
        return l2cap_packets.LeControlView(packet.GetPayload())

    @staticmethod
    def control_frame_with_code(packet, code):
        frame = L2capMatchers._control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def le_control_frame_with_code(packet, code):
        frame = L2capMatchers._le_control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def _is_control_frame_with_code(packet, code):
        return L2capMatchers.control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_le_control_frame_with_code(packet, code):
        return L2capMatchers.le_control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_matching_connection_request(packet, psm):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConnectionRequestView(frame)
        return request.GetPsm() == psm

    @staticmethod
    def _is_matching_connection_response(packet, scid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetResult(
        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid() != 0

    @staticmethod
    def _is_matching_configuration_request_with_cid(packet, cid=None):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        dcid = request.GetDestinationCid()
        return cid is None or cid == dcid

    @staticmethod
    def _is_matching_configuration_request_with_ertm(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        config_bytes = request.GetBytes()
        # TODO(b/153189503): Use packet struct parser.
        return b"\x04\x09\x03" in config_bytes

    @staticmethod
    def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConfigurationResponseView(frame)
        return response.GetResult() == result

    @staticmethod
    def _is_matching_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.DisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.DisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeDisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeDisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_flow_control_credit(packet, cid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
        if frame is None:
            return False
        request = l2cap_packets.LeFlowControlCreditView(frame)
        return request.GetCid() == cid

    @staticmethod
    def _information_request_with_type(packet, info_type):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST)
        if frame is None:
            return None
        request = l2cap_packets.InformationRequestView(frame)
        if request.GetInfoType() != info_type:
            return None
        return request

    @staticmethod
    def _information_response_with_type(packet, info_type):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE)
        if frame is None:
            return None
        response = l2cap_packets.InformationResponseView(frame)
        if response.GetInfoType() != info_type:
            return None
        return response

    @staticmethod
    def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs,
                                                            supports_fixed_channels):
        frame = L2capMatchers._information_response_with_type(packet,
                                                              InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
        if frame is None:
            return False
        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
        if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm:
            return False
        if supports_streaming is not None and features.GetStreamingMode != supports_streaming:
            return False
        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
            return False
        if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels:
            return False
        return True

    @staticmethod
    def _is_matching_connection_parameter_update_response(packet, result):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE)
        if frame is None:
            return False
        return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result

    @staticmethod
    def _is_matching_credit_based_connection_request(packet, psm):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
        return request.GetLePsm() == psm

    @staticmethod
    def _is_matching_credit_based_connection_response(packet, result):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
                                                   response.GetDestinationCid() != 0)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        return lambda event: True if event.event_type == type else False


class SecurityMatchers(object):

    @staticmethod
+0 −264

File deleted.

Preview size limit exceeded, changes collapsed.

+0 −200

File deleted.

Preview size limit exceeded, changes collapsed.

Loading