Loading system/blueberry/facade/topshim/facade.proto +6 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ service AdapterService { service SecurityService { rpc RemoveBond(RemoveBondRequest) returns (google.protobuf.Empty) {} rpc GenerateLocalOobData(GenerateOobDataRequest) returns (google.protobuf.Empty) {} } service GattService { Loading Loading @@ -112,6 +113,7 @@ enum EventType { ADAPTER_STATE = 0; SSP_REQUEST = 1; LE_RAND = 2; GENERATE_LOCAL_OOB_DATA = 3; } message FetchEventsRequest {} Loading Loading @@ -195,3 +197,7 @@ message Connection { // For our HFP APIs this would store the bluetooth address but staying consistent with Pandora naming. bytes cookie = 1; } message GenerateOobDataRequest { int32 transport = 1; } system/blueberry/tests/topshim/lib/adapter_client.py +4 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,10 @@ class AdapterClient(AsyncClosable): """Start fetching events""" future = asyncio.get_running_loop().create_future() self.__task_list.append(asyncio.get_running_loop().create_task(self.__get_next_event(event, future))) try: await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT) except: print("Failed to get event", event) return future async def _verify_adapter_started(self): Loading system/blueberry/tests/topshim/lib/oob_data.py 0 → 100644 +50 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. class OobData: """ Represents an Out of Band data set in a readible object """ def __init__(self, is_valid, transport, byte_string_address, byte_string_c, byte_string_r): """ @param is_valid indicates whether the data was able to be parsed @param transport LE or Classic @param 7 octet byte string. Little Endian 6 byte address + 1 byte transport @param byte_string_c 16 octet confirmation @param byte_string_r 16 octet randomizer """ self.__is_valid = True if is_valid == "1" else False self.__transport = int(transport) self.__byte_string_address = byte_string_address self.__byte_string_c = byte_string_c self.__byte_string_r = byte_string_r def is_valid(self): return self.__is_valid def transport(self): return self.__transport def address(self): return self.__byte_string_address def confirmation(self): return self.__byte_string_c def randomizer(self): return self.__byte_string_r system/blueberry/tests/topshim/lib/security_client.py +10 −20 Original line number Diff line number Diff line Loading @@ -29,18 +29,15 @@ class SecurityClient(AsyncClosable): Wrapper gRPC interface to the GATT Service """ # Timeout for async wait DEFAULT_TIMEOUT = 2 __task_list = [] __channel = None __security_stub = None __adapter_event_stream = None __adapter_client = None __security = None __adapter = None def __init__(self, adapter_client, port=8999): def __init__(self, adapter, port=8999): self.__channel = grpc.aio.insecure_channel("localhost:%d" % port) self.__security_stub = facade_pb2_grpc.SecurityServiceStub(self.__channel) self.__adapter_client = adapter_client #self.__gatt_event_stream = self.__security_stub.FetchEvents(facade_pb2.FetchEventsRequest()) self.__security = facade_pb2_grpc.SecurityServiceStub(self.__channel) self.__adapter = adapter async def close(self): """ Loading @@ -56,16 +53,9 @@ class SecurityClient(AsyncClosable): """ Removes a bonding entry for a given address """ await self.__security_stub.RemoveBond(facade_pb2.RemoveBondRequest(address=address)) return await self.__adapter_client.le_rand() await self.__security.RemoveBond(facade_pb2.RemoveBondRequest(address=address)) async def bond_using_numeric_comparison(self, address): """ Bond to a given address using numeric comparison method """ # Set IO Capabilities # Enable Page scan # Become discoverable # Discover device # Initiate bond return await self.__adapter_client.le_rand() async def generate_local_oob_data(self, transport): await self.__security.GenerateLocalOobData(facade_pb2.GenerateOobDataRequest(transport=transport)) future = await self.__adapter._listen_for_event(facade_pb2.EventType.GENERATE_LOCAL_OOB_DATA) return future system/blueberry/tests/topshim/lib/topshim_base_test.py +25 −27 Original line number Diff line number Diff line Loading @@ -70,8 +70,8 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402")) rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403")) info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)) info['make_rootcanal_ports_available'] = make_ports_available( (rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)) if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)): return info Loading @@ -79,8 +79,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)] info['rootcanal_cmd'] = rootcanal_cmd rootcanal_process = subprocess.Popen( rootcanal_cmd, rootcanal_process = subprocess.Popen(rootcanal_cmd, cwd=get_gd_root(), env=os.environ.copy(), stdout=subprocess.PIPE, Loading @@ -98,8 +97,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): info['is_subprocess_alive'] = False return info info['rootcanal_logger'] = AsyncSubprocessLogger( rootcanal_process, [rootcanal_logpath], info['rootcanal_logger'] = AsyncSubprocessLogger(rootcanal_process, [rootcanal_logpath], log_to_stdout=verbose_mode, tag="rootcanal", color=TerminalColor.MAGENTA) Loading Loading @@ -170,8 +168,10 @@ class TopshimBaseTest(base_test.BaseTestClass): assertThat(started).isTrue() started = started and await cert_adapter._verify_adapter_started() assertThat(started).isTrue() self.__dut = TopshimDevice(dut_adapter, self.dut_port) self.__cert = TopshimDevice(cert_adapter, self.cert_port) self.__dut = TopshimDevice(dut_adapter, GattClient(port=self.dut_port), SecurityClient(dut_adapter, port=self.dut_port)) self.__cert = TopshimDevice(cert_adapter, GattClient(port=self.cert_port), SecurityClient(cert_adapter, port=self.cert_port)) return started async def __teardown_adapter(self): Loading Loading @@ -200,8 +200,7 @@ class TopshimBaseTest(base_test.BaseTestClass): for config in self.controller_configs[CONTROLLER_CONFIG_NAME]: config['verbose_mode'] = self.verbose_mode self.info = _setup_class_core( verbose_mode=self.verbose_mode, self.info = _setup_class_core(verbose_mode=self.verbose_mode, log_path_base=self.log_path_base, controller_configs=self.controller_configs) self.rootcanal_running = self.info['rootcanal_running'] Loading @@ -213,8 +212,8 @@ class TopshimBaseTest(base_test.BaseTestClass): asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available") self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd'])) asserts.assert_true( self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal'])) asserts.assert_true(self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal'])) asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running") self.controller_configs = self.info['controller_configs'] Loading @@ -226,8 +225,7 @@ class TopshimBaseTest(base_test.BaseTestClass): asyncio.get_event_loop().run_until_complete(self.__setup_adapter()) def teardown_class(self): _teardown_class_core( rootcanal_running=self.rootcanal_running, _teardown_class_core(rootcanal_running=self.rootcanal_running, rootcanal_process=self.rootcanal_process, rootcanal_logger=self.rootcanal_logger, subprocess_wait_timeout_seconds=1) Loading Loading
system/blueberry/facade/topshim/facade.proto +6 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ service AdapterService { service SecurityService { rpc RemoveBond(RemoveBondRequest) returns (google.protobuf.Empty) {} rpc GenerateLocalOobData(GenerateOobDataRequest) returns (google.protobuf.Empty) {} } service GattService { Loading Loading @@ -112,6 +113,7 @@ enum EventType { ADAPTER_STATE = 0; SSP_REQUEST = 1; LE_RAND = 2; GENERATE_LOCAL_OOB_DATA = 3; } message FetchEventsRequest {} Loading Loading @@ -195,3 +197,7 @@ message Connection { // For our HFP APIs this would store the bluetooth address but staying consistent with Pandora naming. bytes cookie = 1; } message GenerateOobDataRequest { int32 transport = 1; }
system/blueberry/tests/topshim/lib/adapter_client.py +4 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,10 @@ class AdapterClient(AsyncClosable): """Start fetching events""" future = asyncio.get_running_loop().create_future() self.__task_list.append(asyncio.get_running_loop().create_task(self.__get_next_event(event, future))) try: await asyncio.wait_for(future, AdapterClient.DEFAULT_TIMEOUT) except: print("Failed to get event", event) return future async def _verify_adapter_started(self): Loading
system/blueberry/tests/topshim/lib/oob_data.py 0 → 100644 +50 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2022 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. class OobData: """ Represents an Out of Band data set in a readible object """ def __init__(self, is_valid, transport, byte_string_address, byte_string_c, byte_string_r): """ @param is_valid indicates whether the data was able to be parsed @param transport LE or Classic @param 7 octet byte string. Little Endian 6 byte address + 1 byte transport @param byte_string_c 16 octet confirmation @param byte_string_r 16 octet randomizer """ self.__is_valid = True if is_valid == "1" else False self.__transport = int(transport) self.__byte_string_address = byte_string_address self.__byte_string_c = byte_string_c self.__byte_string_r = byte_string_r def is_valid(self): return self.__is_valid def transport(self): return self.__transport def address(self): return self.__byte_string_address def confirmation(self): return self.__byte_string_c def randomizer(self): return self.__byte_string_r
system/blueberry/tests/topshim/lib/security_client.py +10 −20 Original line number Diff line number Diff line Loading @@ -29,18 +29,15 @@ class SecurityClient(AsyncClosable): Wrapper gRPC interface to the GATT Service """ # Timeout for async wait DEFAULT_TIMEOUT = 2 __task_list = [] __channel = None __security_stub = None __adapter_event_stream = None __adapter_client = None __security = None __adapter = None def __init__(self, adapter_client, port=8999): def __init__(self, adapter, port=8999): self.__channel = grpc.aio.insecure_channel("localhost:%d" % port) self.__security_stub = facade_pb2_grpc.SecurityServiceStub(self.__channel) self.__adapter_client = adapter_client #self.__gatt_event_stream = self.__security_stub.FetchEvents(facade_pb2.FetchEventsRequest()) self.__security = facade_pb2_grpc.SecurityServiceStub(self.__channel) self.__adapter = adapter async def close(self): """ Loading @@ -56,16 +53,9 @@ class SecurityClient(AsyncClosable): """ Removes a bonding entry for a given address """ await self.__security_stub.RemoveBond(facade_pb2.RemoveBondRequest(address=address)) return await self.__adapter_client.le_rand() await self.__security.RemoveBond(facade_pb2.RemoveBondRequest(address=address)) async def bond_using_numeric_comparison(self, address): """ Bond to a given address using numeric comparison method """ # Set IO Capabilities # Enable Page scan # Become discoverable # Discover device # Initiate bond return await self.__adapter_client.le_rand() async def generate_local_oob_data(self, transport): await self.__security.GenerateLocalOobData(facade_pb2.GenerateOobDataRequest(transport=transport)) future = await self.__adapter._listen_for_event(facade_pb2.EventType.GENERATE_LOCAL_OOB_DATA) return future
system/blueberry/tests/topshim/lib/topshim_base_test.py +25 −27 Original line number Diff line number Diff line Loading @@ -70,8 +70,8 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): rootcanal_hci_port = int(rootcanal_config.get("hci_port", "6402")) rootcanal_link_layer_port = int(rootcanal_config.get("link_layer_port", "6403")) info['make_rootcanal_ports_available'] = make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)) info['make_rootcanal_ports_available'] = make_ports_available( (rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)) if not make_ports_available((rootcanal_test_port, rootcanal_hci_port, rootcanal_link_layer_port)): return info Loading @@ -79,8 +79,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): rootcanal_cmd = [rootcanal, str(rootcanal_test_port), str(rootcanal_hci_port), str(rootcanal_link_layer_port)] info['rootcanal_cmd'] = rootcanal_cmd rootcanal_process = subprocess.Popen( rootcanal_cmd, rootcanal_process = subprocess.Popen(rootcanal_cmd, cwd=get_gd_root(), env=os.environ.copy(), stdout=subprocess.PIPE, Loading @@ -98,8 +97,7 @@ def _setup_class_core(verbose_mode, log_path_base, controller_configs): info['is_subprocess_alive'] = False return info info['rootcanal_logger'] = AsyncSubprocessLogger( rootcanal_process, [rootcanal_logpath], info['rootcanal_logger'] = AsyncSubprocessLogger(rootcanal_process, [rootcanal_logpath], log_to_stdout=verbose_mode, tag="rootcanal", color=TerminalColor.MAGENTA) Loading Loading @@ -170,8 +168,10 @@ class TopshimBaseTest(base_test.BaseTestClass): assertThat(started).isTrue() started = started and await cert_adapter._verify_adapter_started() assertThat(started).isTrue() self.__dut = TopshimDevice(dut_adapter, self.dut_port) self.__cert = TopshimDevice(cert_adapter, self.cert_port) self.__dut = TopshimDevice(dut_adapter, GattClient(port=self.dut_port), SecurityClient(dut_adapter, port=self.dut_port)) self.__cert = TopshimDevice(cert_adapter, GattClient(port=self.cert_port), SecurityClient(cert_adapter, port=self.cert_port)) return started async def __teardown_adapter(self): Loading Loading @@ -200,8 +200,7 @@ class TopshimBaseTest(base_test.BaseTestClass): for config in self.controller_configs[CONTROLLER_CONFIG_NAME]: config['verbose_mode'] = self.verbose_mode self.info = _setup_class_core( verbose_mode=self.verbose_mode, self.info = _setup_class_core(verbose_mode=self.verbose_mode, log_path_base=self.log_path_base, controller_configs=self.controller_configs) self.rootcanal_running = self.info['rootcanal_running'] Loading @@ -213,8 +212,8 @@ class TopshimBaseTest(base_test.BaseTestClass): asserts.assert_true(self.info['make_rootcanal_ports_available'], "Failed to make root canal ports available") self.log.debug("Running %s" % " ".join(self.info['rootcanal_cmd'])) asserts.assert_true( self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal'])) asserts.assert_true(self.info['is_rootcanal_process_started'], msg="Cannot start root-canal at " + str(self.info['rootcanal'])) asserts.assert_true(self.info['is_subprocess_alive'], msg="root-canal stopped immediately after running") self.controller_configs = self.info['controller_configs'] Loading @@ -226,8 +225,7 @@ class TopshimBaseTest(base_test.BaseTestClass): asyncio.get_event_loop().run_until_complete(self.__setup_adapter()) def teardown_class(self): _teardown_class_core( rootcanal_running=self.rootcanal_running, _teardown_class_core(rootcanal_running=self.rootcanal_running, rootcanal_process=self.rootcanal_process, rootcanal_logger=self.rootcanal_logger, subprocess_wait_timeout_seconds=1) Loading