Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4ef585be authored by Jizheng Chu's avatar Jizheng Chu Committed by Automerger Merge Worker
Browse files

Merge "Project import generated by Copybara." am: 94073b75 am: 24197427

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Bluetooth/+/1922800

Change-Id: Id45e7f25ed2f855efcc46e1a45465e3c5d9083ff
parents 81fb3d8e 24197427
Loading
Loading
Loading
Loading
+131 −121
Original line number Diff line number Diff line
@@ -22,23 +22,27 @@ The config for this derived_bt_target_device in mobileharness is:
import logging
import os
import time
from typing import Any, Dict, Optional

from mobly import asserts
from mobly.controllers.android_device import AndroidDevice
from mobly.signals import ControllerError
from mobly import signals
from mobly.controllers import android_device

# Internal import
from blueberry.utils import android_bluetooth_decorator
from blueberry.utils import bt_constants
from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator
import blueberry.utils.bt_test_utils as btutils
from blueberry.utils import bt_test_utils as btutils

_CONNECTION_STATE = bt_constants.BluetoothConnectionStatus

ADB_FILE = "rec.pcm"
ADB_PATH = "/sdcard/Music/"
WAVE_FILE_TEMPLATE = "recorded_audio_%s.wav"
ADB_FILE = 'rec.pcm'
ADB_PATH = '/sdcard/Music/'
WAVE_FILE_TEMPLATE = 'recorded_audio_%s.wav'
DEFAULT_WAIT_TIME = 3.0

# A MediaBrowserService implemented in the SL4A app to intercept Media keys and
# commands.
BLUETOOTH_SL4A_AUDIO_SRC_MBS = "BluetoothSL4AAudioSrcMBS"
BLUETOOTH_SL4A_AUDIO_SRC_MBS = 'BluetoothSL4AAudioSrcMBS'

A2DP_HFP_PROFILES = [
    bt_constants.BluetoothProfile.A2DP_SINK,
@@ -53,19 +57,21 @@ class AndroidBtTargetDevice(object):
  hfp and a2dp sink device.
  """

  def __init__(self, config):
  def __init__(self, config: Dict[str, Any]) -> None:
    """Initializes an android hfp device."""
    logging.info("Initializes the android hfp device")
    logging.info('Initializes the android hfp device')
    self.config = config
    self.pri_ad = None
    self.sec_ad = None
    self.serial = config.get("device_id", None)
    self.audio_params = config.get("audio_params", None)
    self.serial = config.get('device_id', None)
    self.audio_params = config.get('audio_params', None)

    if self.serial:
      # self._ad for accessing the device at the end of the test
      self._ad = AndroidDevice(self.serial)
      self._ad = android_device.AndroidDevice(self.serial)
      self.aud = adb_ui_device.AdbUiDevice(self._ad)
      self.pri_ad = AndroidBluetoothDecorator(self._ad)
      self.pri_ad = android_bluetooth_decorator.AndroidBluetoothDecorator(
          self._ad)
      self.pri_ad.init_setup()
      self.pri_ad.sl4a_setup()
      self.sl4a = self._ad.services.sl4a
@@ -75,58 +81,58 @@ class AndroidBtTargetDevice(object):
        self._initialize_audio_params()
    self.avrcp_ready = False

  def __getattr__(self, name):
  def __getattr__(self, name: str) -> Any:
    return getattr(self.pri_ad, name)

  def _disable_profiles(self):
  def _disable_profiles(self) -> None:
    if self.sec_ad is None:
      raise MissingBtClientDeviceError("Please provide sec_ad forsetting"
                                       "profiles")
      raise MissingBtClientDeviceError('Please provide sec_ad forsetting'
                                       'profiles')
    self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES)

  def _initialize_audio_params(self):
    self.audio_capture_path = os.path.join(self._ad.log_path, "audio_capture")
  def _initialize_audio_params(self) -> None:
    self.audio_capture_path = os.path.join(self._ad.log_path, 'audio_capture')
    os.makedirs(self.audio_capture_path)
    self.adb_path = os.path.join(ADB_PATH, ADB_FILE)
    self.wave_file_template = os.path.join(self.audio_capture_path,
                                           WAVE_FILE_TEMPLATE)
    self.wave_file_number = 0

  def _verify_pri_ad(self):
  def _verify_pri_ad(self) -> None:
    if not self.pri_ad:
      raise ControllerError("No be target device")
      raise signals.ControllerError('No be target device')

  def clean_up(self):
  def clean_up(self) -> None:
    """Resets Bluetooth and stops all services when the device is destroyed."""
    self.deactivate_ble_pairing_mode()
    self.factory_reset_bluetooth()
    self._ad.services.stop_all()

  def a2dp_sink_connect(self):
  def a2dp_sink_connect(self) -> bool:
    """Establishes the hft connection between self.pri_ad and self.sec_ad."""
    self._verify_pri_ad()
    connected = self.pri_ad.a2dp_sink_connect(self.sec_ad)
    asserts.assert_true(
        connected, "The a2dp sink connection between {} and {} failed".format(
        connected, 'The a2dp sink connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info("The a2dp sink connection between %s and %s succeeded",
    self.log.info('The a2dp sink connection between %s and %s succeeded',
                  self.serial, self.sec_ad.serial)
    return True

  def activate_pairing_mode(self):
  def activate_pairing_mode(self) -> None:
    """Makes the android hfp device discoverable over Bluetooth."""
    self.log.info("Activating the pairing mode of the android target device")
    self.log.info('Activating the pairing mode of the android target device')
    self.pri_ad.activate_pairing_mode()

  def activate_ble_pairing_mode(self):
  def activate_ble_pairing_mode(self) -> None:
    """Activates BLE pairing mode on an AndroidBtTargetDevice."""
    self.pri_ad.activate_ble_pairing_mode()

  def deactivate_ble_pairing_mode(self):
  def deactivate_ble_pairing_mode(self) -> None:
    """Deactivates BLE pairing mode on an AndroidBtTargetDevice."""
    self.pri_ad.deactivate_ble_pairing_mode()

  def add_pri_ad_device(self, pri_ad):
  def add_pri_ad_device(self, pri_ad: android_device.AndroidDevice) -> None:
    """Adds primary android device as bt target device.

    The primary android device should have been initialized with
@@ -142,12 +148,12 @@ class AndroidBtTargetDevice(object):
    self.log = self.pri_ad.log
    self.serial = self.pri_ad.serial
    self.log.info(
        "Adds primary android device with id %s for the bluetooth"
        "connection", pri_ad.serial)
        'Adds primary android device with id %s for the bluetooth'
        'connection', pri_ad.serial)
    if self.audio_params:
      self._initialize_audio_params()

  def add_sec_ad_device(self, sec_ad):
  def add_sec_ad_device(self, sec_ad: android_device.AndroidDevice) -> None:
    """Adds second android device for bluetooth connection.

    The second android device should have sl4a service acitvated.
@@ -156,65 +162,65 @@ class AndroidBtTargetDevice(object):
      sec_ad: the second android device for bluetooth connection.
    """
    self.log.info(
        "Adds second android device with id %s for the bluetooth"
        "connection", sec_ad.serial)
        'Adds second android device with id %s for the bluetooth'
        'connection', sec_ad.serial)
    self.sec_ad = sec_ad
    self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress()

  def answer_phone_call(self):
  def answer_phone_call(self) -> bool:
    """Answers an incoming phone call."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    # Make sure the device is in ringing state.
    if not self.wait_for_call_state(
        bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
      raise ControllerError(
          "Timed out after %ds waiting for the device %s to be ringing state "
          "before anwsering the incoming phone call." %
      raise signals.ControllerError(
          'Timed out after %ds waiting for the device %s to be ringing state '
          'before anwsering the incoming phone call.' %
          (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
    self.log.info("Answers the incoming phone call from hf phone %s for %s",
    self.log.info('Answers the incoming phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address)

  def call_volume_down(self):
  def call_volume_down(self) -> None:
    """Lowers the volume."""
    current_volume = self.mbs.getVoiceCallVolume()
    if current_volume > 0:
      change_volume = current_volume - 1
      self.log.debug("Set voice call volume from %d to %d." %
      self.log.debug('Set voice call volume from %d to %d.' %
                     (current_volume, change_volume))
      self.mbs.setVoiceCallVolume(change_volume)

  def call_volume_up(self):
  def call_volume_up(self) -> None:
    """Raises the volume."""
    current_volume = self.mbs.getVoiceCallVolume()
    if current_volume < self.mbs.getVoiceCallMaxVolume():
      change_volume = current_volume + 1
      self.log.debug("Set voice call volume from %d to %d." %
      self.log.debug('Set voice call volume from %d to %d.' %
                     (current_volume, change_volume))
      self.mbs.setVoiceCallVolume(change_volume)

  def disconnect_all(self):
  def disconnect_all(self) -> None:
    self._disable_profiles()

  def factory_reset_bluetooth(self):
  def factory_reset_bluetooth(self) -> None:
    """Factory resets Bluetooth on the android hfp device."""
    self.log.info("Factory resets Bluetooth on the android target device")
    self.log.info('Factory resets Bluetooth on the android target device')
    self.pri_ad.factory_reset_bluetooth()

  def get_bluetooth_mac_address(self):
  def get_bluetooth_mac_address(self) -> str:
    """Gets Bluetooth mac address of this android_bt_device."""
    self.log.info("Getting Bluetooth mac address for AndroidBtTargetDevice.")
    self.log.info('Getting Bluetooth mac address for AndroidBtTargetDevice.')
    mac_address = self.sl4a.bluetoothGetLocalAddress()
    self.log.info("Bluetooth mac address of AndroidBtTargetDevice: %s",
    self.log.info('Bluetooth mac address of AndroidBtTargetDevice: %s',
                  mac_address)
    return mac_address

  def get_audio_params(self):
  def get_audio_params(self) -> Optional[Dict[str, str]]:
    """Gets audio params from the android_bt_target_device."""
    return self.audio_params

  def get_new_wave_file_path(self):
  def get_new_wave_file_path(self) -> str:
    """Gets a new wave file path for the audio capture."""
    wave_file_path = self.wave_file_template % self.wave_file_number
    while os.path.exists(wave_file_path):
@@ -226,27 +232,27 @@ class AndroidBtTargetDevice(object):
    """Gets unread messages from the connected device (MSE)."""
    self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address)

  def hangup_phone_call(self):
  def hangup_phone_call(self) -> bool:
    """Hangs up an ongoing phone call."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info("Hangs up the phone call from hf phone %s for %s",
    self.log.info('Hangs up the phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientTerminateAllCalls(
        self.sec_ad_mac_address)

  def hfp_connect(self):
  def hfp_connect(self) -> bool:
    """Establishes the hft connection between self.pri_ad and self.sec_ad."""
    self._verify_pri_ad()
    connected = self.pri_ad.hfp_connect(self.sec_ad)
    asserts.assert_true(
        connected, "The hfp connection between {} and {} failed".format(
        connected, 'The hfp connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info("The hfp connection between %s and %s succeed", self.serial,
    self.log.info('The hfp connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)
    return connected

  def init_ambs_for_avrcp(self):
  def init_ambs_for_avrcp(self) -> bool:
    """Initializes media browser service for avrcp.

    This is required to be done before running any of the passthrough
@@ -271,7 +277,7 @@ class AndroidBtTargetDevice(object):
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()

    self.sec_ad.log.info("Starting AvrcpMediaBrowserService")
    self.sec_ad.log.info('Starting AvrcpMediaBrowserService')
    self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart()

    time.sleep(DEFAULT_WAIT_TIME)
@@ -279,9 +285,9 @@ class AndroidBtTargetDevice(object):
    # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad.
    active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions()
    if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions:
      raise ControllerError("Failed to start AvrcpMediaBrowserService.")
      raise signals.ControllerError('Failed to start AvrcpMediaBrowserService.')

    self.log.info("Connecting to A2dp media browser service")
    self.log.info('Connecting to A2dp media browser service')
    self.sl4a.bluetoothMediaConnectToCarMBS()

    # TODO(user) Wait for an event back instead of sleep
@@ -289,52 +295,52 @@ class AndroidBtTargetDevice(object):
    self.avrcp_ready = True
    return self.avrcp_ready

  def is_avrcp_ready(self):
  def is_avrcp_ready(self) -> bool:
    """Checks if the pri_ad and sec_ad are ready for avrcp."""
    self._verify_pri_ad()
    if self.avrcp_ready:
      return True
    active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions()
    if not active_sessions:
      self.log.info("The device is not avrcp ready")
      self.log.info('The device is not avrcp ready')
      self.avrcp_ready = False
    else:
      self.log.info("The device is avrcp ready")
      self.log.info('The device is avrcp ready')
      self.avrcp_ready = True
    return self.avrcp_ready

  def is_hfp_connected(self):
  def is_hfp_connected(self) -> _CONNECTION_STATE:
    """Checks if the pri_ad and sec_ad are hfp connected."""
    self._verify_pri_ad()
    if self.sec_ad is None:
      raise MissingBtClientDeviceError("The sec_ad was not added")
      raise MissingBtClientDeviceError('The sec_ad was not added')
    return self.sl4a.bluetoothHfpClientGetConnectionStatus(
        self.sec_ad_mac_address)

  def is_a2dp_sink_connected(self):
  def is_a2dp_sink_connected(self) -> _CONNECTION_STATE:
    """Checks if the pri_ad and sec_ad are hfp connected."""
    self._verify_pri_ad()
    if self.sec_ad is None:
      raise MissingBtClientDeviceError("The sec_ad was not added")
      raise MissingBtClientDeviceError('The sec_ad was not added')
    return self.sl4a.bluetoothA2dpSinkGetConnectionStatus(
        self.sec_ad_mac_address)

  def last_number_dial(self):
  def last_number_dial(self) -> None:
    """Redials last outgoing phone number."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info("Redials last number from hf phone %s for %s",
    self.log.info('Redials last number from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None)

  def map_connect(self):
  def map_connect(self) -> None:
    """Establishes the map connection between self.pri_ad and self.sec_ad."""
    self._verify_pri_ad()
    connected = self.pri_ad.map_connect(self.sec_ad)
    asserts.assert_true(
        connected, "The map connection between {} and {} failed".format(
        connected, 'The map connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info("The map connection between %s and %s succeed", self.serial,
    self.log.info('The map connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)

  def map_disconnect(self) -> None:
@@ -349,87 +355,91 @@ class AndroidBtTargetDevice(object):
          'Failed to terminate the MAP connection with the device "%s".' %
          self.sec_ad_mac_address)

  def pbap_connect(self):
  def pbap_connect(self) -> None:
    """Establishes the pbap connection between self.pri_ad and self.sec_ad."""
    connected = self.pri_ad.pbap_connect(self.sec_ad)
    asserts.assert_true(
        connected, "The pbap connection between {} and {} failed".format(
        connected, 'The pbap connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info("The pbap connection between %s and %s succeed", self.serial,
    self.log.info('The pbap connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)

  def pause(self):
  def pause(self) -> None:
    """Sends Avrcp pause command."""
    self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad)

  def play(self):
  def play(self) -> None:
    """Sends Avrcp play command."""
    self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad)

  def power_on(self):
  def power_on(self) -> bool:
    """Turns the Bluetooth on the android bt garget device."""
    self.log.info("Turns on the bluetooth")
    self.log.info('Turns on the bluetooth')
    return self.sl4a.bluetoothToggleState(True)

  def power_off(self):
  def power_off(self) -> bool:
    """Turns the Bluetooth off the android bt garget device."""
    self.log.info("Turns off the bluetooth")
    self.log.info('Turns off the bluetooth')
    return self.sl4a.bluetoothToggleState(False)

  def route_call_audio(self, connect=False):
  def route_call_audio(self, connect: bool = False) -> None:
    """Routes call audio during a call."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info(
        "Routes call audio during a call from hf phone %s for %s "
        "audio connection %s after routing", self.mac_address,
        'Routes call audio during a call from hf phone %s for %s '
        'audio connection %s after routing', self.mac_address,
        self.sec_ad_mac_address, connect)
    if connect:
      self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address)
    else:
      self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address)

  def reject_phone_call(self):
  def reject_phone_call(self) -> bool:
    """Rejects an incoming phone call."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    # Make sure the device is in ringing state.
    if not self.wait_for_call_state(
        bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
      raise ControllerError(
          "Timed out after %ds waiting for the device %s to be ringing state "
          "before rejecting the incoming phone call." %
      raise signals.ControllerError(
          'Timed out after %ds waiting for the device %s to be ringing state '
          'before rejecting the incoming phone call.' %
          (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
    self.log.info("Rejects the incoming phone call from hf phone %s for %s",
    self.log.info('Rejects the incoming phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address)

  def set_audio_params(self, audio_params):
  def set_audio_params(self, audio_params: Optional[Dict[str, str]]) -> None:
    """Sets audio params to the android_bt_target_device."""
    self.audio_params = audio_params

  def track_previous(self):
  def track_previous(self) -> None:
    """Sends Avrcp skip prev command."""
    self.send_media_passthrough_cmd(
        bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad)

  def track_next(self):
  def track_next(self) -> None:
    """Sends Avrcp skip next command."""
    self.send_media_passthrough_cmd(
        bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad)

  def start_audio_capture(self):
    """Starts the audio capture over adb."""
    if self.audio_params is None:
      raise MissingAudioParamsError("Missing audio params for captureing audio")
  def start_audio_capture(self, duration_sec: int = 20) -> None:
    """Starts the audio capture over adb.

    Args:
      duration_sec: int, Number of seconds to record audio, 20 secs as default.
    """
    if 'duration' in self.audio_params.keys():
      duration_sec = self.audio_params['duration']
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()
    cmd = "ap2f --usage 1 --start --duration {} --target {}".format(
        self.audio_params["duration"], self.adb_path)
    self.log.info("Starts capturing audio with adb shell command %s", cmd)
    cmd = 'ap2f --usage 1 --start --duration {} --target {}'.format(
        duration_sec, self.adb_path)
    self.log.info('Starts capturing audio with adb shell command %s', cmd)
    self.adb.shell(cmd)

  def stop_audio_capture(self):
  def stop_audio_capture(self) -> str:
    """Stops the audio capture and stores it in wave file.

    Returns:
@@ -439,68 +449,68 @@ class AndroidBtTargetDevice(object):
      MissingAudioParamsError: when self.audio_params is None
    """
    if self.audio_params is None:
      raise MissingAudioParamsError("Missing audio params for captureing audio")
      raise MissingAudioParamsError('Missing audio params for capturing audio')
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()
    adb_pull_args = [self.adb_path, self.audio_capture_path]
    self.log.info("start adb -s %s pull %s", self.serial, adb_pull_args)
    self.log.info('start adb -s %s pull %s', self.serial, adb_pull_args)
    self._ad.adb.pull(adb_pull_args)
    pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE)
    self.log.info("delete the recored file %s", self.adb_path)
    self._ad.adb.shell("rm {}".format(self.adb_path))
    self.log.info('delete the recored file %s', self.adb_path)
    self._ad.adb.shell('rm {}'.format(self.adb_path))
    wave_file_path = self.get_new_wave_file_path()
    self.log.info("convert pcm file %s to wav file %s", pcm_file_path,
    self.log.info('convert pcm file %s to wav file %s', pcm_file_path,
                  wave_file_path)
    btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params)
    return wave_file_path

  def stop_all_services(self):
  def stop_all_services(self) -> None:
    """Stops all services for the pri_ad device."""
    self.log.info("Stops all services on the android bt target device")
    self.log.info('Stops all services on the android bt target device')
    self._ad.services.stop_all()

  def stop_ambs_for_avrcp(self):
  def stop_ambs_for_avrcp(self) -> None:
    """Stops media browser service for avrcp."""
    if self.is_avrcp_ready():
      self.log.info("Stops avrcp connection")
      self.log.info('Stops avrcp connection')
      self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop()
      self.avrcp_ready = False

  def stop_voice_dial(self):
  def stop_voice_dial(self) -> None:
    """Stops voice dial."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info("Stops voice dial from hf phone %s for %s", self.mac_address,
    self.log.info('Stops voice dial from hf phone %s for %s', self.mac_address,
                  self.sec_ad_mac_address)
    if self.is_hfp_connected():
      self.sl4a.bluetoothHfpClientStopVoiceRecognition(
          self.sec_ad_mac_address)

  def take_bug_report(self,
                      test_name=None,
                      begin_time=None,
                      timeout=300,
                      destination=None):
                      test_name: Optional[str] = None,
                      begin_time: Optional[int] = None,
                      timeout: float = 300,
                      destination: Optional[str] = None) -> None:
    """Wrapper method to capture bugreport on the android bt target device."""
    self._ad.take_bug_report(test_name, begin_time, timeout, destination)

  def voice_dial(self):
  def voice_dial(self) -> None:
    """Triggers voice dial."""
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info("Triggers voice dial from hf phone %s for %s",
    self.log.info('Triggers voice dial from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    if self.is_hfp_connected():
      self.sl4a.bluetoothHfpClientStartVoiceRecognition(
          self.sec_ad_mac_address)

  def log_type(self):
  def log_type(self) -> str:
    """Gets the log type of Android bt target device.

    Returns:
      A string, the log type of Android bt target device.
    """
    return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR
    return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR.value


class BluetoothProfileConnectionError(Exception):
+87 −86

File changed.

Preview size limit exceeded, changes collapsed.

+23 −17

File changed.

Preview size limit exceeded, changes collapsed.

+14 −6
Original line number Diff line number Diff line
@@ -15,7 +15,9 @@ Example MH testbed config for Hostside:
  dimensions:
    device: GrpcBtSyncStub
"""

import subprocess
from typing import Any, Dict, Optional, Tuple

from absl import flags
from absl import logging
@@ -25,6 +27,7 @@ import grpc
from blueberry.grpc.proto import blueberry_device_controller_pb2
from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc


FLAGS = flags.FLAGS
flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address')

@@ -32,19 +35,22 @@ flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address')
class GrpcBtSyncMock(object):
  """Generic GRPC device controller."""

  def __init__(self, config):
  def __init__(self, config: Dict[str, str]) -> None:
    """Initialize GRPC object."""
    super(GrpcBtSyncMock, self).__init__()
    self.config = config
    self.mac_address = config['mac_address']

  def __del__(self):
  def __del__(self) -> None:
    # pytype: disable=attribute-error
    self.server_proc.terminate()
    del self.channel_creds
    del self.channel
    del self.stub

  def setup(self):
  def setup(self) -> None:
    """Setup the gRPC server that the sync mock will respond to."""
    # pytype: disable=attribute-error
    server_path = self.get_user_params()['mh_files']['grpc_server'][0]
    logging.info('Start gRPC server: %s', server_path)
    self.server_proc = subprocess.Popen([server_path],
@@ -60,17 +66,19 @@ class GrpcBtSyncMock(object):
    self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
        self.channel)

  def init_setup(self):
  def init_setup(self) -> None:
    logging.info('init setup TO BE IMPLEMENTED')

  def set_target(self, bt_device):
  def set_target(self, bt_device: Any) -> None:
    self._target_device = bt_device

  def pair_and_connect_bluetooth(self, target_mac_address):
  def pair_and_connect_bluetooth(
      self, target_mac_address: str) -> Optional[Tuple[float, float]]:
    """Pair and connect to a peripheral Bluetooth device."""
    request = blueberry_device_controller_pb2.TargetMacAddress(
        mac_address=target_mac_address)
    try:
      # pytype: disable=attribute-error
      response = self.stub.PairAndConnectBluetooth(request)
      logging.info('pair and connect bluetooth response: %s', response)
      if response.error:
+14 −6

File changed.

Preview size limit exceeded, changes collapsed.

Loading