Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 145bfb52 authored by Rahul Sabnis's avatar Rahul Sabnis Committed by Android (Google) Code Review
Browse files

Merge changes Id782d06a,I974ce9e7,I57cdfe7f,I88f366fb,I131ab155 into tm-d1-dev

* changes:
  Integers are not a reliable method to determine if value set
  Blueberry2: Add SL4A + SL4A LeAdvertisingTest
  Blueberry2: SL4A + SL4AA OobPairingTest
  Blueberry2: Add SL4A Abstractions
  Blueberry2: Fix testing for development
parents 1e25bd3c fdae953a
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -77,7 +77,6 @@ def make_ports_available(ports: Container[int], timeout_seconds=10):
        logging.warning("Freeing port %d used by %s" % (conn.laddr.port, str(conn)))
        if not conn.pid:
            logging.error("Failed to kill process occupying port %d due to lack of pid" % conn.laddr.port)
            success = False
            continue
        logging.warning("Killing pid %d that is using port port %d" % (conn.pid, conn.laddr.port))
        if conn.pid in killed_pids:
+57 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import binascii
import io
import logging
import os
import queue

from blueberry.tests.gd.cert.context import get_current_context

from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types


class LeAdvertisingTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):

    def setup_class(self):
        super().setup_class()

    def setup_test(self):
        super().setup_test()

    def teardown_test(self):
        super().teardown_test()

    def test_advertise_name(self):
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()

    def test_advertise_name_stress(self):
        for i in range(0, 10):
            self.test_advertise_name()

    def test_advertise_name_twice_no_stop(self):
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        rpa_address = self.cert_advertiser_.advertise_rpa_public_extended_pdu()
        self.dut_scanner_.scan_for_name(self.cert_advertiser_.get_local_advertising_name())
        self.dut_scanner_.stop_scanning()
        self.cert_advertiser_.stop_advertising()
+81 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import binascii
import logging
import queue

from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects
from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test


class LeAdvertiser(Closable):

    is_advertising = False
    device = None
    default_timeout = 10  # seconds
    advertise_callback = None
    advertise_data = None
    advertise_settings = None

    def __init__(self, device):
        self.device = device

    def __wait_for_event(self, expected_event_name):
        try:
            event_info = self.device.ed.pop_event(expected_event_name, self.default_timeout)
            logging.info(event_info)
        except queue.Empty as error:
            logging.error("Failed to find event: %s", expected_event_name)
            return False
        return True

    def advertise_rpa_public_extended_pdu(self, name="SL4A Device"):
        if self.is_advertising:
            logging.info("Already advertising!")
            return
        self.is_advertising = True
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.device.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
        self.device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
        self.device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
        self.device.sl4a.bleSetAdvertiseSettingsOwnAddressType(common.RANDOM_DEVICE_ADDRESS)
        self.advertise_callback, self.advertise_data, self.advertise_settings = generate_ble_advertise_objects(
            self.device.sl4a)
        self.device.sl4a.bleStartBleAdvertising(self.advertise_callback, self.advertise_data, self.advertise_settings)

        # Wait for SL4A cert to start advertising
        assertThat(self.__wait_for_event(adv_succ.format(self.advertise_callback))).isTrue()
        logging.info("Advertising started")

    def get_local_advertising_name(self):
        return self.device.sl4a.bluetoothGetLocalName()

    def stop_advertising(self):
        if self.is_advertising:
            logging.info("Stopping advertisement")
            self.device.sl4a.bleStopBleAdvertising(self.advertise_callback)
            self.is_advertising = False

    def close(self):
        self.stop_advertising()
        self.device = None
+168 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import queue

from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes
from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result
from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test


class LeScanner(Closable):

    is_scanning = False
    device = None
    filter_list = None
    scan_settings = None
    scan_callback = None

    def __init__(self, device):
        self.device = device

    def __wait_for_scan_result_event(self, expected_event_name, timeout=60):
        try:
            event_info = self.device.ed.pop_event(expected_event_name, timeout)
        except queue.Empty as error:
            logging.error("Could not find scan result event: %s", expected_event_name)
            return None
        return event_info['data']['Result']['deviceInfo']['address']

    def scan_for_address_expect_none(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1)
        assertThat(advertising_address).isNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address(self, address, addr_type):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address_with_irk(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(self.scan_callback)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        # Hard code here since callback index iterates and will cause this to fail if ran
        # Second as the impl in SL4A sends this since it's a single callback for broadcast.
        expected_event_name = "BleScan1onScanResults"

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))

    def scan_for_name(self, name):
        if self.is_scanning:
            print("Already scanning!")
            return
        self.is_scanning = True
        logging.info("Start scanning for name {}".format(name))
        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
        self.device.sl4a.bleSetScanSettingsLegacy(False)
        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
        expected_event_name = scan_result.format(1)

        # Start scanning on SL4A DUT
        self.device.sl4a.bleSetScanFilterDeviceName(name)
        self.device.sl4a.bleBuildScanFilter(self.filter_list)
        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)

        # Verify that scan result is received on SL4A DUT
        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
        assertThat(advertising_address).isNotNone()
        logging.info("Filter advertisement with address {}".format(advertising_address))
        return advertising_address

    def stop_scanning(self):
        """
        Warning: no java callback registered for this
        """
        if self.is_scanning:
            logging.info("Stopping scan")
            self.device.sl4a.bleStopBleScan(self.scan_callback)
            self.is_scanning = False

    def close(self):
        self.stop_scanning()
        self.device = None
+45 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


class OobData:
    """
    This represents the data generated from the device
    """

    address = None
    confirmation = None
    randomizer = None

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
        self.randomizer = randomizer

    def to_sl4a_address(self):
        oob_address = self.address.upper()
        address_str_octets = []
        i = 1
        buf = ""
        for c in oob_address:
            buf += c
            if i % 2 == 0:
                address_str_octets.append(buf)
                buf = ""
            i += 1
        address_str_octets = address_str_octets[:6]
        address_str_octets.reverse()
        return ":".join(address_str_octets)
Loading