Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit eee72e37 authored by Rahul Sabnis's avatar Rahul Sabnis Committed by Android Build Coastguard Worker
Browse files

Add a timeout for generating local oob data

Tag: #feature
Bug: 245770306
Test: system/gd/cert/run --clean --sl4a_sl4a OobPairingTest
Ignore-AOSP-First: Merging into QPR branch first
Change-Id: If935583dab725ea41585e04e67aeb6e283e23683
(cherry picked from commit 3ae98d68)
Merged-In: If935583dab725ea41585e04e67aeb6e283e23683
parent e0dbe91d
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -169,6 +169,7 @@ public class AdapterService extends Service {
    private static final int MIN_OFFLOADED_FILTERS = 10;
    private static final int MIN_OFFLOADED_SCAN_STORAGE_BYTES = 1024;
    private static final Duration PENDING_SOCKET_HANDOFF_TIMEOUT = Duration.ofMinutes(1);
    private static final Duration GENERATE_LOCAL_OOB_DATA_TIMEOUT = Duration.ofSeconds(2);

    private final Object mEnergyInfoLock = new Object();
    private int mStackReportedState;
@@ -3999,15 +4000,31 @@ public class AdapterService extends Service {
        if (mOobDataCallbackQueue.peek() != null) {
            try {
                callback.onError(BluetoothStatusCodes.ERROR_ANOTHER_ACTIVE_OOB_REQUEST);
                return;
            } catch (RemoteException e) {
                Log.e(TAG, "Failed to make callback", e);
            }
            return;
        }
        mOobDataCallbackQueue.offer(callback);
        mHandler.postDelayed(() -> removeFromOobDataCallbackQueue(callback),
                GENERATE_LOCAL_OOB_DATA_TIMEOUT.toMillis());
        generateLocalOobDataNative(transport);
    }

    private synchronized void removeFromOobDataCallbackQueue(IBluetoothOobDataCallback callback) {
        if (callback == null) {
            return;
        }

        if (mOobDataCallbackQueue.peek() == callback) {
            try {
                mOobDataCallbackQueue.poll().onError(BluetoothStatusCodes.ERROR_UNKNOWN);
            } catch (RemoteException e) {
                Log.e(TAG, "Failed to make OobDataCallback to remove callback from queue", e);
            }
        }
    }

    /* package */ synchronized void notifyOobDataCallback(int transport, OobData oobData) {
        if (mOobDataCallbackQueue.peek() == null) {
            Log.e(TAG, "Failed to make callback, no callback exists");
+38 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.truth import assertThat


class L2cap:

    __l2cap_connection_timeout = 30  #seconds
    __device = None

    def __init__(self, device):
        self.__device = device

    def create_l2cap_le_coc(self, address, psm, secure):
        logging.info("creating l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginConnectThreadPsm(address, True, psm, secure)

    # Starts listening on the l2cap server socket, returns the psm
    def listen_using_l2cap_coc(self, secure):
        logging.info("Listening for l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(__l2cap_connection_timeout, True, secure)
        return self.__device.sl4a.bluetoothSocketConnGetPsm()
+7 −0
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ class OobData:
    confirmation = None
    randomizer = None

    ADDRESS_WITH_TYPE_LENGTH = 14

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
@@ -43,3 +45,8 @@ class OobData:
        address_str_octets = address_str_octets[:6]
        address_str_octets.reverse()
        return ":".join(address_str_octets)

    def to_sl4a_address_type(self):
        if len(self.address) != self.ADDRESS_WITH_TYPE_LENGTH:
            return -1
        return self.address.upper()[-2]
+48 −24
Original line number Diff line number Diff line
@@ -25,8 +25,8 @@ from blueberry.tests.sl4a_sl4a.lib.oob_data import OobData
class Security:

    # Events sent from SL4A
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS = "GeneratedOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

@@ -44,15 +44,34 @@ class Security:
        self.__device = device
        self.__device.sl4a.bluetoothStartPairingHelper(True)

    def generate_oob_data(self, transport):
    # Returns a tuple formatted as <statuscode, OobData>. The OobData is
    # populated if the statuscode is 0 (SUCCESS), else it will be None
    def generate_oob_data(self, transport, wait_for_oob_data_callback=True):
        logging.info("Generating local OOB data")
        self.__device.sl4a.bluetoothGenerateLocalOobData(transport)

        if wait_for_oob_data_callback is False:
            return 0, None
        else:
            # Check for oob data generation success
            try:
            event_info = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATED, self.__default_timeout)
                generate_success_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS,
                                                                    self.__default_timeout)
            except queue.Empty as error:
                logging.error("Failed to generate OOB data!")
            return None
        return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"],
                       event_info["data"]["randomizer"])
                # Check if generating oob data failed without blocking
                try:
                    generate_failure_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_FAILURE, 0)
                except queue.Empty as error:
                    logging.error("Failed to generate OOB Data without error code")
                    assertThat(True).isFalse()

                errorcode = generate_failure_event["data"]["Error"]
                logging.info("Generating local oob data failed with error code %d", errorcode)
                return errorcode, None

        return 0, OobData(generate_success_event["data"]["address_with_type"],
                          generate_success_event["data"]["confirmation"], generate_success_event["data"]["randomizer"])

    def ensure_device_bonded(self):
        bond_state = None
@@ -65,14 +84,18 @@ class Security:
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self, oob_data):
    def create_bond_out_of_band(self, oob_data, wait_for_device_bonded=True):
        assertThat(oob_data).isNotNone()
        address = oob_data.to_sl4a_address()
        self.__device.sl4a.bluetoothCreateBondOutOfBand(address, self.TRANSPORT_LE, oob_data.confirmation,
                                                        oob_data.randomizer)
        address_type = oob_data.to_sl4a_address_type()
        logging.info("Bonding OOB with %s and address type=%s", address, address_type)
        self.__device.sl4a.bluetoothCreateLeBondOutOfBand(address, oob_data.confirmation, oob_data.randomizer,
                                                          address_type)

        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE):
    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE, wait_for_device_bonded=True):
        assertThat(address).isNotNone()
        if transport == self.TRANSPORT_LE:
            self.__device.sl4a.bluetoothLeBond(address)
@@ -87,15 +110,16 @@ class Security:
            self.remove_bond(device["address"])

    def remove_bond(self, address):
        self.__device.sl4a.bluetoothUnbond(address)
        if self.__device.sl4a.bluetoothUnbond(address):
            bond_state = None
            try:
                bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.__default_timeout)
            except queue.Empty as error:
                logging.error("Failed to get bond event!")

            assertThat(bond_state).isNotNone()
            assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)
        else:
            logging.info("remove_bond: Bluetooth Device with address: %s does not exist", address)

    def close(self):
        self.remove_all_bonded_devices()
+30 −16
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ import io
import logging
import os
import queue
import time

from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.truth import assertThat
@@ -72,18 +73,29 @@ class OobPairingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()

    def __create_le_bond_oob_single_sided(self, wait_for_oob_data=True, wait_for_device_bonded=True):
        oob_data = self.cert_security_.generate_oob_data(Security.TRANSPORT_LE, wait_for_oob_data)
        if wait_for_oob_data:
            assertThat(oob_data[0]).isEqualTo(0)
            assertThat(oob_data[1]).isNotNone()
        self.dut_security_.create_bond_out_of_band(oob_data[1], wait_for_device_bonded)

    def __create_le_bond_oob_double_sided(self, wait_for_oob_data=True, wait_for_device_bonded=True):
        # Genearte OOB data on DUT, but we don't use it
        self.dut_security_.generate_oob_data(Security.TRANSPORT_LE, wait_for_oob_data)
        self.__create_le_bond_oob_single_sided(wait_for_oob_data, wait_for_device_bonded)

    def test_classic_generate_local_oob_data(self):
        oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_BREDR)
        assertThat(oob_data).isNotNone()
        assertThat(oob_data[0]).isEqualTo(0)
        assertThat(oob_data[1]).isNotNone()
        oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_BREDR)
        assertThat(oob_data).isNotNone()
        assertThat(oob_data[0]).isEqualTo(0)
        assertThat(oob_data[1]).isNotNone()

    def test_classic_generate_local_oob_data_stress(self):
        for i in range(1, 20):
            oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_BREDR)
            assertThat(oob_data).isNotNone()
            oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_BREDR)
            assertThat(oob_data).isNotNone()
            self.test_classic_generate_local_oob_data()

    def test_le_generate_local_oob_data(self):
        oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_LE)
@@ -93,23 +105,25 @@ class OobPairingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):

    def test_le_generate_local_oob_data_stress(self):
        for i in range(1, 20):
            oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_LE)
            assertThat(oob_data).isNotNone()
            oob_data = self.cert_security_.generate_oob_data(Security.TRANSPORT_LE)
            assertThat(oob_data).isNotNone()
            self.test_le_generate_local_oob_data()

    def test_le_bond_oob(self):
        oob_data = self.cert_security_.generate_oob_data(Security.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()
        self.dut_security_.create_bond_out_of_band(oob_data)
    def test_le_bond(self):
        self.__create_le_bond_oob_single_sided()

    def test_le_bond_oob_stress(self):
        for i in range(0, 10):
            logging.info("Stress #%d" % i)
            self.test_le_bond_oob()
            self.__create_le_bond_oob_single_sided()
            self.dut_security_.remove_all_bonded_devices()
            self.cert_security_.remove_all_bonded_devices()

    def test_le_generate_local_oob_data_after_le_bond_oob(self):
        self.test_le_bond_oob()
        self.__create_le_bond_oob_single_sided()
        self.test_le_generate_local_oob_data()

    def test_le_generate_oob_data_while_bonding(self):
        self.__create_le_bond_oob_double_sided(True, False)
        self.dut_security_.generate_oob_data(Security.TRANSPORT_LE, False)
        for i in range(0, 10):
            oob_data = self.dut_security_.generate_oob_data(Security.TRANSPORT_LE, True)
            logging.info("OOB Data came back with code: %d", oob_data[0])