Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b015842d authored by Rahul Sabnis's avatar Rahul Sabnis Committed by Android (Google) Code Review
Browse files

Merge "WIP le l2cap coc sl4a_sl4a tests" into tm-qpr-dev

parents f7974f3c 624b8a23
Loading
Loading
Loading
Loading
+169 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import binascii
import io
import logging
import os
import queue
import time

from blueberry.tests.gd.cert.context import get_current_context
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
from blueberry.tests.sl4a_sl4a.lib.security import Security


class LeL2capCoCTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):

    def __get_cert_public_address_and_irk_from_bt_config(self):
        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
        bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
                                           "DUT_%s_bt_config.conf" % self.cert.serial)
        try:
            self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
        except AdbError as error:
            logging.error("Failed to pull SL4A cert BT config")
            return False
        logging.debug("Reading SL4A cert BT config")
        with io.open(bt_config_file_path) as f:
            for line in f.readlines():
                stripped_line = line.strip()
                if (stripped_line.startswith("Address")):
                    address_fields = stripped_line.split(' ')
                    # API currently requires public address to be capitalized
                    address = address_fields[2].upper()
                    logging.debug("Found cert address: %s" % address)
                    continue
                if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
                    irk_fields = stripped_line.split(' ')
                    irk = irk_fields[2]
                    logging.debug("Found cert IRK: %s" % irk)
                    continue

        return address, irk

    def setup_class(self):
        super().setup_class()

    def setup_test(self):
        assertThat(super().setup_test()).isTrue()

    def teardown_test(self):
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()
        self.dut_security_.remove_all_bonded_devices()
        self.cert_security_.remove_all_bonded_devices()
        super().teardown_test()

    # Scans for the cert device by name. We expect to get back a RPA.
    def __scan_for_cert_by_name(self):
        cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
        self.cert_advertiser_.advertise_public_extended_pdu()
        advertising_name = self.cert_advertiser_.get_local_advertising_name()

        # Scan with name and verify we get back a scan result with the RPA
        scan_result_addr = self.dut_scanner_.scan_for_name(advertising_name)
        assertThat(scan_result_addr).isNotNone()
        assertThat(scan_result_addr).isNotEqualTo(cert_public_address)

        return scan_result_addr

    def __scan_for_irk(self):
        cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
        rpa_address = self.cert_advertiser_.advertise_public_extended_pdu()
        id_addr = self.dut_scanner_.scan_for_address_with_irk(cert_public_address, ble_address_types["public"], irk)
        self.dut_scanner_.stop_scanning()
        return id_addr

    def __create_le_bond_oob_single_sided(self,
                                          wait_for_oob_data=True,
                                          wait_for_device_bonded=True,
                                          addr=None,
                                          addr_type=ble_address_types["random"]):
        oob_data = self.cert_security_.generate_oob_data(Security.TRANSPORT_LE, wait_for_oob_data)
        if wait_for_oob_data:
            assertThat(oob_data[0]).isEqualTo(0)
            assertThat(oob_data[1]).isNotNone()
        self.dut_security_.create_bond_out_of_band(oob_data[1], addr, addr_type, wait_for_device_bonded)
        return oob_data[1].to_sl4a_address()

    def __create_le_bond_oob_double_sided(self,
                                          wait_for_oob_data=True,
                                          wait_for_device_bonded=True,
                                          addr=None,
                                          addr_type=ble_address_types["random"]):
        # Genearte OOB data on DUT, but we don't use it
        self.dut_security_.generate_oob_data(Security.TRANSPORT_LE, wait_for_oob_data)
        self.__create_le_bond_oob_single_sided(wait_for_oob_data, wait_for_device_bonded, addr, addr_type)

    def __test_le_l2cap_insecure_coc(self):
        logging.info("Testing insecure L2CAP CoC")
        cert_rpa = self.__scan_for_cert_by_name()

        # Listen on an insecure l2cap coc on the cert
        psm = self.cert_l2cap_.listen_using_l2cap_le_coc(False)
        self.dut_l2cap_.create_l2cap_le_coc(cert_rpa, psm, False)

        # Cleanup
        self.dut_scanner_.stop_scanning()
        self.dut_l2cap_.close_l2cap_le_coc_client()
        self.cert_advertiser_.stop_advertising()
        self.cert_l2cap_.close_l2cap_le_coc_server()

    def __test_le_l2cap_secure_coc(self):
        logging.info("Testing secure L2CAP CoC")
        cert_rpa = self.__create_le_bond_oob_single_sided()

        # Listen on an secure l2cap coc on the cert
        psm = self.cert_l2cap_.listen_using_l2cap_le_coc(True)
        self.dut_l2cap_.create_l2cap_le_coc(cert_rpa, psm, True)

        # Cleanup
        self.dut_scanner_.stop_scanning()
        self.dut_l2cap_.close_l2cap_le_coc_client()
        self.cert_advertiser_.stop_advertising()
        self.cert_l2cap_.close_l2cap_le_coc_server()
        self.dut_security_.remove_all_bonded_devices()
        self.cert_security_.remove_all_bonded_devices()

    def __test_le_l2cap_secure_coc_after_irk_scan(self):
        logging.info("Testing secure L2CAP CoC after IRK scan")
        cert_config_addr, irk = self.__get_cert_public_address_and_irk_from_bt_config()
        cert_id_addr = self.__scan_for_irk()

        assertThat(cert_id_addr).isEqualTo(cert_config_addr)
        self.__create_le_bond_oob_single_sided(True, True, cert_id_addr, ble_address_types["public"])
        self.cert_advertiser_.stop_advertising()
        self.__test_le_l2cap_secure_coc()

    def __test_secure_le_l2cap_coc_stress(self):
        for i in range(0, 10):
            self.__test_le_l2cap_secure_coc()

    def __test_insecure_le_l2cap_coc_stress(self):
        for i in range(0, 10):
            self.__test_le_l2cap_insecure_coc()

    def __test_le_l2cap_coc_stress(self):
        #for i in range (0, 10):
        self.__test_le_l2cap_insecure_coc()
        self.__test_le_l2cap_secure_coc()

    def __test_secure_le_l2cap_coc_after_irk_scan_stress(self):
        for i in range(0, 10):
            self.__test_le_l2cap_secure_coc_after_irk_scan()
+36 −4
Original line number Diff line number Diff line
@@ -15,24 +15,56 @@
#   limitations under the License.

import logging
import queue

from blueberry.tests.gd.cert.truth import assertThat


class L2cap:

    __l2cap_connection_timeout = 30  #seconds
    __l2cap_connection_timeout = 10  #seconds
    __device = None
    __active_client_coc = False
    __active_server_coc = False

    def __init__(self, device):
        self.__device = device

    def __wait_for_event(self, expected_event_name):
        try:
            event_info = self.__device.ed.pop_event(expected_event_name, self.__l2cap_connection_timeout)
            logging.info(event_info)
        except queue.Empty as error:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def create_l2cap_le_coc(self, address, psm, secure):
        logging.info("creating l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginConnectThreadPsm(address, True, psm, secure)
        assertThat(self.__wait_for_event("BluetoothSocketConnectSuccess")).isTrue()
        self.__active_client_coc = True

    # Starts listening on the l2cap server socket, returns the psm
    def listen_using_l2cap_coc(self, secure):
        logging.info("Listening for l2cap channel with secure=%r and psm %s", secure, psm)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(__l2cap_connection_timeout, True, secure)
    def listen_using_l2cap_le_coc(self, secure):
        logging.info("Listening for l2cap channel with secure=%r", secure)
        self.__device.sl4a.bluetoothSocketConnBeginAcceptThreadPsm(self.__l2cap_connection_timeout, True, secure)
        self.__active_server_coc = True
        return self.__device.sl4a.bluetoothSocketConnGetPsm()

    def close_l2cap_le_coc_client(self):
        if self.__active_client_coc:
            logging.info("Closing LE L2CAP CoC Client")
            self.__device.sl4a.bluetoothSocketConnKillConnThread()
            self.__active_client_coc = False

    def close_l2cap_le_coc_server(self):
        if self.__active_server_coc:
            logging.info("Closing LE L2CAP CoC Server")
            self.__device.sl4a.bluetoothSocketConnEndAcceptThread()
            self.__active_server_coc = False

    def close(self):
        self.close_l2cap_le_coc_client()
        self.close_l2cap_le_coc_server()
        self.__device == None
+1 −1
Original line number Diff line number Diff line
@@ -49,4 +49,4 @@ class OobData:
    def to_sl4a_address_type(self):
        if len(self.address) != self.ADDRESS_WITH_TYPE_LENGTH:
            return -1
        return self.address.upper()[-2]
        return self.address.upper()[-1]
+22 −7
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ class Security:
    TRANSPORT_LE = "2"

    __default_timeout = 10  # seconds
    __default_bonding_timeout = 30  # seconds
    __default_bonding_timeout = 60  # seconds
    __device = None

    def __init__(self, device):
@@ -70,6 +70,7 @@ class Security:
                logging.info("Generating local oob data failed with error code %d", errorcode)
                return errorcode, None

        logging.info("OOB ADDR with Type: %s", generate_success_event["data"]["address_with_type"])
        return 0, OobData(generate_success_event["data"]["address_with_type"],
                          generate_success_event["data"]["confirmation"], generate_success_event["data"]["randomizer"])

@@ -84,13 +85,27 @@ class Security:
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self, oob_data, wait_for_device_bonded=True):
    def create_bond_out_of_band(self,
                                oob_data,
                                bt_device_object_address=None,
                                bt_device_object_address_type=-1,
                                wait_for_device_bonded=True):
        assertThat(oob_data).isNotNone()
        address = oob_data.to_sl4a_address()
        address_type = oob_data.to_sl4a_address_type()
        logging.info("Bonding OOB with %s and address type=%s", address, address_type)
        self.__device.sl4a.bluetoothCreateLeBondOutOfBand(address, oob_data.confirmation, oob_data.randomizer,
                                                          address_type)
        oob_data_address = oob_data.to_sl4a_address()
        oob_data_address_type = oob_data.to_sl4a_address_type()

        # If a BT Device object address isn't specified, default to the oob data
        # address and type
        if bt_device_object_address is None:
            bt_device_object_address = oob_data_address
            bt_device_object_address_type = oob_data_address_type

        logging.info("Bonding OOB with device addr=%s, device addr type=%s, oob addr=%s, oob addr type=%s",
                     bt_device_object_address, bt_device_object_address_type, oob_data_address, oob_data_address_type)
        bond_start = self.__device.sl4a.bluetoothCreateLeBondOutOfBand(
            oob_data_address, oob_data_address_type, oob_data.confirmation, oob_data.randomizer,
            bt_device_object_address, bt_device_object_address_type)
        assertThat(bond_start).isTrue()

        if wait_for_device_bonded:
            self.ensure_device_bonded()
+8 −0
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth
from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth
from blueberry.tests.sl4a_sl4a.lib.le_advertiser import LeAdvertiser
from blueberry.tests.sl4a_sl4a.lib.le_scanner import LeScanner
from blueberry.tests.sl4a_sl4a.lib.l2cap import L2cap
from blueberry.tests.sl4a_sl4a.lib.security import Security
from blueberry.utils.mobly_sl4a_utils import setup_sl4a
from blueberry.utils.mobly_sl4a_utils import teardown_sl4a
@@ -43,11 +44,13 @@ class Sl4aSl4aBaseTestClass(BaseTestClass):
    dut_advertiser_ = None
    dut_scanner_ = None
    dut_security_ = None
    dut_l2cap_ = None

    # CERT
    cert_advertiser_ = None
    cert_scanner_ = None
    cert_security_ = None
    cert_l2cap_ = None

    SUBPROCESS_WAIT_TIMEOUT_SECONDS = 10

@@ -111,9 +114,11 @@ class Sl4aSl4aBaseTestClass(BaseTestClass):
        self.dut_advertiser_ = LeAdvertiser(self.dut)
        self.dut_scanner_ = LeScanner(self.dut)
        self.dut_security_ = Security(self.dut)
        self.dut_l2cap_ = L2cap(self.dut)
        self.cert_advertiser_ = LeAdvertiser(self.cert)
        self.cert_scanner_ = LeScanner(self.cert)
        self.cert_security_ = Security(self.cert)
        self.cert_l2cap_ = L2cap(self.cert)
        return True

    def teardown_test(self):
@@ -121,13 +126,16 @@ class Sl4aSl4aBaseTestClass(BaseTestClass):
        safeClose(self.dut_advertiser_)
        safeClose(self.dut_scanner_)
        safeClose(self.dut_security_)
        safeClose(self.dut_l2cap_)
        safeClose(self.cert_advertiser_)
        safeClose(self.cert_scanner_)
        safeClose(self.cert_security_)
        safeClose(self.cert_l2cap_)
        self.dut_advertiser_ = None
        self.dut_scanner_ = None
        self.dut_security_ = None
        self.cert_advertiser_ = None
        self.cert_l2cap_ = None
        self.cert_scanner_ = None
        self.cert_security_ = None

Loading