Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cd57ec40 authored by Henri Chataing's avatar Henri Chataing Committed by Automerger Merge Worker
Browse files

Merge "gdcert: Remove unused helper PySecurity" into main am: 060a2523 am: 056631a6

parents 1a76f050 056631a6
Loading
Loading
Loading
Loading
+0 −14
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
from blueberry.tests.gd.cert.capture import Capture
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.matchers import SecurityMatchers
from blueberry.facade.security.facade_pb2 import UiMsgType
import hci_packets as hci


@@ -93,16 +92,3 @@ class HciCaptures(object):
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE),
                       lambda packet: hci.Event.parse_all(packet.payload))


class SecurityCaptures(object):

    @staticmethod
    def DisplayPasskey():
        return Capture(SecurityMatchers.UiMsg(UiMsgType.DISPLAY_PASSKEY), SecurityCaptures._extract_passkey)

    @staticmethod
    def _extract_passkey(event):
        if event is None:
            return None
        return event.numeric_value
+0 −10
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@

import logging

from blueberry.tests.gd.cert.captures import SecurityCaptures
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import EventStream
@@ -56,15 +55,6 @@ class PyLeSecurity(Closable):
    def get_advertising_callback_event_stream(self):
        return self._advertising_callback_event_stream

    def wait_for_ui_event_passkey(self, timeout=timedelta(seconds=3)):
        display_passkey_capture = SecurityCaptures.DisplayPasskey()
        assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout)
        return display_passkey_capture.get()

    def wait_device_disconnect(self, address):
        assertThat(self._helper_event_stream).emits(
            SecurityMatchers.HelperMsg(HelperMsgType.DEVICE_DISCONNECTED, address))

    def SetLeAuthRequirements(self, *args, **kwargs):
        return self._device.security.SetLeAuthRequirements(LeAuthRequirementsMessage(*args, **kwargs))

+0 −276
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto

from blueberry.facade.security.facade_pb2 import AuthenticationRequirements
from blueberry.facade.security.facade_pb2 import AuthenticationRequirementsMessage
from blueberry.facade.security.facade_pb2 import SecurityPolicyMessage
from blueberry.facade.security.facade_pb2 import IoCapabilities
from blueberry.facade.security.facade_pb2 import IoCapabilityMessage
from blueberry.facade.security.facade_pb2 import OobDataBondMessage
from blueberry.facade.security.facade_pb2 import OobDataMessage
from blueberry.facade.security.facade_pb2 import UiMsgType
from blueberry.facade.security.facade_pb2 import UiCallbackMsg
from blueberry.facade.security.facade_pb2 import UiCallbackType


class PySecurity(Closable):
    """
        Abstraction for security tasks and GRPC calls
    """

    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

    _auth_reqs_name_lookup = {
        AuthenticationRequirements.NO_BONDING: "NO_BONDING",
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING",
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING",
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION",
    }

    _ui_event_stream = None
    _bond_event_stream = None
    _oob_data_event_stream = None

    def __init__(self, device):
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._enforce_security_policy_stream = EventStream(
            self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))
        self._disconnect_event_stream = EventStream(self._device.security.FetchDisconnectEvents(empty_proto.Empty()))
        self._oob_data_event_stream = EventStream(
            self._device.security.FetchGetOutOfBandDataEvents(empty_proto.Empty()))

    def create_bond(self, address, type):
        """
            Triggers stack under test to create bond
        """
        logging.info("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
        self._device.security.CreateBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data):
        """
            Triggers stack under test to create bond using Out of Band method
        """

        logging.info("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address)))

        self._device.security.CreateBondOutOfBand(
            OobDataBondMessage(
                address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type),
                p192_data=OobDataMessage(
                    address=common.BluetoothAddressWithType(
                        address=common.BluetoothAddress(address=address), type=type),
                    confirmation_value=bytes(bytearray(p192_oob_data[0])),
                    random_value=bytes(bytearray(p192_oob_data[1]))),
                p256_data=OobDataMessage(
                    address=common.BluetoothAddressWithType(
                        address=common.BluetoothAddress(address=address), type=type),
                    confirmation_value=bytes(bytearray(p256_oob_data[0])),
                    random_value=bytes(bytearray(p256_oob_data[1])))))

    def remove_bond(self, address, type):
        """
            Removes bond from stack under test
        """
        self._device.security.RemoveBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the DUT
        """
        logging.info("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get(
            io_capabilities, "ERROR"))
        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack
        """
        logging.info("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get(
            auth_reqs, "ERROR"))
        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Send a callback from the UI as if the user pressed a button on the dialog
        """
        logging.info("DUT: Sending user input response uid: %d; response: %s" % (uid, b))
        self._device.security.SendUiCallback(
            UiCallbackMsg(
                message_type=callback_type,
                boolean=b,
                unique_id=uid,
                pin=bytes(pin),
                address=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(address=address),
                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
            Since the stack under test already enables it by default
            we do not need to do anything here for the time being
        """
        pass

    def enable_secure_connections(self):
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Respond to the UI event
        """
        passkey = -1

        def get_passkey(event):
            if event.message_type == UiMsgType.DISPLAY_PASSKEY:
                nonlocal passkey
                passkey = event.numeric_value
                return True
            return False

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_passkey)
        return passkey

    def input_pin(self, cert_address, pin):
        """
            Respond to the UI event
        """
        logging.info("DUT: Inputting pin code: %s" % str(pin))
        self.on_user_input(
            cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin)

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event, pin=[]):
        """
            Respond to the UI event
        """
        if expected_ui_event is None:
            return

        ui_id = -1

        def get_unique_id(event):
            if event.message_type == expected_ui_event:
                nonlocal ui_id
                ui_id = event.unique_id
                return True
            return False

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        callback_type = UiCallbackType.YES_NO if len(pin) == 0 else UiCallbackType.PIN
        self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin)

    def get_address(self):
        return self._device.address

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.
        """
        logging.info("DUT: Waiting for Bond Event: %s " % expected_bond_event)
        assertThat(self._bond_event_stream).emits(
            lambda event: event.message_type == expected_bond_event or logging.info("DUT: Actual Bond Event: %s" % event.message_type)
        )

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            We expect a 'True' or 'False' from the enforce security call

            This interface will allow the caller to wait for a callback
            result from enforcing security policy over the facade.
        """
        logging.info("DUT: Waiting for enforce security event")
        assertThat(self._enforce_security_policy_stream).emits(
            lambda event: event.result == expected_enforce_security_event or logging.info(event.result))

    def wait_for_disconnect_event(self):
        """
            The Address is expected to be returned
        """
        logging.info("DUT: Waiting for Disconnect Event")
        assertThat(self._disconnect_event_stream).emits(lambda event: logging.info("event: %s" % event.address) or True)

    def enforce_security_policy(self, address, type, policy):
        """
            Call to enforce classic security policy
        """
        self._device.security.EnforceSecurityPolicy(
            SecurityPolicyMessage(
                address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type),
                policy=policy))

    def get_oob_data_from_controller(self, oob_data_present):
        self._device.security.GetOutOfBandData(empty_proto.Empty())
        oob_data = []

        def get_oob_data(event):
            nonlocal oob_data
            oob_data = [
                list(event.p192_data.confirmation_value),
                list(event.p192_data.random_value), [0 for i in range(0, 16)], [0 for i in range(0, 16)]
            ]
            return True

        assertThat(self._oob_data_event_stream).emits(get_oob_data)
        return oob_data

    def close(self):
        safeClose(self._ui_event_stream)
        safeClose(self._bond_event_stream)
        safeClose(self._enforce_security_policy_stream)
        safeClose(self._disconnect_event_stream)
        safeClose(self._oob_data_event_stream)