Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4092299a authored by Christine Hallstrom's avatar Christine Hallstrom Committed by Automerger Merge Worker
Browse files

Merge "Fix connect GATT with IRK and public address" am: 76619597 am: e4dcb258

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Bluetooth/+/2066596



Change-Id: I573cbcd33c428c63b6f1d9040a2467d19328d651
Ignore-AOSP-First: this is an automerge
Signed-off-by: default avatarAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents 7d5b999f e4dcb258
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ adv_succ = "BleAdvertise{}onSuccess"
bluetooth_off = "BluetoothStateChangedOff"
bluetooth_on = "BluetoothStateChangedOn"
mtu_changed = "GattConnect{}onMtuChanged"
gatt_connection_state_change = "GattConnect{}onConnectionStateChange"
advertising_set_started = "AdvertisingSet{}onAdvertisingSetStarted"
advertising_set_stopped = "AdvertisingSet{}onAdvertisingSetStopped"
advertising_set_on_own_address_read = "AdvertisingSet{}onOwnAddressRead"
+137 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import io
import os
import queue
import logging

from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ, ble_advertise_settings_modes, ble_scan_settings_modes, ble_address_types, ble_scan_settings_phys, gatt_connection_state_change, gatt_transport, scan_result
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects, generate_ble_advertise_objects
from blueberry.tests.sl4a_sl4a.lib.sl4a_sl4a_base_test import Sl4aSl4aBaseTestClass
from blueberry.facade import common_pb2 as common

from mobly.controllers.android_device_lib.adb import AdbError


class GattConnectTest(Sl4aSl4aBaseTestClass):

    def setup_class(self):
        super().setup_class()
        self.default_timeout = 10  # seconds

    def setup_test(self):
        super().setup_test()

    def teardown_test(self):
        super().teardown_test()

    def _wait_for_event(self, expected_event_name, device):
        try:
            event_info = device.ed.pop_event(expected_event_name, self.default_timeout)
            logging.info(event_info)
        except queue.Empty as error:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def _wait_for_scan_result_event(self, expected_event_name, device):
        try:
            event_info = device.ed.pop_event(expected_event_name, self.default_timeout)
        except queue.Empty as error:
            logging.error("Could not find scan result event: %s", expected_event_name)
            return None
        return event_info['data']['Result']['deviceInfo']['address']

    def _get_cert_public_address_and_irk_from_bt_config(self):
        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
        bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
                                           "DUT_%s_bt_config.conf" % self.cert.serial)
        try:
            self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
        except AdbError as error:
            logging.error("Failed to pull SL4A cert BT config")
            return False
        logging.debug("Reading SL4A cert BT config")
        with io.open(bt_config_file_path) as f:
            for line in f.readlines():
                stripped_line = line.strip()
                if (stripped_line.startswith("Address")):
                    address_fields = stripped_line.split(' ')
                    # API currently requires public address to be capitalized
                    address = address_fields[2].upper()
                    logging.debug("Found cert address: %s" % address)
                    continue
                if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
                    irk_fields = stripped_line.split(' ')
                    irk = irk_fields[2]
                    logging.debug("Found cert IRK: %s" % irk)
                    continue

        return address, irk

    def test_scan_connect_unbonded_device_public_address_with_irk(self):
        # Set up SL4A cert side to advertise
        logging.info("Starting advertising")
        self.cert.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
        self.cert.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
        self.cert.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        self.cert.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS)
        advertise_callback, advertise_data, advertise_settings = generate_ble_advertise_objects(self.cert.sl4a)
        self.cert.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)

        # Wait for SL4A cert to start advertising
        assertThat(self._wait_for_event(adv_succ.format(advertise_callback), self.cert)).isTrue()
        logging.info("Advertising started")

        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
        cert_public_address, irk = self._get_cert_public_address_and_irk_from_bt_config()

        # Set up SL4A DUT side to scan
        addr_type = ble_address_types["public"]
        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % (cert_public_address,
                                                                                               addr_type, irk))
        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.dut.sl4a.bleSetScanSettingsLegacy(False)
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
        expected_event_name = scan_result.format(scan_callback)

        # Start scanning on SL4A DUT
        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(cert_public_address, int(addr_type), irk)
        self.dut.sl4a.bleBuildScanFilter(filter_list)
        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
        logging.info("Started scanning")

        # Verify that scan result is received on SL4A DUT
        mac_address = self._wait_for_scan_result_event(expected_event_name, self.dut)
        assertThat(mac_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(mac_address))

        # Stop scanning and try to connect GATT
        self.dut.sl4a.bleStopBleScan(scan_callback)
        gatt_callback = self.dut.sl4a.gattCreateGattCallback()
        bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, mac_address, False, gatt_transport['le'],
                                                             False, None)
        assertThat(bluetooth_gatt).isNotNone()

        # Verify that GATT connect event occurs on SL4A DUT
        expected_event_name = gatt_connection_state_change.format(gatt_callback)
        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()

        # Test over
        self.cert.sl4a.bleStopBleAdvertising(advertise_callback)
+3 −1
Original line number Diff line number Diff line
@@ -14,10 +14,12 @@
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.sl4a_sl4a.gatt.gatt_connect_test import GattConnectTest

from mobly import suite_runner
import argparse

ALL_TESTS = []
ALL_TESTS = [GattConnectTest]


def main():
+11 −2
Original line number Diff line number Diff line
@@ -293,7 +293,6 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback {
      on_le_connection_canceled_on_pause();
      return;
    }
    // TODO: find out which address and type was used to initiate the connection
    AddressWithType remote_address(address, peer_address_type);
    AddressWithType local_address = le_address_manager_->GetCurrentAddress();
    on_common_le_connection_complete(remote_address);
@@ -364,9 +363,15 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback {
      return;
    }
    AddressWithType remote_address(address, peer_address_type);
    if (!peer_resolvable_address.IsEmpty()) {
    // The address added to the connect list is the one that callbacks
    // should be sent for, given that is the address passed in
    // call to BluetoothDevice#connectGatt and tied to app layer callbacks.
    if (!peer_resolvable_address.IsEmpty() &&
        is_device_in_connect_list(AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS))) {
      LOG_INFO("Using resolvable address");
      remote_address = AddressWithType(peer_resolvable_address, AddressType::RANDOM_DEVICE_ADDRESS);
    }

    on_common_le_connection_complete(remote_address);
    if (status == ErrorCode::UNKNOWN_CONNECTION) {
      if (remote_address.GetAddress() != Address::kEmpty) {
@@ -557,6 +562,10 @@ struct le_impl : public bluetooth::hci::LeAddressManagerCallback {
        address_with_type.ToFilterAcceptListAddressType(), address_with_type.GetAddress());
  }

  bool is_device_in_connect_list(AddressWithType address_with_type) {
    return (connect_list.find(address_with_type) != connect_list.end());
  }

  void remove_device_from_connect_list(AddressWithType address_with_type) {
    if (connect_list.find(address_with_type) == connect_list.end()) {
      LOG_WARN("Device not in acceptlist and cannot be removed:%s", PRIVATE_ADDRESS_WITH_TYPE(address_with_type));
+1 −0
Original line number Diff line number Diff line
@@ -1516,6 +1516,7 @@ void shim::legacy::Acl::OnLeConnectSuccess(

  // Once an le connection has successfully been established
  // the device address is removed from the controller accept list.

  if (IsRpa(address_with_type)) {
    LOG_DEBUG("Connection address is rpa:%s identity_addr:%s",
              PRIVATE_ADDRESS(address_with_type),
+7 −7

File changed.

Contains only whitespace changes.

Loading