Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f33cc6fe authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "Cert: Introduce GD SL4A combination test infra" am: 3e0c1136 am: 2a6ab08e am: f4ca26f9

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Bluetooth/+/1977214

Change-Id: Id7c67e26773f83a4a995636115d652d0d3e23f84
parents f588c97d f4ca26f9
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -557,29 +557,30 @@ class GdAndroidDevice(GdDeviceBase):
            logging.error("logcat_process %s_%s stopped with code: %d" % (self.label, self.serial_number, return_code))
        self.logcat_logger.stop()
        self.cleanup_port_forwarding()
        self.pull_logs(self.log_path_base)

    def pull_logs(self, base_dir):
        try:
            self.adb.pull([
                "/data/misc/bluetooth/logs/btsnoop_hci.log",
                str(os.path.join(self.log_path_base, "%s_btsnoop_hci.log" % self.label))
                str(os.path.join(base_dir, "%s_btsnoop_hci.log" % self.label))
            ])
        except AdbError as error:
            # Some tests have no snoop logs, and that's OK
            if ADB_FILE_NOT_EXIST_ERROR not in str(error):
                logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error))
        try:
            self.adb.pull([
                "/data/misc/bluedroid/bt_config.conf",
                str(os.path.join(self.log_path_base, "%s_bt_config.conf" % self.label))
            ])
            self.adb.pull(
                ["/data/misc/bluedroid/bt_config.conf",
                 str(os.path.join(base_dir, "%s_bt_config.conf" % self.label))])
        except AdbError as error:
            # Some tests have no config file, and that's OK
            if ADB_FILE_NOT_EXIST_ERROR not in str(error):
                logging.error(PULL_LOG_FILE_ERROR_MSG_PREFIX + str(error))
        try:
            self.adb.pull([
                "/data/misc/bluedroid/bt_config.bak",
                str(os.path.join(self.log_path_base, "%s_bt_config.bak" % self.label))
            ])
            self.adb.pull(
                ["/data/misc/bluedroid/bt_config.bak",
                 str(os.path.join(base_dir, "%s_bt_config.bak" % self.label))])
        except AdbError as error:
            # Some tests have no config.bak file, and that's OK
            if ADB_FILE_NOT_EXIST_ERROR not in str(error):
+35 −0
Original line number Diff line number Diff line
_description: Bluetooth cert testing with SL4A
TestBeds:
  - Name: AndroidDeviceCert
    Controllers:
      GdDevice:
        - grpc_port: '8898'
          grpc_root_server_port: '8896'
          signal_port: '8894'
          label: cert
          serial_number: 'CERT'
          name: Cert Device
          cmd:
            - "adb"
            - "-s"
            - "$(serial_number)"
            - "shell"
            - "ASAN_OPTIONS=detect_container_overflow=0"
            - "/system/bin/bluetooth_stack_with_facade"
            - "--grpc-port=$(grpc_port)"
            - "--root-server-port=$(grpc_root_server_port)"
            - "--btsnoop=/data/misc/bluetooth/logs/btsnoop_hci.log"
            - "--btsnooz=/data/misc/bluetooth/logs/btsnooz_hci.log"
            - "--btconfig=/data/misc/bluedroid/bt_config.conf"
            - "--signal-port=$(signal_port)"
      AndroidDevice:
        - label: dut
          # Preferred client port number on the PC host side for SL4A
          client_port: '8895'
          # Preferred server port number forwarded from Android to the host PC
          # via adb for SL4A connections
          forwarded_port: '8899'
          # Preferred server port used by SL4A on Android device
          server_port: '8899'
          serial: 'DUT'
logpath: "/tmp/logs"
 No newline at end of file
+62 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd_sl4a.hci.le_advanced_scanning_test import LeAdvancedScanningTest

from mobly import suite_runner
import argparse

ALL_TESTS = [LeAdvancedScanningTest]


def main():
    """
    Local test runner that allows  to specify list of tests to and customize
    test config file location
    """
    parser = argparse.ArgumentParser(description="Run local GD SL4A tests.")
    parser.add_argument(
        '-c', '--config', type=str, required=True, metavar='<PATH>', help='Path to the test configuration file.')
    parser.add_argument(
        '--tests',
        '--test_case',
        nargs='+',
        type=str,
        metavar='[ClassA[.test_a] ClassB[.test_b] ...]',
        help='A list of test classes and optional tests to execute.')
    parser.add_argument("--all_tests", "-A", type=bool, dest="all_tests", default=False, nargs="?")
    parser.add_argument("--presubmit", type=bool, dest="presubmit", default=False, nargs="?")
    parser.add_argument("--postsubmit", type=bool, dest="postsubmit", default=False, nargs="?")
    args = parser.parse_args()
    test_list = ALL_TESTS
    if args.all_tests:
        test_list = ALL_TESTS
    elif args.presubmit:
        test_list = ALL_TESTS
    elif args.postsubmit:
        test_list = ALL_TESTS
    # Do not pass this layer's cmd line argument to next layer
    argv = ["--config", args.config]
    if args.tests:
        argv.append("--tests")
        for test in args.tests:
            argv.append(test)

    suite_runner.run_suite(test_list, argv=argv)


if __name__ == "__main__":
    main()
+249 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import logging

from google.protobuf import empty_pb2 as empty_proto

from bluetooth_packets_python3 import hci_packets
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade import common_pb2 as common


class LeAdvancedScanningTest(GdSl4aBaseTestClass):

    def setup_class(self):
        super().setup_class(cert_module='HCI_INTERFACES')
        self.default_timeout = 10  # seconds

    def setup_test(self):
        super().setup_test()

    def teardown_test(self):
        super().teardown_test()

    def set_cert_privacy_policy_with_random_address(self, random_address):
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
                type=common.RANDOM_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def set_cert_privacy_policy_with_public_address(self):
        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
        # Bluetooth MAC address must be upper case
        return public_address_bytes.decode('utf-8').upper()

    def test_scan_filter_device_name_legacy_pdu(self):
        # Use public address on cert side
        logging.info("Setting public address")
        DEVICE_NAME = 'Im_The_CERT!'
        public_address = self.set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
            tx_power=20)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
        logging.info("Created advertiser")

        # Setup SL4A DUT side to scan
        logging.info("Start scanning with public address %s" % public_address)
        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
        expected_event_name = scan_result.format(scan_callback)

        # Setup SL4A DUT filter
        self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME)
        self.dut.sl4a.bleBuildScanFilter(filter_list)

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
        logging.info("Started scanning")
        try:
            # Verify if there is scan result
            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
        except queue.Empty as error:
            logging.error("Could not find initial advertisement.")
            return False
        # Print out scan result
        mac_address = event_info['data']['Result']['deviceInfo']['address']
        logging.info("Filter advertisement with address {}".format(mac_address))

        # Stop scanning
        logging.info("Stop scanning")
        self.dut.sl4a.bleStopBleScan(scan_callback)
        logging.info("Stopped scanning")

        # Stop advertising
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

        return True

    def test_scan_filter_device_random_address_legacy_pdu(self):
        # Use random address on cert side
        logging.info("Setting random address")
        RANDOM_ADDRESS = 'D0:05:04:03:02:01'
        DEVICE_NAME = 'Im_The_CERT!'
        self.set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS)
        logging.info("Set random address")

        # Setup cert side to advertise
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
        logging.info("Created advertiser")

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["random"]
        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type))
        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
        expected_event_name = scan_result.format(scan_callback)

        # Setup SL4A DUT filter
        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type))
        self.dut.sl4a.bleBuildScanFilter(filter_list)

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
        logging.info("Started scanning")
        try:
            # Verify if there is scan result
            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
        except queue.Empty as error:
            logging.error("Could not find initial advertisement.")
            return False
        # Print out scan result
        mac_address = event_info['data']['Result']['deviceInfo']['address']
        logging.info("Filter advertisement with address {}".format(mac_address))

        # Stop scanning
        logging.info("Stop scanning")
        self.dut.sl4a.bleStopBleScan(scan_callback)
        logging.info("Stopped scanning")

        # Stop advertising
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

        return True

    def test_scan_filter_device_public_address_extended_pdu(self):
        # Use public address on cert side
        logging.info("Setting public address")
        DEVICE_NAME = 'Im_The_CERT!'
        public_address = self.set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("Created advertiser")

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["public"]
        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type))
        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.dut.sl4a.bleSetScanSettingsLegacy(False)
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
        expected_event_name = scan_result.format(scan_callback)

        # Setup SL4A DUT filter
        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type))
        self.dut.sl4a.bleBuildScanFilter(filter_list)

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
        logging.info("Started scanning")
        try:
            # Verify if there is scan result
            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
        except queue.Empty as error:
            logging.error("Could not find initial advertisement.")
            return False
        # Print out scan result
        mac_address = event_info['data']['Result']['deviceInfo']['address']
        logging.info("Filter advertisement with address {}".format(mac_address))

        # Stop scanning
        logging.info("Stop scanning")
        self.dut.sl4a.bleStopBleScan(scan_callback)
        logging.info("Stopped scanning")

        # Stop advertising
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

        return True
+259 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading