Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a38e66fb authored by Martin Brabham's avatar Martin Brabham
Browse files

Generate OOB data Topshim API

This implements the topshim portion and testing API for calling in to
generate local out of band data.

Bug: 234756905
Test: system/gd/cert/run --clean --topshim LeSecurityTest.test_generate_local_oob_data
Tag: #floss
Change-Id: Ia056238fc77fb12e60dca17fba41dc6a184b6181
parent 75de1f74
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ service AdapterService {

service SecurityService {
    rpc RemoveBond(RemoveBondRequest) returns (google.protobuf.Empty) {}
    rpc GenerateLocalOobData(GenerateOobDataRequest) returns (google.protobuf.Empty) {}
}

service GattService {
@@ -112,6 +113,7 @@ enum EventType {
  ADAPTER_STATE = 0;
  SSP_REQUEST = 1;
  LE_RAND = 2;
  GENERATE_LOCAL_OOB_DATA = 3;
}
message FetchEventsRequest {}

@@ -195,3 +197,7 @@ message Connection {
// For our HFP APIs this would store the bluetooth address but staying consistent with Pandora naming.
  bytes cookie = 1;
}

message GenerateOobDataRequest {
  int32 transport = 1;
}
+4 −1
Original line number Diff line number Diff line
@@ -64,7 +64,10 @@ class AdapterClient(AsyncClosable):
        """Start fetching events"""
        future = asyncio.get_running_loop().create_future()
        self.__task_list.append(asyncio.get_running_loop().create_task(self.__get_next_event(event, future)))
        try:
            await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT)
        except:
            print("Failed to get event", event)
        return future

    async def _verify_adapter_started(self):
+50 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


class OobData:
    """
    Represents an Out of Band data set in a readible object
    """

    def __init__(self, is_valid, transport, byte_string_address, byte_string_c, byte_string_r):
        """
        @param is_valid indicates whether the data was able to be parsed
        @param transport LE or Classic
        @param 7 octet byte string.  Little Endian 6 byte address + 1 byte transport
        @param byte_string_c 16 octet confirmation
        @param byte_string_r 16 octet randomizer
        """
        self.__is_valid = True if is_valid == "1" else False
        self.__transport = int(transport)
        self.__byte_string_address = byte_string_address
        self.__byte_string_c = byte_string_c
        self.__byte_string_r = byte_string_r

    def is_valid(self):
        return self.__is_valid

    def transport(self):
        return self.__transport

    def address(self):
        return self.__byte_string_address

    def confirmation(self):
        return self.__byte_string_c

    def randomizer(self):
        return self.__byte_string_r
+10 −20
Original line number Diff line number Diff line
@@ -29,18 +29,15 @@ class SecurityClient(AsyncClosable):
    Wrapper gRPC interface to the GATT Service
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __security_stub = None
    __adapter_event_stream = None
    __adapter_client = None
    __security = None
    __adapter = None

    def __init__(self, adapter_client, port=8999):
    def __init__(self, adapter, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__security_stub = facade_pb2_grpc.SecurityServiceStub(self.__channel)
        self.__adapter_client = adapter_client
        #self.__gatt_event_stream = self.__security_stub.FetchEvents(facade_pb2.FetchEventsRequest())
        self.__security = facade_pb2_grpc.SecurityServiceStub(self.__channel)
        self.__adapter = adapter

    async def close(self):
        """
@@ -56,16 +53,9 @@ class SecurityClient(AsyncClosable):
        """
        Removes a bonding entry for a given address
        """
        await self.__security_stub.RemoveBond(facade_pb2.RemoveBondRequest(address=address))
        return await self.__adapter_client.le_rand()
        await self.__security.RemoveBond(facade_pb2.RemoveBondRequest(address=address))

    async def bond_using_numeric_comparison(self, address):
        """
        Bond to a given address using numeric comparison method
        """
        # Set IO Capabilities
        # Enable Page scan
        # Become discoverable
        # Discover device
        # Initiate bond
        return await self.__adapter_client.le_rand()
    async def generate_local_oob_data(self, transport):
        await self.__security.GenerateLocalOobData(facade_pb2.GenerateOobDataRequest(transport=transport))
        future = await self.__adapter._listen_for_event(facade_pb2.EventType.GENERATE_LOCAL_OOB_DATA)
        return future
+25 −27
Original line number Diff line number Diff line
@@ -70,8 +70,8 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs):
    rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402"))
    rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403"))

    info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port,
                                                                   rootcanal_link_layer_port))
    info['make_rootcanal_ports_available'] = make_ports_available(
        (rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port))
    if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)):
        return info

@@ -79,8 +79,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs):
    rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)]
    info['rootcanal_cmd'] = rootcanal_cmd

    rootcanal_process = subprocess.Popen(
        rootcanal_cmd,
    rootcanal_process = subprocess.Popen(rootcanal_cmd,
                                         cwd=get_gd_root(),
                                         env=os.environ.copy(),
                                         stdout=subprocess.PIPE,
@@ -98,8 +97,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs):
        info['is_subprocess_alive'] = False
        return info

    info['rootcanal_logger'] = AsyncSubprocessLogger(
        rootcanal_process, [rootcanal_logpath],
    info['rootcanal_logger'] = AsyncSubprocessLogger(rootcanal_process, [rootcanal_logpath],
                                                     log_to_stdout=verbose_mode,
                                                     tag="rootcanal",
                                                     color=TerminalColor.MAGENTA)
@@ -170,8 +168,10 @@ class TopshimBaseTest(base_test.BaseTestClass):
        assertThat(started).isTrue()
        started = started and await cert_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        self.__dut = TopshimDevice(dut_adapter, self.dut_port)
        self.__cert = TopshimDevice(cert_adapter, self.cert_port)
        self.__dut = TopshimDevice(dut_adapter, GattClient(port=self.dut_port),
                                   SecurityClient(dut_adapter, port=self.dut_port))
        self.__cert = TopshimDevice(cert_adapter, GattClient(port=self.cert_port),
                                    SecurityClient(cert_adapter, port=self.cert_port))
        return started

    async def __teardown_adapter(self):
@@ -200,8 +200,7 @@ class TopshimBaseTest(base_test.BaseTestClass):
        for config in self.controller_configs[CONTROLLER_CONFIG_NAME]:
            config['verbose_mode'] = self.verbose_mode

        self.info = _setup_class_core(
            verbose_mode=self.verbose_mode,
        self.info = _setup_class_core(verbose_mode=self.verbose_mode,
                                      log_path_base=self.log_path_base,
                                      controller_configs=self.controller_configs)
        self.rootcanal_running = self.info['rootcanal_running']
@@ -213,8 +212,8 @@ class TopshimBaseTest(base_test.BaseTestClass):
        asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available")

        self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd']))
        asserts.assert_true(
            self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal']))
        asserts.assert_true(self.info['is_rootcanal_process_started'],
                            msg="Cannot start root-canal at " + str(self.info['rootcanal']))
        asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running")

        self.controller_configs = self.info['controller_configs']
@@ -226,8 +225,7 @@ class TopshimBaseTest(base_test.BaseTestClass):
        asyncio.get_event_loop().run_until_complete(self.__setup_adapter())

    def teardown_class(self):
        _teardown_class_core(
            rootcanal_running=self.rootcanal_running,
        _teardown_class_core(rootcanal_running=self.rootcanal_running,
                             rootcanal_process=self.rootcanal_process,
                             rootcanal_logger=self.rootcanal_logger,
                             subprocess_wait_timeout_seconds=1)
Loading