Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e1860a6b authored by Myles Watson's avatar Myles Watson Committed by Gerrit Code Review
Browse files

Merge changes I53b512b1,I1be0028c,Iabd6b355,I7d3ccb15

* changes:
  CertSecurity: Add OOB related interfaces
  PySecurity: Add enable_secure_connections interface
  HciCaptures: Add OOB Related captures
  HciMatchers: Add OOB Related events.
parents 3a7b9c01 ab27a239
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
@@ -63,6 +63,20 @@ class HalCaptures(object):

class HciCaptures(object):

    @staticmethod
    def ReadLocalOobDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_DATA)
        )

    @staticmethod
    def ReadLocalOobExtendedDataCompleteCapture():
        return Capture(
            HciMatchers.CommandComplete(hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA),
            lambda packet: HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_LOCAL_OOB_EXTENDED_DATA)
        )

    @staticmethod
    def ReadBdAddrCompleteCapture():
        return Capture(
@@ -95,6 +109,12 @@ class HciCaptures(object):
        return Capture(HciMatchers.LeConnectionComplete(),
                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.event))

    @staticmethod
    def SimplePairingCompleteCapture():
        return Capture(HciMatchers.EventWithCode(hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE),
            lambda packet: hci_packets.SimplePairingCompleteView(
                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE)))


class L2capCaptures(object):

+10 −0
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@
#   limitations under the License.

import bluetooth_packets_python3 as bt_packets
import logging

from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3.hci_packets import EventCode
from bluetooth_packets_python3 import l2cap_packets
@@ -109,6 +111,10 @@ class HciMatchers(object):
        return hci_packets.LeEnhancedConnectionCompleteView(
            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE))

    @staticmethod
    def LogEventCode():
        return lambda event: logging.info("Received event: %x" % hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode())

    @staticmethod
    def LinkKeyRequest():
        return lambda event: HciMatchers.EventWithCode(EventCode.LINK_KEY_REQUEST)
@@ -153,6 +159,10 @@ class HciMatchers(object):
    def DisconnectionComplete():
        return lambda event: HciMatchers.EventWithCode(EventCode.DISCONNECTION_COMPLETE)

    @staticmethod
    def RemoteOobDataRequest():
        return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST)


class NeighborMatchers(object):

+6 −1
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ from hci.facade import facade_pb2 as hci_facade

from security.facade_pb2 import AuthenticationRequirements
from security.facade_pb2 import AuthenticationRequirementsMessage
from security.facade_pb2 import BondMsgType
from security.facade_pb2 import SecurityPolicyMessage
from security.facade_pb2 import IoCapabilities
from security.facade_pb2 import IoCapabilityMessage
@@ -129,6 +130,9 @@ class PySecurity(Closable):
        """
        pass

    def enable_secure_connections(self):
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
@@ -169,7 +173,8 @@ class PySecurity(Closable):
        """
        logging.debug("DUT: Waiting for Bond Event")
        assertThat(self._bond_event_stream).emits(
            lambda event: event.message_type == expected_bond_event or logging.info("DUT: %s" % event.message_type))
            lambda event: event.message_type == expected_bond_event or logging.info("DUT: Actual Bond Event: %s" % event.message_type)
        )

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
+86 −6
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
import logging

from bluetooth_packets_python3 import hci_packets
from cert.captures import HciCaptures
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.matchers import HciMatchers
@@ -68,8 +69,9 @@ class CertSecurity(PySecurity):

    _hci_event_stream = None
    _io_caps = hci_packets.IoCapability.DISPLAY_ONLY
    _oob_data = hci_packets.OobDataPresent.NOT_PRESENT
    _remote_oob_data = None
    _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
    _secure_connections_enabled = False

    _hci = None

@@ -129,12 +131,76 @@ class CertSecurity(PySecurity):
            auth_reqs, "ERROR"))
        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING)

    def set_oob_data(self, data):
    def set_remote_oob_data(self, remote_data):
        """
            Set the Out-of-band data for SSP pairing
        """
        logging.info("Cert: setting OOB data present to '%s'" % data)
        self._oob_data = self._oob_present_lookup.get(data, hci_packets.OobDataPresent.NOT_PRESENT)
        logging.info("Cert: setting OOB data present to '%s'" % remote_data)
        self._remote_oob_data = remote_data

    def get_oob_data_from_controller(self, pb_oob_data_type):
        """
            Get the Out-of-band data for SSP pairing

            :param pb_oob_data_type: Type of data needed
            :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value

        """

        oob_data_type = self._oob_present_lookup[pb_oob_data_type]

        if (oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT):
            logging.warn("No data present, no need to call get_oob_data")
            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], [0 for i in range(0, 16)],
                    [0 for i in range(0, 16)])

        logging.info("Cert: Requesting OOB data")
        if oob_data_type == hci_packets.OobDataPresent.P_192_PRESENT:
            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
            if self._secure_connections_enabled:
                logging.info("Cert: Requesting P192 Data; secure connections")
                complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
                self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
                logging.info("Cert: Waiting for OOB response from controller")
                assertThat(self._hci_event_stream).emits(complete_capture)
                command_complete = complete_capture.get()
                complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
                return (list(complete.GetC192()), list(complete.GetR192()), [0 for i in range(0, 16)],
                        [0 for i in range(0, 16)])
            # else we use ReadLocalDataRequest
            else:
                logging.info("Cert: Requesting P192 Data; no secure connections")
                complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
                self._enqueue_hci_command(hci_packets.ReadLocalOobDataBuilder(), True)
                logging.info("Cert: Waiting for OOB response from controller")
                assertThat(self._hci_event_stream).emits(complete_capture)
                command_complete = complete_capture.get()
                complete = hci_packets.ReadLocalOobDataCompleteView(command_complete)
                return (list(complete.GetC()), list(complete.GetR()), [0 for i in range(0, 16)],
                        [0 for i in range(0, 16)])

        # Must be secure connection compatible to use these
        elif oob_data_type == hci_packets.OobDataPresent.P_256_PRESENT:
            logging.info("Cert: Requesting P256 Extended Data; secure connections")
            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
            self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
            logging.info("Cert: Waiting for OOB response from controller")
            assertThat(self._hci_event_stream).emits(complete_capture)
            command_complete = complete_capture.get()
            complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], list(complete.GetC256()),
                    list(complete.GetR256()))

        else:  # Both
            logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
            self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
            logging.info("Cert: Waiting for OOB response from controller")
            assertThat(self._hci_event_stream).emits(complete_capture)
            command_complete = complete_capture.get()
            complete = hci_packets.ReadLocalOobExtendedDataCompleteView(command_complete)
            return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                    list(complete.GetR256()))

    def send_ui_callback(self, address, callback_type, b, uid):
        """
@@ -150,7 +216,20 @@ class CertSecurity(PySecurity):
        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
        self._enqueue_hci_command(hci_packets.WriteSimplePairingModeBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SIMPLE_PAIRING_MODE))

    def enable_secure_connections(self):
        """
            This is called when you want to enable secure connections support
        """
        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
        self._enqueue_hci_command(
            hci_packets.WriteSecureConnectionsHostSupportBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
        self._secure_connections_enabled = True

    def accept_pairing(self, dut_address, reply_boolean):
        """
@@ -165,7 +244,8 @@ class CertSecurity(PySecurity):
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                dut_address.decode('utf8'), self._io_caps, self._oob_data, self._auth_reqs), True)
                dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs),
            True)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
+88 −6
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ from bluetooth_packets_python3 import hci_packets
from cert.event_stream import EventStream
from cert.gd_base_test import GdBaseTestClass
from cert.py_security import PySecurity
from cert.truth import assertThat
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto
from hci.facade import controller_facade_pb2 as controller_facade
@@ -147,10 +148,8 @@ class SecurityTest(GdBaseTestClass):
        # Arrange
        self.dut_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
        self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
        self.dut_security.set_oob_data(OobDataPresent.NOT_PRESENT)
        self.cert_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
        self.cert_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
        self.cert_security.set_oob_data(OobDataPresent.NOT_PRESENT)

        # Act and Assert
        self._run_ssp_numeric_comparison(
@@ -174,10 +173,8 @@ class SecurityTest(GdBaseTestClass):
        # Arrange
        self.dut_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
        self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
        self.dut_security.set_oob_data(OobDataPresent.NOT_PRESENT)
        self.cert_security.set_io_capabilities(IoCapabilities.NO_INPUT_NO_OUTPUT)
        self.cert_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING)
        self.cert_security.set_oob_data(OobDataPresent.NOT_PRESENT)

        # Act and Assert
        self._run_ssp_numeric_comparison(
@@ -191,6 +188,9 @@ class SecurityTest(GdBaseTestClass):
            expected_resp_bond_event=None)

        self.dut_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.cert_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)

        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()
@@ -206,6 +206,14 @@ class SecurityTest(GdBaseTestClass):
            expected_init_bond_event=BondMsgType.DEVICE_BONDED,
            expected_resp_bond_event=None)

        self.dut_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.cert_security.remove_bond(self.cert.address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)

        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()

    def test_successful_dut_initiated_ssp_numeric_comparison(self):
        test_count = len(self.io_capabilities) * len(self.auth_reqs) * len(self.oob_present) * len(
            self.io_capabilities) * len(self.auth_reqs) * len(self.oob_present)
@@ -232,10 +240,8 @@ class SecurityTest(GdBaseTestClass):
                                logging.info("")
                                self.dut_security.set_io_capabilities(dut_io_capability)
                                self.dut_security.set_authentication_requirements(dut_auth_reqs)
                                self.dut_security.set_oob_data(dut_oob_present)
                                self.cert_security.set_io_capabilities(cert_io_capability)
                                self.cert_security.set_authentication_requirements(cert_auth_reqs)
                                self.cert_security.set_oob_data(cert_oob_present)
                                init_ui_response = True
                                resp_ui_response = True
                                expected_init_ui_event = None  # None is auto accept
@@ -302,3 +308,79 @@ class SecurityTest(GdBaseTestClass):

                                self.dut_security.wait_for_disconnect_event()
                                self.cert_security.wait_for_disconnect_event()

    def test_enable_secure_simple_pairing(self):
        self.dut_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_simple_pairing()

    def test_enable_secure_connections(self):
        self.dut_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_simple_pairing()
        self.dut_security.enable_secure_connections()
        self.cert_security.enable_secure_connections()

    def test_get_oob_data_from_controller_not_present(self):
        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.NOT_PRESENT)
        assertThat(len(oob_data)).isEqualTo(4)
        has192C = not all([i == 0 for i in oob_data[0]])
        has192R = not all([i == 0 for i in oob_data[1]])
        has256C = not all([i == 0 for i in oob_data[2]])
        has256R = not all([i == 0 for i in oob_data[3]])
        assertThat(has192C).isFalse()
        assertThat(has192R).isFalse()
        assertThat(has256C).isFalse()
        assertThat(has256R).isFalse()

    def test_get_oob_data_from_controller_p192_present_no_secure_connections(self):
        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_PRESENT)
        assertThat(len(oob_data)).isEqualTo(4)
        has192C = not all([i == 0 for i in oob_data[0]])
        has192R = not all([i == 0 for i in oob_data[1]])
        has256C = not all([i == 0 for i in oob_data[2]])
        has256R = not all([i == 0 for i in oob_data[3]])
        assertThat(has192C).isTrue()
        assertThat(has192R).isTrue()
        assertThat(has256C).isFalse()
        assertThat(has256R).isFalse()

    def test_get_oob_data_from_controller_p192_present(self):
        self.cert_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_connections()
        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_PRESENT)
        assertThat(len(oob_data)).isEqualTo(4)
        has192C = not all([i == 0 for i in oob_data[0]])
        has192R = not all([i == 0 for i in oob_data[1]])
        has256C = not all([i == 0 for i in oob_data[2]])
        has256R = not all([i == 0 for i in oob_data[3]])
        assertThat(has192C).isTrue()
        assertThat(has192R).isTrue()
        assertThat(has256C).isFalse()
        assertThat(has256R).isFalse()

    def test_get_oob_data_from_controller_p256_present(self):
        self.cert_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_connections()
        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P256_PRESENT)
        assertThat(len(oob_data)).isEqualTo(4)
        has192C = not all([i == 0 for i in oob_data[0]])
        has192R = not all([i == 0 for i in oob_data[1]])
        has256C = not all([i == 0 for i in oob_data[2]])
        has256R = not all([i == 0 for i in oob_data[3]])
        assertThat(has192C).isFalse()
        assertThat(has192R).isFalse()
        assertThat(has256C).isTrue()
        assertThat(has256R).isTrue()

    def test_get_oob_data_from_controller_p192_and_p256_present(self):
        self.cert_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_connections()
        oob_data = self.cert_security.get_oob_data_from_controller(OobDataPresent.P192_AND_256_PRESENT)
        assertThat(len(oob_data)).isEqualTo(4)
        has192C = not all([i == 0 for i in oob_data[0]])
        has192R = not all([i == 0 for i in oob_data[1]])
        has256C = not all([i == 0 for i in oob_data[2]])
        has256R = not all([i == 0 for i in oob_data[3]])
        assertThat(has192C).isTrue()
        assertThat(has192R).isTrue()
        assertThat(has256C).isTrue()
        assertThat(has256R).isTrue()