Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 67d4b509 authored by Rahul Sabnis's avatar Rahul Sabnis Committed by Automerger Merge Worker
Browse files

Merge changes from topic "generate_oob_timeout-tm-qpr-dev" into tm-qpr-dev am: 8d1fcb03

parents 56b23c2a 8d1fcb03
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -169,6 +169,7 @@ public class AdapterService extends Service {
    private static final int MIN_OFFLOADED_FILTERS = 10;
    private static final int MIN_OFFLOADED_SCAN_STORAGE_BYTES = 1024;
    private static final Duration PENDING_SOCKET_HANDOFF_TIMEOUT = Duration.ofMinutes(1);
    private static final Duration GENERATE_LOCAL_OOB_DATA_TIMEOUT = Duration.ofSeconds(2);

    private final Object mEnergyInfoLock = new Object();
    private int mStackReportedState;
@@ -4172,15 +4173,31 @@ public class AdapterService extends Service {
        if (mOobDataCallbackQueue.peek() != null) {
            try {
                callback.onError(BluetoothStatusCodes.ERROR_ANOTHER_ACTIVE_OOB_REQUEST);
                return;
            } catch (RemoteException e) {
                Log.e(TAG, "Failed to make callback", e);
            }
            return;
        }
        mOobDataCallbackQueue.offer(callback);
        mHandler.postDelayed(() -> removeFromOobDataCallbackQueue(callback),
                GENERATE_LOCAL_OOB_DATA_TIMEOUT.toMillis());
        generateLocalOobDataNative(transport);
    }

    private synchronized void removeFromOobDataCallbackQueue(IBluetoothOobDataCallback callback) {
        if (callback == null) {
            return;
        }

        if (mOobDataCallbackQueue.peek() == callback) {
            try {
                mOobDataCallbackQueue.poll().onError(BluetoothStatusCodes.ERROR_UNKNOWN);
            } catch (RemoteException e) {
                Log.e(TAG, "Failed to make OobDataCallback to remove callback from queue", e);
            }
        }
    }

    /* package */ synchronized void notifyOobDataCallback(int transport, OobData oobData) {
        if (mOobDataCallbackQueue.peek() == null) {
            Log.e(TAG, "Failed to make callback, no callback exists");
+38 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.truth import assertThat


class L2cap:

    __l2cap_connection_timeout = 30  #seconds
    __device = None

    def __init__(self, device):
        self.__device = device

    def create_l2cap_le_coc(self, address, psm, secure):
        logging.info("creating l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginConnectThreadPsm(address, True, psm, secure)

    # Starts listening on the l2cap server socket, returns the psm
    def listen_using_l2cap_coc(self, secure):
        logging.info("Listening for l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(__l2cap_connection_timeout, True, secure)
        return self.__device.sl4a.bluetoothSocketConnGetPsm()
+8 −4
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ class LeScanner(Closable):
    def scan_for_address_expect_none(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
            return None
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
@@ -66,11 +66,12 @@ class LeScanner(Closable):
        advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1)
        assertThat(advertising_address).isNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def scan_for_address(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
            return None
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
@@ -87,11 +88,12 @@ class LeScanner(Closable):
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def scan_for_address_with_irk(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
            return None
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
@@ -108,11 +110,12 @@ class LeScanner(Closable):
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
            return None
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
@@ -131,6 +134,7 @@ class LeScanner(Closable):
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def scan_for_name(self, name):
        if self.is_scanning:
+7 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ class OobData:
    confirmation = None
    randomizer = None

    ADDRESS_WITH_TYPE_LENGTH = 14

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
@@ -43,3 +45,8 @@ class OobData:
        address_str_octets = address_str_octets[:6]
        address_str_octets.reverse()
        return ":".join(address_str_octets)

    def to_sl4a_address_type(self):
        if len(self.address) != self.ADDRESS_WITH_TYPE_LENGTH:
            return -1
        return self.address.upper()[-2]
+48 −24
Original line number Diff line number Diff line
@@ -25,8 +25,8 @@ from blueberry.tests.sl4a_sl4a.lib.oob_data import OobData
class Security:

    # Events sent from SL4A
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS = "GeneratedOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

@@ -44,15 +44,34 @@ class Security:
        self.__device = device
        self.__device.sl4a.bluetoothStartPairingHelper(True)

    def generate_oob_data(self, transport):
    # Returns a tuple formatted as <statuscode, OobData>. The OobData is
    # populated if the statuscode is 0 (SUCCESS), else it will be None
    def generate_oob_data(self, transport, wait_for_oob_data_callback=True):
        logging.info("Generating local OOB data")
        self.__device.sl4a.bluetoothGenerateLocalOobData(transport)

        if wait_for_oob_data_callback is False:
            return 0, None
        else:
            # Check for oob data generation success
            try:
            event_info = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATED, self.__default_timeout)
                generate_success_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS,
                                                                    self.__default_timeout)
            except queue.Empty as error:
                logging.error("Failed to generate OOB data!")
            return None
        return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"],
                       event_info["data"]["randomizer"])
                # Check if generating oob data failed without blocking
                try:
                    generate_failure_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_FAILURE, 0)
                except queue.Empty as error:
                    logging.error("Failed to generate OOB Data without error code")
                    assertThat(True).isFalse()

                errorcode = generate_failure_event["data"]["Error"]
                logging.info("Generating local oob data failed with error code %d", errorcode)
                return errorcode, None

        return 0, OobData(generate_success_event["data"]["address_with_type"],
                          generate_success_event["data"]["confirmation"], generate_success_event["data"]["randomizer"])

    def ensure_device_bonded(self):
        bond_state = None
@@ -65,14 +84,18 @@ class Security:
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self, oob_data):
    def create_bond_out_of_band(self, oob_data, wait_for_device_bonded=True):
        assertThat(oob_data).isNotNone()
        address = oob_data.to_sl4a_address()
        self.__device.sl4a.bluetoothCreateBondOutOfBand(address, self.TRANSPORT_LE, oob_data.confirmation,
                                                        oob_data.randomizer)
        address_type = oob_data.to_sl4a_address_type()
        logging.info("Bonding OOB with %s and address type=%s", address, address_type)
        self.__device.sl4a.bluetoothCreateLeBondOutOfBand(address, oob_data.confirmation, oob_data.randomizer,
                                                          address_type)

        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE):
    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE, wait_for_device_bonded=True):
        assertThat(address).isNotNone()
        if transport == self.TRANSPORT_LE:
            self.__device.sl4a.bluetoothLeBond(address)
@@ -87,15 +110,16 @@ class Security:
            self.remove_bond(device["address"])

    def remove_bond(self, address):
        self.__device.sl4a.bluetoothUnbond(address)
        if self.__device.sl4a.bluetoothUnbond(address):
            bond_state = None
            try:
                bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.__default_timeout)
            except queue.Empty as error:
                logging.error("Failed to get bond event!")

            assertThat(bond_state).isNotNone()
            assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)
        else:
            logging.info("remove_bond: Bluetooth Device with address: %s does not exist", address)

    def close(self):
        self.remove_all_bonded_devices()
Loading