Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f7a5af2b authored by weichinweng's avatar weichinweng
Browse files

Hearing Aid Dump Audio Tool: Add dump debug buffer feature

Add dump HearingAid debug buffer feature to let the HearingAid debug
information dump to debug file and make it readable.

Bug: 130781854
Test: ./dump_hearingaid_audio.py -d true btsnoop_hci.log
      ./dump_hearingaid_audio.py -sd true btsnoop_hci.log

Change-Id: I150fd897e6f197d5b853554734926ad613de4e6b
parent 7fd81bc2
Loading
Loading
Loading
Loading
+127 −26
Original line number Diff line number Diff line
@@ -20,7 +20,10 @@
# Generates a valid audio file which can be played using player like smplayer.
#
# Audio File Name Format:
# [PEER_ADDRESS]-[START TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
# [PEER_ADDRESS]-[START_TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
#
# Debug Infomation File Name Format:
# debug_ver_[DEBUG_VERSION]-[PEER_ADDRESS]-[START_TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].txt
#
# Player:
# smplayer
@@ -41,15 +44,34 @@ PEER_ADDRESS = "PEER_ADDRESS"
CONNECTION_HANDLE = "CONNECTION_HANDLE"
AUDIO_CONTROL_ATTR_HANDLE = "AUDIO_CONTROL_ATTR_HANDLE"
START = "START"
TIMESTAMP = "TIMESTAMP"
TIMESTAMP_STR_FORMAT = "TIMESTAMP_STR_FORMAT"
TIMESTAMP_TIME_FORMAT = "TIMESTAMP_TIME_FORMAT"
CODEC = "CODEC"
SAMPLE_RATE = "SAMPLE_RATE"
AUDIO_TYPE = "AUDIO_TYPE"
DEBUG_VERSION = "DEBUG_VERSION"
DEBUG_DATA = "DEBUG_DATA"
AUDIO_DATA_B = "AUDIO_DATA_B"

# Debug packet header struct
header_list_str = ["Event Processed",
                   "Number Packet Nacked By Slave",
                   "Number Packet Nacked By Master"]
# Debug frame information structs
data_list_str = ["Event Number",
                 "Overrun",
                 "Underrun",
                 "Skips",
                 "Rendered Audio Frame",
                 "First PDU Option",
                 "Second PDU Option",
                 "Third PDU Option"]

AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0"
SEC_CONVERT = 1000000
folder = None
full_debug = False
simple_debug = False

force_audio_control_attr_handle = None
default_audio_control_attr_handle = 0x0079
@@ -64,6 +86,39 @@ audio_data = {}
# Parse Hearing Aid Packet
#-----------------------------------------------------------------------

def parse_acl_ha_debug_buffer(data, result):
  """This function extracts HA debug buffer"""
  if len(data) < 5:
    return

  version, data = unpack_data(data, 1)
  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_VERSION, str(version))

  debug_str = result[TIMESTAMP_TIME_FORMAT];
  for p in range(3):
    byte_data, data = unpack_data(data, 1)
    debug_str = debug_str + ", " + header_list_str[p] + "=" + str(byte_data).rjust(3)

  if full_debug:
    debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n"
    while True:
      if len(data) < 7:
        break
      base = 0
      data_list_content = []
      for counter in range(6):
        p = base + counter
        byte_data, data = unpack_data(data, 1)
        if p == 1:
          data_list_content.append(str(byte_data & 0x03).rjust(len(data_list_str[p])))
          data_list_content.append(str((byte_data >> 2) & 0x03).rjust(len(data_list_str[p + 1])))
          data_list_content.append(str((byte_data >> 4) & 0x0f).rjust(len(data_list_str[p + 2])))
          base = 2
        else:
          data_list_content.append(str(byte_data).rjust(len(data_list_str[p])))
      debug_str = debug_str + "|".join(data_list_content) + "\n"

  update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA, debug_str)

def parse_acl_ha_audio_data(data, result):
  """This function extracts HA audio data."""
@@ -118,7 +173,7 @@ def parse_acl_ha_audio_control_cmd(data, result):
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      START, True)
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
                      TIMESTAMP, result[TIMESTAMP])
                      TIMESTAMP_STR_FORMAT, result[TIMESTAMP_STR_FORMAT])
    parse_acl_ha_codec(data, result)
  elif control_cmd == 0x02:
    update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
@@ -188,11 +243,15 @@ def parse_acl_handle(data, result):
    if channel_id <= 0x003F:
      result[CONNECTION_HANDLE] = connection_handle
      parse_acl_opcode(data, result)
    elif result[IS_SENT] and channel_id >= 0x0040 and channel_id <= 0x007F:
    elif channel_id >= 0x0040 and channel_id <= 0x007F:
      result[CONNECTION_HANDLE] = connection_handle
      sdu, data = unpack_data(data, 2)
      if pdu - 2 == sdu:
        if result[IS_SENT]:
          parse_acl_ha_audio_data(data, result)
        else:
          if simple_debug:
            parse_acl_ha_debug_buffer(data, result)


#=======================================================================
@@ -282,7 +341,7 @@ def parse_packet(btsnoop_file):
      return False
    if packet_flag != 2 and drop == 0:
      packet_result[IS_SENT] = (packet_flag == 0)
      packet_result[TIMESTAMP] = convert_time_str(timestamp)
      packet_result[TIMESTAMP_STR_FORMAT], packet_result[TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp)
      parse_packet_data(packet_data, packet_result)
  else:
    return False
@@ -300,24 +359,42 @@ def dump_audio_data(data):
  file_type = "." + data[CODEC]
  file_name_list = []
  file_name_list.append(data[PEER_ADDRESS])
  file_name_list.append(data[TIMESTAMP])
  file_name_list.append(data[TIMESTAMP_STR_FORMAT])
  file_name_list.append(data[AUDIO_TYPE])
  file_name_list.append(data[SAMPLE_RATE])
  if folder is not None:
    if not os.path.exists(folder):
      os.makedirs(folder)
    file_name = os.path.join(folder, "-".join(file_name_list) + file_type)
    audio_file_name = os.path.join(folder, "-".join(file_name_list) + file_type)
    if data.has_key(DEBUG_VERSION):
      file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
      debug_file_name = os.path.join(folder, file_prefix + "-".join(file_name_list) + ".txt")
  else:
    file_name = "-".join(file_name_list) + file_type
  sys.stdout.write("Start to dump audio file : " + file_name + "\n")
    audio_file_name = "-".join(file_name_list) + file_type
    if data.has_key(DEBUG_VERSION):
      file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
      debug_file_name = file_prefix + "-".join(file_name_list) + ".txt"

  sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name)
  if data.has_key(AUDIO_DATA_B):
    with open(file_name, "wb+") as g722_file:
      g722_file.write(data[AUDIO_DATA_B])
      sys.stdout.write("Finished to dump Audio File: %s\n\n" % file_name)
    with open(audio_file_name, "wb+") as audio_file:
      audio_file.write(data[AUDIO_DATA_B])
      sys.stdout.write("Finished to dump Audio File: %s\n\n" % audio_file_name)
  else:
    sys.stdout.write("Fail to dump Audio File: %s\n" % file_name)
    sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name)
    sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")

  if simple_debug:
    sys.stdout.write("Start to dump audio %s Debug File\n" % audio_file_name)
    if data.has_key(DEBUG_DATA):
      with open(debug_file_name, "wb+") as debug_file:
        debug_file.write(data[DEBUG_DATA])
        sys.stdout.write("Finished to dump Debug File: %s\n\n" % debug_file_name)
    else:
      sys.stdout.write("Fail to dump audio %s Debug File\n" % audio_file_name)
      sys.stdout.write("There isn't any Hearing Aid debug data.\n\n")



def update_audio_data(relate_key, relate_value, key, value):
  """
@@ -328,10 +405,12 @@ def update_audio_data(relate_key, relate_value, key, value):
      CONNECTION_HANDLE: CONNECTION_HANDLE,
      AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
      START: True or False,
      TIMESTAMP: START_TIMESTAMP,
      TIMESTAMP_STR_FORMAT: START_TIMESTAMP_STR_FORMAT,
      CODEC: CODEC,
      SAMPLE_RATE: SAMPLE_RATE,
      AUDIO_TYPE: AUDIO_TYPE,
      DEBUG_VERSION: DEBUG_VERSION,
      DEBUG_DATA: DEBUG_DATA,
      AUDIO_DATA_B: AUDIO_DATA_B
    },
    PEER_ADDRESS_2:{
@@ -339,10 +418,12 @@ def update_audio_data(relate_key, relate_value, key, value):
      CONNECTION_HANDLE: CONNECTION_HANDLE,
      AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
      START: True or False,
      TIMESTAMP: START_TIMESTAMP,
      TIMESTAMP_STR_FORMAT: START_TIMESTAMP_STR_FORMAT,
      CODEC: CODEC,
      SAMPLE_RATE: SAMPLE_RATE,
      AUDIO_TYPE: AUDIO_TYPE,
      DEBUG_VERSION: DEBUG_VERSION,
      DEBUG_DATA: DEBUG_DATA,
      AUDIO_DATA_B: AUDIO_DATA_B
    }
  }
@@ -368,16 +449,18 @@ def update_audio_data(relate_key, relate_value, key, value):
              # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
              # AUDIO_CONTROL_ATTR_HANDLE.
              audio_data[i].pop(key, "")
              audio_data[i].pop(TIMESTAMP, "")
              audio_data[i].pop(TIMESTAMP_STR_FORMAT, "")
              audio_data[i].pop(CODEC, "")
              audio_data[i].pop(SAMPLE_RATE, "")
              audio_data[i].pop(AUDIO_TYPE, "")
              audio_data[i].pop(DEBUG_VERSION, "")
              audio_data[i].pop(DEBUG_DATA, "")
              audio_data[i].pop(AUDIO_DATA_B, "")
            elif key == AUDIO_DATA_B:
            elif key == AUDIO_DATA_B or key == DEBUG_DATA:
              if audio_data[i].has_key(START) and audio_data[i][START]:
                if audio_data[i].has_key(AUDIO_DATA_B):
                  ori_audio_data = audio_data[i].pop(AUDIO_DATA_B, "")
                  value = ori_audio_data + value
                if audio_data[i].has_key(key):
                  ori_data = audio_data[i].pop(key, "")
                  value = ori_data + value
              else:
                # Audio doesn't start, don't record.
                return
@@ -423,11 +506,15 @@ def convert_time_str(timestamp):
  """This function converts time to string format."""
  really_timestamp = float(timestamp) / SEC_CONVERT
  local_timestamp = time.localtime(really_timestamp)
  time_str = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
  dt = really_timestamp - long(really_timestamp)
  ms_str = "{0:06}".format(int(round(dt * 1000000)))
  full_time_str = time_str + "_" + ms_str
  return full_time_str

  str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
  full_str_format = str_format + "_" + ms_str

  time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp)
  full_time_format = time_format + "." + ms_str
  return full_str_format, full_time_format


def set_config():
@@ -451,6 +538,12 @@ def set_config():
  argv_parser.add_argument("-a", "--attr-handle",
                           help="force to select audio control attr handle.",
                           dest="audio_control_attr_handle", type=int)
  argv_parser.add_argument("-d", "--debug",
                           help="dump full debug buffer content.",
                           dest="full_debug", default="False")
  argv_parser.add_argument("-sd", "--simple-debug",
                           help="dump debug buffer header content.",
                           dest="simple_debug", default="False")
  arg = argv_parser.parse_args()

  if arg.folder is not None:
@@ -474,7 +567,7 @@ def set_config():
                      arg.connection_handle1)
    if arg.no_start.lower() == "true":
      update_audio_data(PEER_ADDRESS, fake_name, START, True)
      update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
      update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
@@ -486,7 +579,7 @@ def set_config():
                      arg.connection_handle2)
    if arg.no_start.lower() == "true":
      update_audio_data(PEER_ADDRESS, fake_name, START, True)
      update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
      update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
      update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
@@ -495,6 +588,14 @@ def set_config():
    global force_audio_control_attr_handle
    force_audio_control_attr_handle = arg.audio_control_attr_handle

  global full_debug
  global simple_debug
  if arg.full_debug.lower() == "true":
    full_debug = True
    simple_debug = True
  elif arg.simple_debug.lower() == "true":
    simple_debug = True

  if os.path.isfile(arg.BTSNOOP):
    return arg.BTSNOOP
  else: