Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f107d14a authored by Alice Kuo's avatar Alice Kuo
Browse files

Add the header in the front of the dump data for LC3.TS v1.0.3 format

The commit contains the following change,
1. Add the headeer for the LC3.TS - LC3 Conformance Interoperability Test Software Release V1.0.3
2. Correct the document
3. Fix the minor naming error

Bug: 150670922
Bug: 200748923
Test: Parse the btsnoop with LE audio streo media case, use the TS to
decoded the dump data to wav files, use the player to confirm the audio

Change-Id: I70a47fc58ad506513abd191f9055650c109695b3
parent 6336c5c5
Loading
Loading
Loading
Loading
+50 −9
Original line number Diff line number Diff line
@@ -20,25 +20,27 @@
#
# Audio File Name Format:
# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_
# frame[Octects per frame]_[Direction].bin
# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin
#
#
# Usage:
# ./dump_le_audio.py BTSNOOP.cfa
#
# -v, --verbose to enable the verbose log
# --header, Add the header for LC3.TS.
#
# NOTE:
# Please make sure you HCI Snoop data file includes the following frames:
# GATT service discovery for "ASE Control Point" chracteristic
# GATT config codec via ASE Control Point
# HCI create CIS
# Point with "Start stream", and the data frames.
# HCI create CIS to point out the "Start stream", and the data frames.
# HCI remove ISO data path to trigger dump audio data
#
# Correspondsing Spec.
# ASCS_1.0
# PACS_1.0
# BAP_1.0
# LC3.TS V1.0.3
#
from collections import defaultdict
from os import X_OK
@@ -82,7 +84,7 @@ TYPE_STREAMING_AUDIO_CONTEXTS = 0x02
TYPE_SAMPLING_FREQUENCIES = 0x01
TYPE_FRAME_DURATION = 0x02
TYPE_CHANNEL_ALLOCATION = 0x03
TYPE_OCTECTS_PER_FRAME = 0x04
TYPE_OCTETS_PER_FRAME = 0x04

CONTEXT_TYPE_CONVERSATIONAL = 0x0002
CONTEXT_TYPE_MEDIA = 0x0004
@@ -113,6 +115,7 @@ AUDIO_LOCATION_CENTER = 0x04

packet_number = 0
debug_enable = False
add_header = False


class Connection:
@@ -144,19 +147,47 @@ class AseStream:
        self.sampling_frequencies = 0xFF
        self.frame_duration = 0xFF
        self.channel_allocation = 0xFFFFFFFF
        self.octects_per_frame = 0xFFFF
        self.octets_per_frame = 0xFFFF

    def dump(self):
        print("sampling_frequencies: " + str(self.sampling_frequencies))
        print("frame_duration: " + str(self.frame_duration))
        print("channel_allocation: " + str(self.channel_allocation))
        print("octects_per_frame: " + str(self.octects_per_frame))
        print("octets_per_frame: " + str(self.octets_per_frame))


connection_map = defaultdict(Connection)
cis_acl_map = defaultdict(int)


def generate_header(file, connection):
    header = bytearray.fromhex('1ccc1200')
    for ase in connection.ase.values():
        sf_case = {
            SAMPLE_FREQUENCY_8000: 80,
            SAMPLE_FREQUENCY_11025: 110,
            SAMPLE_FREQUENCY_16000: 160,
            SAMPLE_FREQUENCY_22050: 220,
            SAMPLE_FREQUENCY_24000: 240,
            SAMPLE_FREQUENCY_32000: 320,
            SAMPLE_FREQUENCY_44100: 441,
            SAMPLE_FREQUENCY_48000: 480,
            SAMPLE_FREQUENCY_88200: 882,
            SAMPLE_FREQUENCY_96000: 960,
            SAMPLE_FREQUENCY_176400: 1764,
            SAMPLE_FREQUENCY_192000: 1920,
            SAMPLE_FREQUENCY_384000: 2840,
        }
        header = header + struct.pack("<H", sf_case[ase.sampling_frequencies])
        fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10}
        header = header + struct.pack("<H", ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])
        al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2}
        header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0,
                                      48000000)
        break
    file.write(header)


def parse_codec_information(connection_handle, ase_id, packet):
    length, packet = unpack_data(packet, 1, False)
    if len(packet) < length:
@@ -173,8 +204,8 @@ def parse_codec_information(connection_handle, ase_id, packet):
            ase.frame_duration = value
        elif config_type == TYPE_CHANNEL_ALLOCATION:
            ase.channel_allocation = value
        elif TYPE_OCTECTS_PER_FRAME:
            ase.octects_per_frame = value
        elif TYPE_OCTETS_PER_FRAME:
            ase.octets_per_frame = value
        length -= (config_length + 1)

    return packet
@@ -343,13 +374,15 @@ def dump_audio_data_to_file(acl_handle):
            AUDIO_LOCATION_CENTER: "center"
        }
        file_name += ("_" + al_case[ase.channel_allocation])
        file_name += ("_frame" + str(ase.octects_per_frame))
        file_name += ("_frame" + str(ase.octets_per_frame))
        file_name += ("_" + convert_time_str(connection_map[acl_handle].start_time))
        break

    if connection_map[acl_handle].input_dump != []:
        debug_print("Dump input...")
        f = open(file_name + "_input.bin", 'wb')
        if add_header == True:
            generate_header(f, connection_map[acl_handle])
        arr = bytearray(connection_map[acl_handle].input_dump)
        f.write(arr)
        f.close()
@@ -358,6 +391,8 @@ def dump_audio_data_to_file(acl_handle):
    if connection_map[acl_handle].output_dump != []:
        debug_print("Dump output...")
        f = open(file_name + "_output.bin", 'wb')
        if add_header == True:
            generate_header(f, connection_map[acl_handle])
        arr = bytearray(connection_map[acl_handle].output_dump)
        f.write(arr)
        f.close()
@@ -457,13 +492,19 @@ def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure")
    parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true")
    parser.add_argument("--header", help="Add the header for LC3.TS.", action="store_true")

    argv = parser.parse_args()
    BTSNOOP_FILE_NAME = argv.btsnoop_file

    global debug_enable
    global add_header
    if argv.verbose:
        debug_enable = True

    if argv.header:
        add_header = True

    with open(BTSNOOP_FILE_NAME, "rb") as btsnoop_file:
        if btsnoop_file.read(16) != BTSNOOP_HEADER:
            print("Invalid btsnoop header")