Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9c3630f6 authored by Alice Kuo's avatar Alice Kuo
Browse files

Add broadcast support on ISO dump tool

Support Broadcast stream audio data dump from hci log

Bug: 229346816
Test: python3 ./dump_le_audio.py BTSNOOP.cfa -v --header
Change-Id: Ia8e535daa8033adef37d57fc1da0c50752b81dd1
parent fe0c1d96
Loading
Loading
Loading
Loading
+297 −47
Original line number Diff line number Diff line
@@ -79,6 +79,13 @@ OPCODE_RELEASE = 0x08
# opcode for hci command
OPCODE_HCI_CREATE_CIS = 0x2064
OPCODE_REMOVE_ISO_DATA_PATH = 0x206F
OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F
OPCODE_LE_CREATE_BIG = 0x2068
OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E

# HCI event
EVENT_CODE_LE_META_EVENT = 0x3E
SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B

TYPE_STREAMING_AUDIO_CONTEXTS = 0x02

@@ -114,6 +121,9 @@ AUDIO_LOCATION_LEFT = 0x01
AUDIO_LOCATION_RIGHT = 0x02
AUDIO_LOCATION_CENTER = 0x04

AD_TYPE_SERVICE_DATA_16_BIT = 0x16
BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851

packet_number = 0
debug_enable = False
add_header = False
@@ -158,13 +168,46 @@ class AseStream:
        print("octets_per_frame: " + str(self.octets_per_frame))


class Broadcast:

    def __init__(self):
        self.num_of_bis = defaultdict(int)  # subgroup - num_of_bis
        self.bis = defaultdict(BisStream)  # bis_index - codec_config
        self.bis_index_handle_map = defaultdict(int)  # bis_index - bis_handle
        self.bis_index_list = []

    def dump(self):
        for bis_index, iso_stream in self.bis.items():
            print("bis_index: " + str(bis_index) + " bis handle: " + str(self.bis_index_handle_map[bis_index]))
            iso_stream.dump()


class BisStream:

    def __init__(self):
        self.sampling_frequencies = 0xFF
        self.frame_duration = 0xFF
        self.channel_allocation = 0xFFFFFFFF
        self.octets_per_frame = 0xFFFF
        self.output_dump = []
        self.start_time = 0xFFFFFFFF

    def dump(self):
        print("start_time: " + str(self.start_time))
        print("sampling_frequencies: " + str(self.sampling_frequencies))
        print("frame_duration: " + str(self.frame_duration))
        print("channel_allocation: " + str(self.channel_allocation))
        print("octets_per_frame: " + str(self.octets_per_frame))


connection_map = defaultdict(Connection)
cis_acl_map = defaultdict(int)
broadcast_map = defaultdict(Broadcast)
big_adv_map = defaultdict(int)
bis_stream_map = defaultdict(BisStream)


def generate_header(file, connection):
    header = bytearray.fromhex('1ccc1200')
    for ase in connection.ase.values():
def generate_header(file, stream, is_cis):
    sf_case = {
        SAMPLE_FREQUENCY_8000: 80,
        SAMPLE_FREQUENCY_11025: 110,
@@ -180,13 +223,22 @@ def generate_header(file, connection):
        SAMPLE_FREQUENCY_192000: 1920,
        SAMPLE_FREQUENCY_384000: 2840,
    }
        header = header + struct.pack("<H", sf_case[ase.sampling_frequencies])
    fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10}
        header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration]))
    al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2}
        header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0,
                                      48000000)

    header = bytearray.fromhex('1ccc1200')
    if is_cis:
        for ase in stream.ase.values():
            header = header + struct.pack("<H", sf_case[ase.sampling_frequencies])
            header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration]))
            header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100,
                                          0, 48000000)
            break
    else:
        header = header + struct.pack("<H", sf_case[stream.sampling_frequencies])
        header = header + struct.pack("<H", int(stream.octets_per_frame * 8 * 10 / fd_case[stream.frame_duration]))
        header = header + struct.pack("<HHHL", al_case[stream.channel_allocation], fd_case[stream.frame_duration] * 100,
                                      0, 48000000)
    file.write(header)


@@ -206,7 +258,7 @@ def parse_codec_information(connection_handle, ase_id, packet):
            ase.frame_duration = value
        elif config_type == TYPE_CHANNEL_ALLOCATION:
            ase.channel_allocation = value
        elif TYPE_OCTETS_PER_FRAME:
        elif config_type == TYPE_OCTETS_PER_FRAME:
            ase.octets_per_frame = value
        length -= (config_length + 1)

@@ -284,6 +336,64 @@ def parse_att_packet(packet, connection_handle, flags, timestamp):
    packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp)


def parse_big_codec_information(adv_handle, packet):
    # Ignore presentation delay
    packet = unpack_data(packet, 3, True)
    number_of_subgroup, packet = unpack_data(packet, 1, False)
    for subgroup in range(number_of_subgroup):
        num_of_bis, packet = unpack_data(packet, 1, False)
        broadcast_map[adv_handle].num_of_bis[subgroup] = num_of_bis
        # Ignore codec id
        packet = unpack_data(packet, 5, True)
        length, packet = unpack_data(packet, 1, False)
        if len(packet) < length:
            print("Invalid subgroup codec information length")
            return

        while length > 0:
            config_length, packet = unpack_data(packet, 1, False)
            config_type, packet = unpack_data(packet, 1, False)
            value, packet = unpack_data(packet, config_length - 1, False)
            if config_type == TYPE_SAMPLING_FREQUENCIES:
                sampling_frequencies = value
            elif config_type == TYPE_FRAME_DURATION:
                frame_duration = value
            elif config_type == TYPE_OCTETS_PER_FRAME:
                octets_per_frame = value
            else:
                print("Unknown config type")
            length -= (config_length + 1)

        # Ignore metadata
        metadata_length, packet = unpack_data(packet, 1, False)
        packet = unpack_data(packet, metadata_length, True)

        for count in range(num_of_bis):
            bis_index, packet = unpack_data(packet, 1, False)
            broadcast_map[adv_handle].bis_index_list.append(bis_index)
            length, packet = unpack_data(packet, 1, False)
            if len(packet) < length:
                print("Invalid level 3 codec information length")
                return

            while length > 0:
                config_length, packet = unpack_data(packet, 1, False)
                config_type, packet = unpack_data(packet, 1, False)
                value, packet = unpack_data(packet, config_length - 1, False)
                if config_type == TYPE_CHANNEL_ALLOCATION:
                    channel_allocation = value
                else:
                    print("Ignored config type")
                length -= (config_length + 1)

            broadcast_map[adv_handle].bis[bis_index].sampling_frequencies = sampling_frequencies
            broadcast_map[adv_handle].bis[bis_index].frame_duration = frame_duration
            broadcast_map[adv_handle].bis[bis_index].octets_per_frame = octets_per_frame
            broadcast_map[adv_handle].bis[bis_index].channel_allocation = channel_allocation

    return packet


def debug_print(log):
    global packet_number
    print("#" + str(packet_number) + ": " + log)
@@ -303,7 +413,7 @@ def unpack_data(data, byte, ignore):
    return value, data[byte:]


def parse_command_packet(packet):
def parse_command_packet(packet, timestamp):
    opcode, packet = unpack_data(packet, 2, False)
    if opcode == OPCODE_HCI_CREATE_CIS:
        debug_print("OPCODE_HCI_CREATE_CIS")
@@ -330,9 +440,96 @@ def parse_command_packet(packet):
            debug_print("Invalid cmd length")
            return

        cis_handle, packet = unpack_data(packet, 2, False)
        acl_handle = cis_acl_map[cis_handle]
        dump_audio_data_to_file(acl_handle)
        iso_handle, packet = unpack_data(packet, 2, False)
        # CIS stream
        if iso_handle in cis_acl_map:
            acl_handle = cis_acl_map[iso_handle]
            dump_cis_audio_data_to_file(acl_handle)
        # To Do: BIS stream
        elif iso_handle in bis_stream_map:
            dump_bis_audio_data_to_file(iso_handle)
    elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA:
        debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return

        if length < 21:
            debug_print("Ignored. Not basic audio announcement")
            return

        adv_hdl, packet = unpack_data(packet, 1, False)
        #ignore operation, advertising_data_length
        packet = unpack_data(packet, 2, True)
        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid AD element length")
            return

        ad_type, packet = unpack_data(packet, 1, False)
        service, packet = unpack_data(packet, 2, False)
        if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
            debug_print("Ignored. Not basic audio announcement")
            return

        packet = parse_big_codec_information(adv_hdl, packet)
    elif opcode == OPCODE_LE_CREATE_BIG:
        debug_print("OPCODE_LE_CREATE_BIG")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet) and length < 31:
            debug_print("Invalid Create BIG command length")
            return

        big_handle, packet = unpack_data(packet, 1, False)
        adv_handle, packet = unpack_data(packet, 1, False)
        big_adv_map[big_handle] = adv_handle
    elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH:
        debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH")
        length, packet = unpack_data(packet, 1, False)
        if len(packet) != length:
            debug_print("Invalid LE SETUP ISO DATA PATH command length")
            return

        iso_handle, packet = unpack_data(packet, 2, False)
        if iso_handle in bis_stream_map:
            bis_stream_map[iso_handle].start_time = timestamp


def parse_event_packet(packet):
    event_code, packet = unpack_data(packet, 1, False)
    if event_code != EVENT_CODE_LE_META_EVENT:
        return

    length, packet = unpack_data(packet, 1, False)
    if len(packet) != length:
        print("Invalid LE mata event length")
        return

    subevent_code, packet = unpack_data(packet, 1, False)
    if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE:
        return

    status, packet = unpack_data(packet, 1, False)
    if status != 0x00:
        debug_print("Create_BIG failed")
        return

    big_handle, packet = unpack_data(packet, 1, False)
    if big_handle not in big_adv_map:
        print("Invalid BIG handle")
        return
    adv_handle = big_adv_map[big_handle]
    # Ignore, we don't care these parameter
    packet = unpack_data(packet, 15, True)
    num_of_bis, packet = unpack_data(packet, 1, False)
    for count in range(num_of_bis):
        bis_handle, packet = unpack_data(packet, 2, False)
        bis_index = broadcast_map[adv_handle].bis_index_list[count]
        broadcast_map[adv_handle].bis_index_handle_map[bis_index] = bis_handle
        bis_stream_map[bis_handle] = broadcast_map[adv_handle].bis[bis_index]


def convert_time_str(timestamp):
@@ -348,7 +545,7 @@ def convert_time_str(timestamp):
    return full_str_format


def dump_audio_data_to_file(acl_handle):
def dump_cis_audio_data_to_file(acl_handle):
    if debug_enable:
        connection_map[acl_handle].dump()
    file_name = ""
@@ -389,20 +586,20 @@ def dump_audio_data_to_file(acl_handle):
        break

    if connection_map[acl_handle].input_dump != []:
        debug_print("Dump input...")
        debug_print("Dump unicast input...")
        f = open(file_name + "_input.bin", 'wb')
        if add_header == True:
            generate_header(f, connection_map[acl_handle])
            generate_header(f, connection_map[acl_handle], True)
        arr = bytearray(connection_map[acl_handle].input_dump)
        f.write(arr)
        f.close()
        connection_map[acl_handle].input_dump = []

    if connection_map[acl_handle].output_dump != []:
        debug_print("Dump output...")
        debug_print("Dump unicast output...")
        f = open(file_name + "_output.bin", 'wb')
        if add_header == True:
            generate_header(f, connection_map[acl_handle])
            generate_header(f, connection_map[acl_handle], True)
        arr = bytearray(connection_map[acl_handle].output_dump)
        f.write(arr)
        f.close()
@@ -411,6 +608,51 @@ def dump_audio_data_to_file(acl_handle):
    return


def dump_bis_audio_data_to_file(iso_handle):
    if debug_enable:
        bis_stream_map[iso_handle].dump()
    file_name = "broadcast"
    sf_case = {
        SAMPLE_FREQUENCY_8000: "8000",
        SAMPLE_FREQUENCY_11025: "11025",
        SAMPLE_FREQUENCY_16000: "16000",
        SAMPLE_FREQUENCY_22050: "22050",
        SAMPLE_FREQUENCY_24000: "24000",
        SAMPLE_FREQUENCY_32000: "32000",
        SAMPLE_FREQUENCY_44100: "44100",
        SAMPLE_FREQUENCY_48000: "48000",
        SAMPLE_FREQUENCY_88200: "88200",
        SAMPLE_FREQUENCY_96000: "96000",
        SAMPLE_FREQUENCY_176400: "176400",
        SAMPLE_FREQUENCY_192000: "192000",
        SAMPLE_FREQUENCY_384000: "284000"
    }
    file_name += ("_sf" + sf_case[bis_stream_map[iso_handle].sampling_frequencies])
    fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
    file_name += ("_fd" + fd_case[bis_stream_map[iso_handle].frame_duration])
    al_case = {
        AUDIO_LOCATION_MONO: "mono",
        AUDIO_LOCATION_LEFT: "left",
        AUDIO_LOCATION_RIGHT: "right",
        AUDIO_LOCATION_CENTER: "center"
    }
    file_name += ("_" + al_case[bis_stream_map[iso_handle].channel_allocation])
    file_name += ("_frame" + str(bis_stream_map[iso_handle].octets_per_frame))
    file_name += ("_" + convert_time_str(bis_stream_map[iso_handle].start_time))

    if bis_stream_map[iso_handle].output_dump != []:
        debug_print("Dump broadcast output...")
        f = open(file_name + "_output.bin", 'wb')
        if add_header == True:
            generate_header(f, bis_stream_map[iso_handle], False)
        arr = bytearray(bis_stream_map[iso_handle].output_dump)
        f.write(arr)
        f.close()
        bis_stream_map[iso_handle].output_dump = []

    return


def parse_acl_packet(packet, flags, timestamp):
    # Check the minimum acl length, HCI leader (4 bytes)
    # + L2CAP header (4 bytes)
@@ -441,8 +683,8 @@ def parse_acl_packet(packet, flags, timestamp):


def parse_iso_packet(packet, flags):
    cis_handle, packet = unpack_data(packet, 2, False)
    cis_handle &= 0x0EFF
    iso_handle, packet = unpack_data(packet, 2, False)
    iso_handle &= 0x0EFF
    iso_data_load_length, packet = unpack_data(packet, 2, False)
    if iso_data_load_length != len(packet):
        debug_print("Invalid iso data load length")
@@ -457,13 +699,18 @@ def parse_iso_packet(packet, flags):
        debug_print("Invalid iso sdu length")
        return

    acl_handle = cis_acl_map[cis_handle]
    # CIS stream
    if iso_handle in cis_acl_map:
        acl_handle = cis_acl_map[iso_handle]
        if flags == SENT:
            connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet)))
            connection_map[acl_handle].output_dump.extend(list(packet))
        elif flags == RECEIVED:
            connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet)))
            connection_map[acl_handle].input_dump.extend(list(packet))
    elif iso_handle in bis_stream_map:
        bis_stream_map[iso_handle].output_dump.extend(struct.pack("<H", len(packet)))
        bis_stream_map[iso_handle].output_dump.extend(list(packet))


def parse_next_packet(btsnoop_file):
@@ -490,10 +737,10 @@ def parse_next_packet(btsnoop_file):
        return False

    packet_handle = {
        COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x)),
        COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x, z)),
        ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)),
        SCO_PACKET: (lambda x, y, z: None),
        EVENT_PACKET: (lambda x, y, z: None),
        EVENT_PACKET: (lambda x, y, z: parse_event_packet(x)),
        ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y))
    }
    packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp)
@@ -535,7 +782,10 @@ def main():
                break

    for handle in connection_map.keys():
        dump_audio_data_to_file(handle)
        dump_cis_audio_data_to_file(handle)

    for handle in bis_stream_map.keys():
        dump_bis_audio_data_to_file(handle)


if __name__ == "__main__":