Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5175c792 authored by Vishwanath KM's avatar Vishwanath KM
Browse files

Add tool for live btsnoop logging with Frontline BT sniffer (FTS)

* This script supports Bluetooth Virtual Sniffing via Live Import
  feature of Frontline Bluetooth Sniffer(FTS).

* It uses APIs from LiveImportAPI.dll in FTS path to communicate
  with FTS sniffer and inject HCI packets for live HCI sniffing
parent 5a888f68
Loading
Loading
Loading
Loading
+297 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
###############################################################################################################
#
#  Copyright (C) 2019 Motorola Mobility LLC
#
#  Redistribution and use in source and binary forms, with or without modification, are permitted provided that
#  the following conditions are met:
#
#  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the
#     following disclaimer.
#
#  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
#     the following disclaimer in the documentation and/or other materials provided with the distribution.
#
#  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or
#     promote products derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
#  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
#  PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
#  ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
#  TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
#  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
#  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
###############################################################################################################
###############################################################################################################
#
#                             Bluetooth Virtual Sniffing for Android
#
#  This script supports Bluetooth Virtual Sniffing via Live Import Feature of Frontline Bluetooth Sniffer(FTS)
#
#  It extracts the HCI packets from Snoop logs and redirect it to FTS for live HCI sniffing.
#
#  It uses liveimport.ini and LiveImportAPI.dll from FTS path to communicate with FTS sniffer software.
#
#  It works on both Windows and Ubuntu. For Ubuntu, both FTS and Python should be installed on Wine.
#
#  FTS_INI_PATH & FTS_DLL_PATH should be set to absolute path of liveimport.ini and LiveImportAPI.dll of FTS
#
#  Example below - This may change per machine per FTS version in Windows vs Ubuntu (Wine), set accordingly.
#  For Windows
#    FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\'
#    FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\'
#
#  For Ubuntu - FTS path recognized by Wine (not Ubuntu path)
#    FTS_INI_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\'
#    FTS_DLL_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\'
#
###############################################################################################################

import os
import platform
import socket
import struct
import subprocess
import sys
import time
if sys.version_info[0] >= 3:
    import configparser
else:
    import ConfigParser as configparser

from calendar import timegm
from ctypes import byref, c_bool, c_longlong, CDLL
from _ctypes import FreeLibrary
from datetime import datetime


# Update below to right path corresponding to your machine, FTS version and OS used.
FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\'
FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\'


iniName = 'liveimport.ini'
if (platform.architecture()[0] == '32bit'):
    dllName = 'LiveImportAPI.dll'
else:
    dllName = 'LiveImportAPI_x64.dll'

launchFtsCmd = '\"'+FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"'

# Unix Epoch delta since 01/01/1970
FILETIME_EPOCH_DELTA = 116444736000000000
HUNDREDS_OF_NANOSECONDS = 10000000

HOST = 'localhost'
PORT = 8872
SNOOP_ID = 16
SNOOP_HDR = 24


def get_file_time():
    """
    Obtain current time in file time format for display
    """
    date_time = datetime.now()
    file_time = FILETIME_EPOCH_DELTA + (timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS)
    file_time = file_time + (date_time.microsecond * 10)
    return file_time


def get_connection_string():
    """
    Read ConnectionString from liveimport.ini
    """
    config = configparser.ConfigParser()
    config.read(FTS_INI_PATH + iniName)
    try:
        conn_str = config.get('General', 'ConnectionString')
    except (configparser.NoSectionError, configparser.NoOptionError):
        return None

    return conn_str


def get_configuration_string():
    """
    Read Configuration string from liveimport.ini
    """
    config_str = ''
    config = configparser.ConfigParser()
    config.read(FTS_INI_PATH + iniName)
    try:
        config_items = config.items('Configuration')
    except (configparser.NoSectionError, configparser.NoOptionError):
        return None

    if config_items is not None:
        for item in config_items:
            key, value = item
            config_str += ("%s=%s\n" % (key, value))
        return config_str
    else:
        return None

def check_live_import_connection(live_import):
    """
    Launch FTS app in Virtual Sniffing Mode
    Check if FTS App is ready to start receiving the data.
    If not, wait until 1 min and exit if FTS didn't start.
    """
    is_connection_running = c_bool()
    count = 0

    status = live_import.IsAppReady(byref(is_connection_running))
    if (is_connection_running.value == True):
        print ("FTS is already launched, Start capture if not already started")
        return True

    print("Launching FTS Virtual Sniffing")
    try:
        ftsProcess = subprocess.Popen((launchFtsCmd), stdout=subprocess.PIPE)
    except:
        print("Error in Launching FTS.. exiting")
        return False

    while (is_connection_running.value == False and count < 12):
        status = live_import.IsAppReady(byref(is_connection_running))
        if (status < 0):
            print("Live Import Internal Error %d" %(status))
            return False
        if (is_connection_running.value == False):
            print("Waiting for 5 sec.. Open FTS Virtual Sniffing")
            time.sleep(5)
            count += 1
    if (is_connection_running.value == True):
        print ("FTS is ready to receive the data, Start capture now")
        return True
    else:
        print("FTS Virtual Sniffing didn't start until 1 min.. exiting")
        return False


def init_live_import(conn_str, config_str):
    """
    Load DLL and Initialize the LiveImport module for FTS.
    """
    success = c_bool()
    try:
        live_import = CDLL(FTS_DLL_PATH + dllName)
    except:
        return None

    if live_import is None:
        print("Error: Path to LiveImportAPI.dll is incorrect.. exiting");
        return None

    print(dllName + " loaded successfully")
    result = live_import.InitializeLiveImport(conn_str.encode('ascii', 'ignore'),
                                              config_str.encode('ascii', 'ignore'),
                                              byref(success))
    if (result < 0):
        print("Live Import Init failed")
        return None
    else:
        print("Live Import Init success")
        return live_import


def release_live_import(live_import):
    """
    Cleanup and exit live import module.
    """
    if live_import is not None:
        live_import.ReleaseLiveImport()
        FreeLibrary(live_import._handle)


def main():

    print("Bluetooth Virtual Sniffing for Fluoride")
    connection_str = get_connection_string()
    if connection_str is None:
        print("Error: path to liveimport.ini is incorrect.. exiting")
        exit(0)

    configuration_str = get_configuration_string()
    if configuration_str is None:
        print("Error: path to liveimport.ini is incorrect.. exiting")
        exit(0)

    live_import = init_live_import(connection_str, configuration_str)
    if live_import is None:
        print("Error: Path to LiveImportAPI.dll is incorrect.. exiting")
        exit(0)

    if (check_live_import_connection(live_import) == False):
        release_live_import(live_import)
        exit(0)

    # Wait until the forward socket is ready
    print("Waiting until adb is ready")
    os.system('adb wait-for-device forward tcp:8872 tcp:8872')

    btsnoop_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    btsnoop_sock.connect((HOST, PORT))
    snoop_id = btsnoop_sock.recv(SNOOP_ID)
    if not snoop_id.startswith(b"btsnoop"):
        print("Error: Snoop ID wasn't received.. exiting")
        release_live_import(live_import)
        exit(0)

    while True:
        try:
            snoop_hdr  = btsnoop_sock.recv(SNOOP_HDR)
            if snoop_hdr is not None:
                try:
                    olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12])
                except struct.error:
                    print("Error: Invalid data", repr(snoop_hdr))
                    continue

                file_time = get_file_time()
                timestamp = c_longlong(file_time)

                snoop_data = b''
                while (len(snoop_data) < olen):
                    data_frag = btsnoop_sock.recv(olen - len(snoop_data))
                    if data_frag is not None:
                        snoop_data += data_frag

                print ("Bytes received %d Olen %d ilen %d flags %d" % (len(snoop_data), olen, ilen, flags))
                packet_type = struct.unpack(">B", snoop_data[0:1])[0]
                if packet_type == 1:
                    drf = 1
                    isend = 0
                elif packet_type == 2:
                    drf = 2
                    if (flags & 0x01):
                        isend = 1
                    else:
                        isend = 0
                elif packet_type == 3:
                    drf = 4
                    if (flags & 0x01):
                        isend = 1
                    else:
                        isend = 0
                elif packet_type == 4:
                    drf = 8
                    isend = 1

                result = live_import.SendFrame(olen-1, olen-1, snoop_data[1:olen], drf, isend, timestamp)
                if (result < 0):
                    print("Send frame failed")
        except KeyboardInterrupt:
            print("Cleanup and exit")
            release_live_import(live_import)
            btsnoop_sock.close()
            exit(0)


if __name__ == '__main__':
    main()