Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7cae0f83 authored by Martin Brabham's avatar Martin Brabham
Browse files

Blueberry2: Add SL4A Abstractions

 - Advertiser
 - Scanner
 - Security

Bug: 236301249
Test: mma -j $(nproc)
Test: system/gd/cert/run --clean --sl4a_sl4a
Tag: #stability
Ignore-AOSP-First: Need this in tm-dev stat and can't put it in AOSP yet
Change-Id: I88f366fb43b99d89e3e194d4b0fcf3bea0c5f1a6
parent b4aafdd1
Loading
Loading
Loading
Loading
+81 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import binascii
import logging
import queue

from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects
from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test


class LeAdvertiser(Closable):

    is_advertising = False
    device = None
    default_timeout = 10  # seconds
    advertise_callback = None
    advertise_data = None
    advertise_settings = None

    def __init__(self, device):
        self.device = device

    def __wait_for_event(self, expected_event_name):
        try:
            event_info = self.device.ed.pop_event(expected_event_name, self.default_timeout)
            logging.info(event_info)
        except queue.Empty as error:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def advertise_rpa_public_extended_pdu(self, name="SL4A Device"):
        if self.is_advertising:
            logging.info("Already advertising!")
            return
        self.is_advertising = True
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
        self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
        self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS)
        self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects(
            self.device.sl4a)
        self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings)

        # Wait for SL4A cert to start advertising
        assertThat(self.__wait_for_event(adv_succ.format(self.advertise_callback))).isTrue()
        logging.info("Advertising started")

    def get_local_advertising_name(self):
        return self.device.sl4a.bluetoothGetLocalName()

    def stop_advertising(self):
        if self.is_advertising:
            logging.info("Stopping advertisement")
            self.device.sl4a.bleStopBleAdvertising(self.advertise_callback)
            self.is_advertising = False

    def close(self):
        self.stop_advertising()
        self.device = None
+168 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import queue

from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes
from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test


class LeScanner(Closable):

    is_scanning = False
    device = None
    filter_list = None
    scan_settings = None
    scan_callback = None

    def __init__(self, device):
        self.device = device

    def __wait_for_scan_result_event(self, expected_event_name, timeout=60):
        try:
            event_info = self.device.ed.pop_event(expected_event_name, timeout)
        except queue.Empty as error:
            logging.error("Could not find scan result event: %s", expected_event_name)
            return None
        return event_info['data']['Result']['deviceInfo']['address']

    def scan_for_address_expect_none(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1)
        assertThat(advertising_address).isNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address_with_irk(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        # Hard code here since callback index iterates and will cause this to fail if ran
        # Second as the impl in SL4A sends this since it's a single callback for broadcast.
        expected_event_name = "BleScan1onScanResults"

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_name(self, name):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for name {}".format(name))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(1)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceName(name)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def stop_scanning(self):
        """
        Warning: no java callback registered for this
        """
        if self.is_scanning:
            logging.info("Stopping scan")
            self.device.sl4a.bleStopBleScan(self.scan_callback)
            self.is_scanning = False

    def close(self):
        self.stop_scanning()
        self.device = None
+45 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


class OobData:
    """
    This represents the data generated from the device
    """

    address = None
    confirmation = None
    randomizer = None

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
        self.randomizer = randomizer

    def to_sl4a_address(self):
        oob_address = self.address.upper()
        address_str_octets = []
        i = 1
        buf = ""
        for c in oob_address:
            buf += c
            if i % 2 == 0:
                address_str_octets.append(buf)
                buf = ""
            i += 1
        address_str_octets = address_str_octets[:6]
        address_str_octets.reverse()
        return ":".join(address_str_octets)
+102 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import logging

from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.sl4a_sl4a.lib.oob_data import OobData


class Security:

    # Events sent from SL4A
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
    TRANSPORT_AUTO = "0"
    TRANSPORT_BREDR = "1"
    TRANSPORT_LE = "2"

    __default_timeout = 10  # seconds
    __default_bonding_timeout = 30  # seconds
    __device = None

    def __init__(self, device):
        self.__device = device
        self.__device.sl4a.bluetoothStartPairingHelper(True)

    def generate_oob_data(self, transport):
        self.__device.sl4a.bluetoothGenerateLocalOobData(transport)
        try:
            event_info = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATED, self.__default_timeout)
        except queue.Empty as error:
            logging.error("Failed to generate OOB data!")
            return None
        return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"],
                       event_info["data"]["randomizer"])

    def ensure_device_bonded(self):
        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_BONDED, self.__default_bonding_timeout)
        except queue.Empty as error:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self, oob_data):
        assertThat(oob_data).isNotNone()
        address = oob_data.to_sl4a_address()
        self.__device.sl4a.bluetoothCreateBondOutOfBand(address, self.TRANSPORT_LE, oob_data.confirmation,
                                                        oob_data.randomizer)
        self.ensure_device_bonded()

    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE):
        assertThat(address).isNotNone()
        if transport == self.TRANSPORT_LE:
            self.__device.sl4a.bluetoothLeBond(address)
        else:
            self.__device.sl4a.bluetoothBond(address)
        self.ensure_device_bonded()

    def remove_all_bonded_devices(self):
        bonded_devices = self.__device.sl4a.bluetoothGetBondedDevices()
        for device in bonded_devices:
            self.remove_bond(device["address"])

    def remove_bond(self, address):
        self.__device.sl4a.bluetoothUnbond(address)
        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.__default_timeout)
        except queue.Empty as error:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)

    def close(self):
        self.remove_all_bonded_devices()
        self.__device.sl4a.bluetoothStartPairingHelper(False)
        self.__device = None
+34 −0
Original line number Diff line number Diff line
@@ -19,10 +19,14 @@ import os
import traceback
from functools import wraps

from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd_sl4a.lib.ble_lib import BleLib
from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth
from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth
from blueberry.tests.sl4a_sl4a.lib.le_advertiser import LeAdvertiser
from blueberry.tests.sl4a_sl4a.lib.le_scanner import LeScanner
from blueberry.tests.sl4a_sl4a.lib.security import Security
from blueberry.utils.mobly_sl4a_utils import setup_sl4a
from blueberry.utils.mobly_sl4a_utils import teardown_sl4a
from grpc import RpcError
@@ -35,6 +39,16 @@ from mobly.controllers.android_device_lib.adb import AdbError

class Sl4aSl4aBaseTestClass(BaseTestClass):

    # DUT
    dut_advertiser_ = None
    dut_scanner_ = None
    dut_security_ = None

    # CERT
    cert_advertiser_ = None
    cert_scanner_ = None
    cert_security_ = None

    SUBPROCESS_WAIT_TIMEOUT_SECONDS = 10

    def setup_class(self):
@@ -94,9 +108,29 @@ class Sl4aSl4aBaseTestClass(BaseTestClass):
    def setup_test(self):
        self.setup_device_for_test(self.dut)
        self.setup_device_for_test(self.cert)
        self.dut_advertiser_ = LeAdvertiser(self.dut)
        self.dut_scanner_ = LeScanner(self.dut)
        self.dut_security_ = Security(self.dut)
        self.cert_advertiser_ = LeAdvertiser(self.cert)
        self.cert_scanner_ = LeScanner(self.cert)
        self.cert_security_ = Security(self.cert)
        return True

    def teardown_test(self):
        # Go ahead and remove everything before turning off the stack
        safeClose(self.dut_advertiser_)
        safeClose(self.dut_scanner_)
        safeClose(self.dut_security_)
        safeClose(self.cert_advertiser_)
        safeClose(self.cert_scanner_)
        safeClose(self.cert_security_)
        self.dut_advertiser_ = None
        self.dut_scanner_ = None
        self.dut_security_ = None
        self.cert_advertiser_ = None
        self.cert_scanner_ = None
        self.cert_security_ = None

        # Make sure BLE is disabled and Bluetooth is disabled after test
        self.dut.sl4a.bluetoothDisableBLE()
        disable_bluetooth(self.dut.sl4a, self.dut.ed)