Loading system/tools/scripts/dump_le_audio.py +297 −47 Original line number Diff line number Diff line Loading @@ -79,6 +79,13 @@ OPCODE_RELEASE = 0x08 # opcode for hci command OPCODE_HCI_CREATE_CIS = 0x2064 OPCODE_REMOVE_ISO_DATA_PATH = 0x206F OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F OPCODE_LE_CREATE_BIG = 0x2068 OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E # HCI event EVENT_CODE_LE_META_EVENT = 0x3E SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B TYPE_STREAMING_AUDIO_CONTEXTS = 0x02 Loading Loading @@ -114,6 +121,9 @@ AUDIO_LOCATION_LEFT = 0x01 AUDIO_LOCATION_RIGHT = 0x02 AUDIO_LOCATION_CENTER = 0x04 AD_TYPE_SERVICE_DATA_16_BIT = 0x16 BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851 packet_number = 0 debug_enable = False add_header = False Loading Loading @@ -158,13 +168,46 @@ class AseStream: print("octets_per_frame: " + str(self.octets_per_frame)) class Broadcast: def __init__(self): self.num_of_bis = defaultdict(int) # subgroup - num_of_bis self.bis = defaultdict(BisStream) # bis_index - codec_config self.bis_index_handle_map = defaultdict(int) # bis_index - bis_handle self.bis_index_list = [] def dump(self): for bis_index, iso_stream in self.bis.items(): print("bis_index: " + str(bis_index) + " bis handle: " + str(self.bis_index_handle_map[bis_index])) iso_stream.dump() class BisStream: def __init__(self): self.sampling_frequencies = 0xFF self.frame_duration = 0xFF self.channel_allocation = 0xFFFFFFFF self.octets_per_frame = 0xFFFF self.output_dump = [] self.start_time = 0xFFFFFFFF def dump(self): print("start_time: " + str(self.start_time)) print("sampling_frequencies: " + str(self.sampling_frequencies)) print("frame_duration: " + str(self.frame_duration)) print("channel_allocation: " + str(self.channel_allocation)) print("octets_per_frame: " + str(self.octets_per_frame)) connection_map = defaultdict(Connection) cis_acl_map = defaultdict(int) broadcast_map = defaultdict(Broadcast) big_adv_map = defaultdict(int) bis_stream_map = defaultdict(BisStream) def generate_header(file, connection): header = bytearray.fromhex('1ccc1200') for ase in connection.ase.values(): def generate_header(file, stream, is_cis): sf_case = { SAMPLE_FREQUENCY_8000: 80, SAMPLE_FREQUENCY_11025: 110, Loading @@ -180,13 +223,22 @@ def generate_header(file, connection): SAMPLE_FREQUENCY_192000: 1920, SAMPLE_FREQUENCY_384000: 2840, } header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10} header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2} header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0, 48000000) header = bytearray.fromhex('1ccc1200') if is_cis: for ase in stream.ase.values(): header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0, 48000000) break else: header = header + struct.pack("<H", sf_case[stream.sampling_frequencies]) header = header + struct.pack("<H", int(stream.octets_per_frame * 8 * 10 / fd_case[stream.frame_duration])) header = header + struct.pack("<HHHL", al_case[stream.channel_allocation], fd_case[stream.frame_duration] * 100, 0, 48000000) file.write(header) Loading @@ -206,7 +258,7 @@ def parse_codec_information(connection_handle, ase_id, packet): ase.frame_duration = value elif config_type == TYPE_CHANNEL_ALLOCATION: ase.channel_allocation = value elif TYPE_OCTETS_PER_FRAME: elif config_type == TYPE_OCTETS_PER_FRAME: ase.octets_per_frame = value length -= (config_length + 1) Loading Loading @@ -284,6 +336,64 @@ def parse_att_packet(packet, connection_handle, flags, timestamp): packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp) def parse_big_codec_information(adv_handle, packet): # Ignore presentation delay packet = unpack_data(packet, 3, True) number_of_subgroup, packet = unpack_data(packet, 1, False) for subgroup in range(number_of_subgroup): num_of_bis, packet = unpack_data(packet, 1, False) broadcast_map[adv_handle].num_of_bis[subgroup] = num_of_bis # Ignore codec id packet = unpack_data(packet, 5, True) length, packet = unpack_data(packet, 1, False) if len(packet) < length: print("Invalid subgroup codec information length") return while length > 0: config_length, packet = unpack_data(packet, 1, False) config_type, packet = unpack_data(packet, 1, False) value, packet = unpack_data(packet, config_length - 1, False) if config_type == TYPE_SAMPLING_FREQUENCIES: sampling_frequencies = value elif config_type == TYPE_FRAME_DURATION: frame_duration = value elif config_type == TYPE_OCTETS_PER_FRAME: octets_per_frame = value else: print("Unknown config type") length -= (config_length + 1) # Ignore metadata metadata_length, packet = unpack_data(packet, 1, False) packet = unpack_data(packet, metadata_length, True) for count in range(num_of_bis): bis_index, packet = unpack_data(packet, 1, False) broadcast_map[adv_handle].bis_index_list.append(bis_index) length, packet = unpack_data(packet, 1, False) if len(packet) < length: print("Invalid level 3 codec information length") return while length > 0: config_length, packet = unpack_data(packet, 1, False) config_type, packet = unpack_data(packet, 1, False) value, packet = unpack_data(packet, config_length - 1, False) if config_type == TYPE_CHANNEL_ALLOCATION: channel_allocation = value else: print("Ignored config type") length -= (config_length + 1) broadcast_map[adv_handle].bis[bis_index].sampling_frequencies = sampling_frequencies broadcast_map[adv_handle].bis[bis_index].frame_duration = frame_duration broadcast_map[adv_handle].bis[bis_index].octets_per_frame = octets_per_frame broadcast_map[adv_handle].bis[bis_index].channel_allocation = channel_allocation return packet def debug_print(log): global packet_number print("#" + str(packet_number) + ": " + log) Loading @@ -303,7 +413,7 @@ def unpack_data(data, byte, ignore): return value, data[byte:] def parse_command_packet(packet): def parse_command_packet(packet, timestamp): opcode, packet = unpack_data(packet, 2, False) if opcode == OPCODE_HCI_CREATE_CIS: debug_print("OPCODE_HCI_CREATE_CIS") Loading @@ -330,9 +440,96 @@ def parse_command_packet(packet): debug_print("Invalid cmd length") return cis_handle, packet = unpack_data(packet, 2, False) acl_handle = cis_acl_map[cis_handle] dump_audio_data_to_file(acl_handle) iso_handle, packet = unpack_data(packet, 2, False) # CIS stream if iso_handle in cis_acl_map: acl_handle = cis_acl_map[iso_handle] dump_cis_audio_data_to_file(acl_handle) # To Do: BIS stream elif iso_handle in bis_stream_map: dump_bis_audio_data_to_file(iso_handle) elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA: debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA") length, packet = unpack_data(packet, 1, False) if length != len(packet): debug_print("Invalid cmd length") return if length < 21: debug_print("Ignored. Not basic audio announcement") return adv_hdl, packet = unpack_data(packet, 1, False) #ignore operation, advertising_data_length packet = unpack_data(packet, 2, True) length, packet = unpack_data(packet, 1, False) if length != len(packet): debug_print("Invalid AD element length") return ad_type, packet = unpack_data(packet, 1, False) service, packet = unpack_data(packet, 2, False) if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE: debug_print("Ignored. Not basic audio announcement") return packet = parse_big_codec_information(adv_hdl, packet) elif opcode == OPCODE_LE_CREATE_BIG: debug_print("OPCODE_LE_CREATE_BIG") length, packet = unpack_data(packet, 1, False) if length != len(packet) and length < 31: debug_print("Invalid Create BIG command length") return big_handle, packet = unpack_data(packet, 1, False) adv_handle, packet = unpack_data(packet, 1, False) big_adv_map[big_handle] = adv_handle elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH: debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH") length, packet = unpack_data(packet, 1, False) if len(packet) != length: debug_print("Invalid LE SETUP ISO DATA PATH command length") return iso_handle, packet = unpack_data(packet, 2, False) if iso_handle in bis_stream_map: bis_stream_map[iso_handle].start_time = timestamp def parse_event_packet(packet): event_code, packet = unpack_data(packet, 1, False) if event_code != EVENT_CODE_LE_META_EVENT: return length, packet = unpack_data(packet, 1, False) if len(packet) != length: print("Invalid LE mata event length") return subevent_code, packet = unpack_data(packet, 1, False) if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE: return status, packet = unpack_data(packet, 1, False) if status != 0x00: debug_print("Create_BIG failed") return big_handle, packet = unpack_data(packet, 1, False) if big_handle not in big_adv_map: print("Invalid BIG handle") return adv_handle = big_adv_map[big_handle] # Ignore, we don't care these parameter packet = unpack_data(packet, 15, True) num_of_bis, packet = unpack_data(packet, 1, False) for count in range(num_of_bis): bis_handle, packet = unpack_data(packet, 2, False) bis_index = broadcast_map[adv_handle].bis_index_list[count] broadcast_map[adv_handle].bis_index_handle_map[bis_index] = bis_handle bis_stream_map[bis_handle] = broadcast_map[adv_handle].bis[bis_index] def convert_time_str(timestamp): Loading @@ -348,7 +545,7 @@ def convert_time_str(timestamp): return full_str_format def dump_audio_data_to_file(acl_handle): def dump_cis_audio_data_to_file(acl_handle): if debug_enable: connection_map[acl_handle].dump() file_name = "" Loading Loading @@ -389,20 +586,20 @@ def dump_audio_data_to_file(acl_handle): break if connection_map[acl_handle].input_dump != []: debug_print("Dump input...") debug_print("Dump unicast input...") f = open(file_name + "_input.bin", 'wb') if add_header == True: generate_header(f, connection_map[acl_handle]) generate_header(f, connection_map[acl_handle], True) arr = bytearray(connection_map[acl_handle].input_dump) f.write(arr) f.close() connection_map[acl_handle].input_dump = [] if connection_map[acl_handle].output_dump != []: debug_print("Dump output...") debug_print("Dump unicast output...") f = open(file_name + "_output.bin", 'wb') if add_header == True: generate_header(f, connection_map[acl_handle]) generate_header(f, connection_map[acl_handle], True) arr = bytearray(connection_map[acl_handle].output_dump) f.write(arr) f.close() Loading @@ -411,6 +608,51 @@ def dump_audio_data_to_file(acl_handle): return def dump_bis_audio_data_to_file(iso_handle): if debug_enable: bis_stream_map[iso_handle].dump() file_name = "broadcast" sf_case = { SAMPLE_FREQUENCY_8000: "8000", SAMPLE_FREQUENCY_11025: "11025", SAMPLE_FREQUENCY_16000: "16000", SAMPLE_FREQUENCY_22050: "22050", SAMPLE_FREQUENCY_24000: "24000", SAMPLE_FREQUENCY_32000: "32000", SAMPLE_FREQUENCY_44100: "44100", SAMPLE_FREQUENCY_48000: "48000", SAMPLE_FREQUENCY_88200: "88200", SAMPLE_FREQUENCY_96000: "96000", SAMPLE_FREQUENCY_176400: "176400", SAMPLE_FREQUENCY_192000: "192000", SAMPLE_FREQUENCY_384000: "284000" } file_name += ("_sf" + sf_case[bis_stream_map[iso_handle].sampling_frequencies]) fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} file_name += ("_fd" + fd_case[bis_stream_map[iso_handle].frame_duration]) al_case = { AUDIO_LOCATION_MONO: "mono", AUDIO_LOCATION_LEFT: "left", AUDIO_LOCATION_RIGHT: "right", AUDIO_LOCATION_CENTER: "center" } file_name += ("_" + al_case[bis_stream_map[iso_handle].channel_allocation]) file_name += ("_frame" + str(bis_stream_map[iso_handle].octets_per_frame)) file_name += ("_" + convert_time_str(bis_stream_map[iso_handle].start_time)) if bis_stream_map[iso_handle].output_dump != []: debug_print("Dump broadcast output...") f = open(file_name + "_output.bin", 'wb') if add_header == True: generate_header(f, bis_stream_map[iso_handle], False) arr = bytearray(bis_stream_map[iso_handle].output_dump) f.write(arr) f.close() bis_stream_map[iso_handle].output_dump = [] return def parse_acl_packet(packet, flags, timestamp): # Check the minimum acl length, HCI leader (4 bytes) # + L2CAP header (4 bytes) Loading Loading @@ -441,8 +683,8 @@ def parse_acl_packet(packet, flags, timestamp): def parse_iso_packet(packet, flags): cis_handle, packet = unpack_data(packet, 2, False) cis_handle &= 0x0EFF iso_handle, packet = unpack_data(packet, 2, False) iso_handle &= 0x0EFF iso_data_load_length, packet = unpack_data(packet, 2, False) if iso_data_load_length != len(packet): debug_print("Invalid iso data load length") Loading @@ -457,13 +699,18 @@ def parse_iso_packet(packet, flags): debug_print("Invalid iso sdu length") return acl_handle = cis_acl_map[cis_handle] # CIS stream if iso_handle in cis_acl_map: acl_handle = cis_acl_map[iso_handle] if flags == SENT: connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet))) connection_map[acl_handle].output_dump.extend(list(packet)) elif flags == RECEIVED: connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet))) connection_map[acl_handle].input_dump.extend(list(packet)) elif iso_handle in bis_stream_map: bis_stream_map[iso_handle].output_dump.extend(struct.pack("<H", len(packet))) bis_stream_map[iso_handle].output_dump.extend(list(packet)) def parse_next_packet(btsnoop_file): Loading @@ -490,10 +737,10 @@ def parse_next_packet(btsnoop_file): return False packet_handle = { COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x)), COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x, z)), ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)), SCO_PACKET: (lambda x, y, z: None), EVENT_PACKET: (lambda x, y, z: None), EVENT_PACKET: (lambda x, y, z: parse_event_packet(x)), ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y)) } packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp) Loading Loading @@ -535,7 +782,10 @@ def main(): break for handle in connection_map.keys(): dump_audio_data_to_file(handle) dump_cis_audio_data_to_file(handle) for handle in bis_stream_map.keys(): dump_bis_audio_data_to_file(handle) if __name__ == "__main__": Loading Loading
system/tools/scripts/dump_le_audio.py +297 −47 Original line number Diff line number Diff line Loading @@ -79,6 +79,13 @@ OPCODE_RELEASE = 0x08 # opcode for hci command OPCODE_HCI_CREATE_CIS = 0x2064 OPCODE_REMOVE_ISO_DATA_PATH = 0x206F OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F OPCODE_LE_CREATE_BIG = 0x2068 OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E # HCI event EVENT_CODE_LE_META_EVENT = 0x3E SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B TYPE_STREAMING_AUDIO_CONTEXTS = 0x02 Loading Loading @@ -114,6 +121,9 @@ AUDIO_LOCATION_LEFT = 0x01 AUDIO_LOCATION_RIGHT = 0x02 AUDIO_LOCATION_CENTER = 0x04 AD_TYPE_SERVICE_DATA_16_BIT = 0x16 BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851 packet_number = 0 debug_enable = False add_header = False Loading Loading @@ -158,13 +168,46 @@ class AseStream: print("octets_per_frame: " + str(self.octets_per_frame)) class Broadcast: def __init__(self): self.num_of_bis = defaultdict(int) # subgroup - num_of_bis self.bis = defaultdict(BisStream) # bis_index - codec_config self.bis_index_handle_map = defaultdict(int) # bis_index - bis_handle self.bis_index_list = [] def dump(self): for bis_index, iso_stream in self.bis.items(): print("bis_index: " + str(bis_index) + " bis handle: " + str(self.bis_index_handle_map[bis_index])) iso_stream.dump() class BisStream: def __init__(self): self.sampling_frequencies = 0xFF self.frame_duration = 0xFF self.channel_allocation = 0xFFFFFFFF self.octets_per_frame = 0xFFFF self.output_dump = [] self.start_time = 0xFFFFFFFF def dump(self): print("start_time: " + str(self.start_time)) print("sampling_frequencies: " + str(self.sampling_frequencies)) print("frame_duration: " + str(self.frame_duration)) print("channel_allocation: " + str(self.channel_allocation)) print("octets_per_frame: " + str(self.octets_per_frame)) connection_map = defaultdict(Connection) cis_acl_map = defaultdict(int) broadcast_map = defaultdict(Broadcast) big_adv_map = defaultdict(int) bis_stream_map = defaultdict(BisStream) def generate_header(file, connection): header = bytearray.fromhex('1ccc1200') for ase in connection.ase.values(): def generate_header(file, stream, is_cis): sf_case = { SAMPLE_FREQUENCY_8000: 80, SAMPLE_FREQUENCY_11025: 110, Loading @@ -180,13 +223,22 @@ def generate_header(file, connection): SAMPLE_FREQUENCY_192000: 1920, SAMPLE_FREQUENCY_384000: 2840, } header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10} header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2} header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0, 48000000) header = bytearray.fromhex('1ccc1200') if is_cis: for ase in stream.ase.values(): header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0, 48000000) break else: header = header + struct.pack("<H", sf_case[stream.sampling_frequencies]) header = header + struct.pack("<H", int(stream.octets_per_frame * 8 * 10 / fd_case[stream.frame_duration])) header = header + struct.pack("<HHHL", al_case[stream.channel_allocation], fd_case[stream.frame_duration] * 100, 0, 48000000) file.write(header) Loading @@ -206,7 +258,7 @@ def parse_codec_information(connection_handle, ase_id, packet): ase.frame_duration = value elif config_type == TYPE_CHANNEL_ALLOCATION: ase.channel_allocation = value elif TYPE_OCTETS_PER_FRAME: elif config_type == TYPE_OCTETS_PER_FRAME: ase.octets_per_frame = value length -= (config_length + 1) Loading Loading @@ -284,6 +336,64 @@ def parse_att_packet(packet, connection_handle, flags, timestamp): packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp) def parse_big_codec_information(adv_handle, packet): # Ignore presentation delay packet = unpack_data(packet, 3, True) number_of_subgroup, packet = unpack_data(packet, 1, False) for subgroup in range(number_of_subgroup): num_of_bis, packet = unpack_data(packet, 1, False) broadcast_map[adv_handle].num_of_bis[subgroup] = num_of_bis # Ignore codec id packet = unpack_data(packet, 5, True) length, packet = unpack_data(packet, 1, False) if len(packet) < length: print("Invalid subgroup codec information length") return while length > 0: config_length, packet = unpack_data(packet, 1, False) config_type, packet = unpack_data(packet, 1, False) value, packet = unpack_data(packet, config_length - 1, False) if config_type == TYPE_SAMPLING_FREQUENCIES: sampling_frequencies = value elif config_type == TYPE_FRAME_DURATION: frame_duration = value elif config_type == TYPE_OCTETS_PER_FRAME: octets_per_frame = value else: print("Unknown config type") length -= (config_length + 1) # Ignore metadata metadata_length, packet = unpack_data(packet, 1, False) packet = unpack_data(packet, metadata_length, True) for count in range(num_of_bis): bis_index, packet = unpack_data(packet, 1, False) broadcast_map[adv_handle].bis_index_list.append(bis_index) length, packet = unpack_data(packet, 1, False) if len(packet) < length: print("Invalid level 3 codec information length") return while length > 0: config_length, packet = unpack_data(packet, 1, False) config_type, packet = unpack_data(packet, 1, False) value, packet = unpack_data(packet, config_length - 1, False) if config_type == TYPE_CHANNEL_ALLOCATION: channel_allocation = value else: print("Ignored config type") length -= (config_length + 1) broadcast_map[adv_handle].bis[bis_index].sampling_frequencies = sampling_frequencies broadcast_map[adv_handle].bis[bis_index].frame_duration = frame_duration broadcast_map[adv_handle].bis[bis_index].octets_per_frame = octets_per_frame broadcast_map[adv_handle].bis[bis_index].channel_allocation = channel_allocation return packet def debug_print(log): global packet_number print("#" + str(packet_number) + ": " + log) Loading @@ -303,7 +413,7 @@ def unpack_data(data, byte, ignore): return value, data[byte:] def parse_command_packet(packet): def parse_command_packet(packet, timestamp): opcode, packet = unpack_data(packet, 2, False) if opcode == OPCODE_HCI_CREATE_CIS: debug_print("OPCODE_HCI_CREATE_CIS") Loading @@ -330,9 +440,96 @@ def parse_command_packet(packet): debug_print("Invalid cmd length") return cis_handle, packet = unpack_data(packet, 2, False) acl_handle = cis_acl_map[cis_handle] dump_audio_data_to_file(acl_handle) iso_handle, packet = unpack_data(packet, 2, False) # CIS stream if iso_handle in cis_acl_map: acl_handle = cis_acl_map[iso_handle] dump_cis_audio_data_to_file(acl_handle) # To Do: BIS stream elif iso_handle in bis_stream_map: dump_bis_audio_data_to_file(iso_handle) elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA: debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA") length, packet = unpack_data(packet, 1, False) if length != len(packet): debug_print("Invalid cmd length") return if length < 21: debug_print("Ignored. Not basic audio announcement") return adv_hdl, packet = unpack_data(packet, 1, False) #ignore operation, advertising_data_length packet = unpack_data(packet, 2, True) length, packet = unpack_data(packet, 1, False) if length != len(packet): debug_print("Invalid AD element length") return ad_type, packet = unpack_data(packet, 1, False) service, packet = unpack_data(packet, 2, False) if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE: debug_print("Ignored. Not basic audio announcement") return packet = parse_big_codec_information(adv_hdl, packet) elif opcode == OPCODE_LE_CREATE_BIG: debug_print("OPCODE_LE_CREATE_BIG") length, packet = unpack_data(packet, 1, False) if length != len(packet) and length < 31: debug_print("Invalid Create BIG command length") return big_handle, packet = unpack_data(packet, 1, False) adv_handle, packet = unpack_data(packet, 1, False) big_adv_map[big_handle] = adv_handle elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH: debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH") length, packet = unpack_data(packet, 1, False) if len(packet) != length: debug_print("Invalid LE SETUP ISO DATA PATH command length") return iso_handle, packet = unpack_data(packet, 2, False) if iso_handle in bis_stream_map: bis_stream_map[iso_handle].start_time = timestamp def parse_event_packet(packet): event_code, packet = unpack_data(packet, 1, False) if event_code != EVENT_CODE_LE_META_EVENT: return length, packet = unpack_data(packet, 1, False) if len(packet) != length: print("Invalid LE mata event length") return subevent_code, packet = unpack_data(packet, 1, False) if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE: return status, packet = unpack_data(packet, 1, False) if status != 0x00: debug_print("Create_BIG failed") return big_handle, packet = unpack_data(packet, 1, False) if big_handle not in big_adv_map: print("Invalid BIG handle") return adv_handle = big_adv_map[big_handle] # Ignore, we don't care these parameter packet = unpack_data(packet, 15, True) num_of_bis, packet = unpack_data(packet, 1, False) for count in range(num_of_bis): bis_handle, packet = unpack_data(packet, 2, False) bis_index = broadcast_map[adv_handle].bis_index_list[count] broadcast_map[adv_handle].bis_index_handle_map[bis_index] = bis_handle bis_stream_map[bis_handle] = broadcast_map[adv_handle].bis[bis_index] def convert_time_str(timestamp): Loading @@ -348,7 +545,7 @@ def convert_time_str(timestamp): return full_str_format def dump_audio_data_to_file(acl_handle): def dump_cis_audio_data_to_file(acl_handle): if debug_enable: connection_map[acl_handle].dump() file_name = "" Loading Loading @@ -389,20 +586,20 @@ def dump_audio_data_to_file(acl_handle): break if connection_map[acl_handle].input_dump != []: debug_print("Dump input...") debug_print("Dump unicast input...") f = open(file_name + "_input.bin", 'wb') if add_header == True: generate_header(f, connection_map[acl_handle]) generate_header(f, connection_map[acl_handle], True) arr = bytearray(connection_map[acl_handle].input_dump) f.write(arr) f.close() connection_map[acl_handle].input_dump = [] if connection_map[acl_handle].output_dump != []: debug_print("Dump output...") debug_print("Dump unicast output...") f = open(file_name + "_output.bin", 'wb') if add_header == True: generate_header(f, connection_map[acl_handle]) generate_header(f, connection_map[acl_handle], True) arr = bytearray(connection_map[acl_handle].output_dump) f.write(arr) f.close() Loading @@ -411,6 +608,51 @@ def dump_audio_data_to_file(acl_handle): return def dump_bis_audio_data_to_file(iso_handle): if debug_enable: bis_stream_map[iso_handle].dump() file_name = "broadcast" sf_case = { SAMPLE_FREQUENCY_8000: "8000", SAMPLE_FREQUENCY_11025: "11025", SAMPLE_FREQUENCY_16000: "16000", SAMPLE_FREQUENCY_22050: "22050", SAMPLE_FREQUENCY_24000: "24000", SAMPLE_FREQUENCY_32000: "32000", SAMPLE_FREQUENCY_44100: "44100", SAMPLE_FREQUENCY_48000: "48000", SAMPLE_FREQUENCY_88200: "88200", SAMPLE_FREQUENCY_96000: "96000", SAMPLE_FREQUENCY_176400: "176400", SAMPLE_FREQUENCY_192000: "192000", SAMPLE_FREQUENCY_384000: "284000" } file_name += ("_sf" + sf_case[bis_stream_map[iso_handle].sampling_frequencies]) fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} file_name += ("_fd" + fd_case[bis_stream_map[iso_handle].frame_duration]) al_case = { AUDIO_LOCATION_MONO: "mono", AUDIO_LOCATION_LEFT: "left", AUDIO_LOCATION_RIGHT: "right", AUDIO_LOCATION_CENTER: "center" } file_name += ("_" + al_case[bis_stream_map[iso_handle].channel_allocation]) file_name += ("_frame" + str(bis_stream_map[iso_handle].octets_per_frame)) file_name += ("_" + convert_time_str(bis_stream_map[iso_handle].start_time)) if bis_stream_map[iso_handle].output_dump != []: debug_print("Dump broadcast output...") f = open(file_name + "_output.bin", 'wb') if add_header == True: generate_header(f, bis_stream_map[iso_handle], False) arr = bytearray(bis_stream_map[iso_handle].output_dump) f.write(arr) f.close() bis_stream_map[iso_handle].output_dump = [] return def parse_acl_packet(packet, flags, timestamp): # Check the minimum acl length, HCI leader (4 bytes) # + L2CAP header (4 bytes) Loading Loading @@ -441,8 +683,8 @@ def parse_acl_packet(packet, flags, timestamp): def parse_iso_packet(packet, flags): cis_handle, packet = unpack_data(packet, 2, False) cis_handle &= 0x0EFF iso_handle, packet = unpack_data(packet, 2, False) iso_handle &= 0x0EFF iso_data_load_length, packet = unpack_data(packet, 2, False) if iso_data_load_length != len(packet): debug_print("Invalid iso data load length") Loading @@ -457,13 +699,18 @@ def parse_iso_packet(packet, flags): debug_print("Invalid iso sdu length") return acl_handle = cis_acl_map[cis_handle] # CIS stream if iso_handle in cis_acl_map: acl_handle = cis_acl_map[iso_handle] if flags == SENT: connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet))) connection_map[acl_handle].output_dump.extend(list(packet)) elif flags == RECEIVED: connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet))) connection_map[acl_handle].input_dump.extend(list(packet)) elif iso_handle in bis_stream_map: bis_stream_map[iso_handle].output_dump.extend(struct.pack("<H", len(packet))) bis_stream_map[iso_handle].output_dump.extend(list(packet)) def parse_next_packet(btsnoop_file): Loading @@ -490,10 +737,10 @@ def parse_next_packet(btsnoop_file): return False packet_handle = { COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x)), COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x, z)), ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)), SCO_PACKET: (lambda x, y, z: None), EVENT_PACKET: (lambda x, y, z: None), EVENT_PACKET: (lambda x, y, z: parse_event_packet(x)), ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y)) } packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp) Loading Loading @@ -535,7 +782,10 @@ def main(): break for handle in connection_map.keys(): dump_audio_data_to_file(handle) dump_cis_audio_data_to_file(handle) for handle in bis_stream_map.keys(): dump_bis_audio_data_to_file(handle) if __name__ == "__main__": Loading