Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9135c433 authored by Alice Kuo's avatar Alice Kuo
Browse files

Add EATT support for dump_le_audio.py tool

1. The previous version only support ATT packet parsing. Add the mechanism to get the cid for EATT, and filter the EATT packet to get the codec information
2. Modify the lint error together

Bug: 279356092
Test: verified with EATT supported headset via SW path audio dump
Change-Id: I46d18fbc3236857fe1710980c76e4dda47af3462
parent abbe6e86
Loading
Loading
Loading
Loading
+53 −8
Original line number Diff line number Diff line
@@ -63,7 +63,10 @@ ISO_PACKET = 5
SENT = 0
RECEIVED = 1

L2CAP_ATT_CID = 4
L2CAP_ATT_CID = 0x0004
L2CAP_CID = 0x0005

PSM_EATT = 0x0027

# opcode for att protocol
OPCODE_ATT_READ_BY_TYPE_RSP = 0x09
@@ -83,6 +86,10 @@ OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F
OPCODE_LE_CREATE_BIG = 0x2068
OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E

# opcode for L2CAP channel
OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ = 0x17
OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP = 0x18

# HCI event
EVENT_CODE_LE_META_EVENT = 0x3E
SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B
@@ -129,6 +136,10 @@ debug_enable = False
add_header = False
ase_handle = 0xFFFF

l2cap_identifier_set = set()
source_cid = set()
destinate_cid = set()


class Connection:

@@ -676,9 +687,44 @@ def parse_acl_packet(packet, flags, timestamp):
        return

    if debug_enable:
        debug_print("ACL connection_handle - " + str(connection_handle))
        debug_print("ACL connection_handle - " + str(connection_handle) + " channel id - " + (str(channel_id)))

    # Gather EATT CID
    if channel_id == L2CAP_CID:
        global l2cap_identifier_set
        global source_cid
        global destinate_cid
        opcode, packet = unpack_data(packet, 1, False)
        identifier, packet = unpack_data(packet, 1, False)
        l2cap_length, packet = unpack_data(packet, 2, False)
        if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ:
            spsm, packet = unpack_data(packet, 2, False)
            if spsm == PSM_EATT:
                if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ:
                    l2cap_identifier_set.add(identifier)
                    packet = unpack_data(packet, 6, True)
                    for i in range(0, l2cap_length - 8, 2):
                        cid, packet = unpack_data(packet, 2, False)
                        source_cid.add(cid)

        if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP:
            if identifier in l2cap_identifier_set:
                l2cap_identifier_set.remove(identifier)
                packet = unpack_data(packet, 8, True)
                for i in range(0, l2cap_length - 8, 2):
                    cid, packet = unpack_data(packet, 2, False)
                    destinate_cid.add(cid)

    # Parse ATT protocol
    if channel_id == L2CAP_ATT_CID:
        if debug_enable:
            debug_print("parse_att_packet")
        parse_att_packet(packet, connection_handle, flags, timestamp)

    if channel_id in source_cid or channel_id in destinate_cid:
        if debug_enable:
            debug_print("parse_eatt_packet")
        packet = unpack_data(packet, 2, True)
        parse_att_packet(packet, connection_handle, flags, timestamp)


@@ -720,8 +766,8 @@ def parse_next_packet(btsnoop_file):
    if len(packet_header) != 25:
        return False

    (length_original, length_captured, flags, dropped_packets, timestamp, type) = struct.unpack(
        ">IIIIqB", packet_header)
    (length_original, length_captured, flags, dropped_packets, timestamp,
     type) = struct.unpack(">IIIIqB", packet_header)

    if length_original != length_captured:
        debug_print("Filtered btnsoop, can not be parsed")
@@ -751,8 +797,7 @@ def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure")
    parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true")
    parser.add_argument(
        "--header",
    parser.add_argument("--header",
                        help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.",
                        action="store_true")
    parser.add_argument("--ase_handle", help="Set the ASE handle manually.", type=int)