Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 17eef576 authored by Martin Brabham's avatar Martin Brabham
Browse files

Topshim Testing: Classic Security Test Framework

- Add rust security service
- Remove `remove_bond` api from adapter client.
- Add python security client

Bug: 234756905
Test: system/gd/cert/run --clean --topshim ClassicSecurityTest
Tag: #floss
Change-Id: Ia2c04b5c02e8691bd50b20785ed28cb410fda42f
parent 8d7ec88a
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -15,12 +15,15 @@ service AdapterService {
  rpc LeRand(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetEventFilterConnectionSetupAllDevices(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc AllowWakeByHid(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc RemoveBond(RemoveBondRequest) returns (google.protobuf.Empty) {}
  rpc RestoreFilterAcceptList(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetDefaultEventMask(google.protobuf.Empty) returns (google.protobuf.Empty) {}
  rpc SetEventFilterInquiryResultAllDevices(google.protobuf.Empty) returns (google.protobuf.Empty) {}
}

service SecurityService {
    rpc RemoveBond(RemoveBondRequest) returns (google.protobuf.Empty) {}
}

service GattService {
  // Advertiser
  rpc RegisterAdvertiser(google.protobuf.Empty) returns (google.protobuf.Empty) {}
+0 −3
Original line number Diff line number Diff line
@@ -118,9 +118,6 @@ class AdapterClient(AsyncClosable):
    async def allow_wake_by_hid(self):
        await self.__adapter_stub.AllowWakeByHid(empty_proto.Empty())

    async def remove_bond(self, address):
        await self.__adapter_stub.RemoveBond(facade_pb2.RemoveBondRequest(address=address))


class A2dpAutomationHelper():
    """Invoke gRPC on topshim for A2DP testing"""
+71 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import grpc

from blueberry.facade.topshim import facade_pb2
from blueberry.facade.topshim import facade_pb2_grpc
from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose

from google.protobuf import empty_pb2 as empty_proto


class SecurityClient(AsyncClosable):
    """
    Wrapper gRPC interface to the GATT Service
    """
    # Timeout for async wait
    DEFAULT_TIMEOUT = 2
    __task_list = []
    __channel = None
    __security_stub = None
    __adapter_event_stream = None
    __adapter_client = None

    def __init__(self, adapter_client, port=8999):
        self.__channel = grpc.aio.insecure_channel("localhost:%d" % port)
        self.__security_stub = facade_pb2_grpc.SecurityServiceStub(self.__channel)
        self.__adapter_client = adapter_client
        #self.__gatt_event_stream = self.__security_stub.FetchEvents(facade_pb2.FetchEventsRequest())

    async def close(self):
        """
        Terminate the current tasks
        """
        for task in self.__task_list:
            task.cancel()
            task = None
        self.__task_list.clear()
        await self.__channel.close()

    async def remove_bond(self, address):
        """
        Removes a bonding entry for a given address
        """
        await self.__security_stub.RemoveBond(facade_pb2.RemoveBondRequest(address=address))
        return await self.__adapter_client.le_rand()

    async def bond_using_numeric_comparison(self, address):
        """
        Bond to a given address using numeric comparison method
        """
        # Set IO Capabilities
        # Enable Page scan
        # Become discoverable
        # Discover device
        # Initiate bond
        return await self.__adapter_client.le_rand()
+7 −17
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.topshim.lib.adapter_client import AdapterClient
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose
from blueberry.tests.topshim.lib.gatt_client import GattClient
from blueberry.tests.topshim.lib.security_client import SecurityClient
from blueberry.tests.topshim.lib.topshim_device import TopshimDevice

from mobly import asserts
@@ -162,26 +163,18 @@ class TopshimBaseTest(base_test.BaseTestClass):
    __dut = None
    __cert = None

    async def __safe_terminate(self, object):
        if object != None:
            return await object.terminate()
        else:
            self.log.warn("Object was already null!")

    async def _setup_adapter(self):
    async def __setup_adapter(self):
        dut_adapter = AdapterClient(port=self.dut_port)
        # TODO(optedoblivion): Remove hack
        self.dut_adapter = dut_adapter
        cert_adapter = AdapterClient(port=self.cert_port)
        started = await dut_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        started = started and await cert_adapter._verify_adapter_started()
        assertThat(started).isTrue()
        self.__dut = TopshimDevice(dut_adapter, GattClient(port=self.dut_port))
        self.__cert = TopshimDevice(cert_adapter, GattClient(port=self.cert_port))
        self.__dut = TopshimDevice(dut_adapter, self.dut_port)
        self.__cert = TopshimDevice(cert_adapter, self.cert_port)
        return started

    async def _teardown_adapter(self):
    async def __teardown_adapter(self):
        await asyncSafeClose(self.__dut)
        await asyncSafeClose(self.__cert)

@@ -197,9 +190,6 @@ class TopshimBaseTest(base_test.BaseTestClass):
        """
        return self.__cert

    def __post(self, async_fn):
        asyncio.get_event_loop().run_until_complete(async_fn)

    def setup_class(self):
        """
        Configure rootcanal and setup test parameters
@@ -233,7 +223,7 @@ class TopshimBaseTest(base_test.BaseTestClass):
        self.cert_port = controllers[0].grpc_port
        self.dut_port = controllers[1].grpc_port
        asyncio.set_event_loop(asyncio.new_event_loop())
        self.__post(self._setup_adapter())
        asyncio.get_event_loop().run_until_complete(self.__setup_adapter())

    def teardown_class(self):
        _teardown_class_core(
@@ -241,4 +231,4 @@ class TopshimBaseTest(base_test.BaseTestClass):
            rootcanal_process=self.rootcanal_process,
            rootcanal_logger=self.rootcanal_logger,
            subprocess_wait_timeout_seconds=1)
        self.__post(self._teardown_adapter())
        asyncio.get_event_loop().run_until_complete(self.__teardown_adapter())
+15 −4
Original line number Diff line number Diff line
@@ -16,11 +16,13 @@
import asyncio
import logging

from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose
from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose
from blueberry.tests.topshim.lib.gatt_client import GattClient
from blueberry.tests.topshim.lib.security_client import SecurityClient


def create(configs):
@@ -69,6 +71,7 @@ def get_instances_with_configs(configs):
class TopshimDevice(AsyncClosable):
    __adapter = None
    __gatt = None
    __security = None

    async def __le_rand_wrapper(self, async_fn):
        await async_fn
@@ -79,9 +82,10 @@ class TopshimDevice(AsyncClosable):
    def __post(self, async_fn):
        asyncio.get_event_loop().run_until_complete(self.__le_rand_wrapper(async_fn))

    def __init__(self, adapter, gatt):
    def __init__(self, adapter, grpc_port):
        self.__adapter = adapter
        self.__gatt = gatt
        self.__gatt = GattClient(port=grpc_port)
        self.__security = SecurityClient(adapter, port=grpc_port)

    async def close(self):
        """
@@ -89,6 +93,7 @@ class TopshimDevice(AsyncClosable):
        """
        await asyncSafeClose(self.__adapter)
        await asyncSafeClose(self.__gatt)
        await asyncSafeClose(self.__security)

    def enable_page_scan(self):
        self.__post(self.__adapter.enable_page_scan())
@@ -141,3 +146,9 @@ class TopshimDevice(AsyncClosable):

    def le_rand(self):
        self.__post(self.__adapter.le_rand())

    def remove_bonded_device(self, address):
        """
        Removes a bonding entry for a given address.
        """
        self.__post(self.__security.remove_bond(address))
Loading