Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 34965b12 authored by Myles Watson's avatar Myles Watson Committed by Gerrit Code Review
Browse files

Merge changes from topic "gd_oob"

* changes:
  Security: Add a keyboard bonding test
  RootCanal: Add Passkey and PasskeyFailed
  HciMatchers: enforce argument requirement
  Rename GetOutOfBandData -> GetLeOutOfBandData
  Facade OobDataMessage: Rename members to be generic.
  SecurityTest: Create bond Out of Band
parents a6a51e22 12065e7f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionRespo
class HciMatchers(object):

    @staticmethod
    def CommandComplete(opcode=None):
    def CommandComplete(opcode):
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
+26 −6
Original line number Diff line number Diff line
@@ -31,7 +31,10 @@ from security.facade_pb2 import BondMsgType
from security.facade_pb2 import SecurityPolicyMessage
from security.facade_pb2 import IoCapabilities
from security.facade_pb2 import IoCapabilityMessage
from security.facade_pb2 import OobDataBondMessage
from security.facade_pb2 import OobDataMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiMsgType
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType

@@ -44,7 +47,7 @@ class PySecurity(Closable):
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

@@ -91,13 +94,13 @@ class PySecurity(Closable):
                p192_data=OobDataMessage(
                    address=common.BluetoothAddressWithType(
                        address=common.BluetoothAddress(address=address), type=type),
                    le_sc_confirmation_value=bytes(bytearray(p192_oob_data[0])),
                    le_sc_random_value=bytes(bytearray(p192_oob_data[1]))),
                    confirmation_value=bytes(bytearray(p192_oob_data[0])),
                    random_value=bytes(bytearray(p192_oob_data[1]))),
                p256_data=OobDataMessage(
                    address=common.BluetoothAddressWithType(
                        address=common.BluetoothAddress(address=address), type=type),
                    le_sc_confirmation_value=bytes(bytearray(p256_oob_data[0])),
                    le_sc_random_value=bytes(bytearray(p256_oob_data[1])))))
                    confirmation_value=bytes(bytearray(p256_oob_data[0])),
                    random_value=bytes(bytearray(p256_oob_data[1])))))

    def remove_bond(self, address, type):
        """
@@ -155,7 +158,7 @@ class PySecurity(Closable):
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean, p192_data, p256_data):
    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
@@ -163,6 +166,23 @@ class PySecurity(Closable):
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Respond to the UI event
        """
        passkey = -1

        def get_unique_id(event):
            if event.message_type == UiMsgType.DISPLAY_PASSKEY:
                nonlocal passkey
                passkey = event.numeric_value
                return True
            return False

        logging.debug("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        return passkey

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event):
        """
            Respond to the UI event
+50 −17
Original line number Diff line number Diff line
@@ -138,7 +138,6 @@ class CertSecurity(PySecurity):
            :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value

        """

        oob_data_type = self._oob_present_lookup[pb_oob_data_type]

        if (oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT):
@@ -194,6 +193,43 @@ class CertSecurity(PySecurity):
            return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                    list(complete.GetR256()))

    def input_passkey(self, address, passkey):
        """
            Pretend to answer the pairing dialog as a user
        """
        logging.info("Cert: Waiting for PASSKEY request")
        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST))
        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
        peer = address.decode('utf-8')
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_STARTED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.CLEARED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ERASED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_COMPLETED),
            True)
        self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True)

    def send_ui_callback(self, address, callback_type, b, uid):
        """
            Pretend to answer the pairing dailog as a user
@@ -221,7 +257,17 @@ class CertSecurity(PySecurity):
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
        self._secure_connections_enabled = True
        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
        #self._secure_connections_enabled = True

    def send_io_caps(self, address):
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)

    def accept_pairing(self, dut_address, reply_boolean):
        """
@@ -231,13 +277,7 @@ class CertSecurity(PySecurity):
        assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest())
        logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY")
        self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(dut_address.decode('utf8')), True)
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs),
            True)
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
@@ -258,14 +298,7 @@ class CertSecurity(PySecurity):
    def accept_oob_pairing(self, dut_address):
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                dut_address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)
        assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete())
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
+24 −24
Original line number Diff line number Diff line
@@ -831,20 +831,20 @@ class LeSecurityTest(GdBaseTestClass):
            self._prepare_cert_for_connection()

            if dut_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
                oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
                self.dut.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.cert_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))
                        confirmation_value=oobdata.confirmation_value,
                        random_value=oobdata.random_value))

            if cert_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
                oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty())
                self.cert.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.dut_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))
                        confirmation_value=oobdata.confirmation_value,
                        random_value=oobdata.random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(dut_oob_flag)
@@ -898,20 +898,20 @@ class LeSecurityTest(GdBaseTestClass):
            self._prepare_dut_for_connection()

            if dut_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
                oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
                self.dut.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.cert_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))
                        confirmation_value=oobdata.confirmation_value,
                        random_value=oobdata.random_value))

            if cert_oob_flag == LeOobDataFlag.PRESENT:
                oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
                oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty())
                self.cert.security.SetOutOfBandData(
                    OobDataMessage(
                        address=self.dut_address,
                        le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                        le_sc_random_value=oobdata.le_sc_random_value))
                        confirmation_value=oobdata.confirmation_value,
                        random_value=oobdata.random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(dut_oob_flag)
@@ -965,19 +965,19 @@ class LeSecurityTest(GdBaseTestClass):

            self._prepare_dut_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))
                    confirmation_value=oobdata.confirmation_value,
                    random_value=oobdata.random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))
                    confirmation_value=oobdata.confirmation_value,
                    random_value=oobdata.random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
@@ -1036,19 +1036,19 @@ class LeSecurityTest(GdBaseTestClass):

            self._prepare_cert_for_connection()

            oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty())
            oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
            self.dut.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.cert_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))
                    confirmation_value=oobdata.confirmation_value,
                    random_value=oobdata.random_value))

            oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty())
            oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty())
            self.cert.security.SetOutOfBandData(
                OobDataMessage(
                    address=self.dut_address,
                    le_sc_confirmation_value=oobdata.le_sc_confirmation_value,
                    le_sc_random_value=oobdata.le_sc_random_value))
                    confirmation_value=oobdata.confirmation_value,
                    random_value=oobdata.random_value))

            self.dut.security.SetLeIoCapability(KEYBOARD_ONLY)
            self.dut.security.SetLeOobDataPresent(OOB_PRESENT)
+107 −0
Original line number Diff line number Diff line
@@ -137,6 +137,40 @@ class SecurityTest(GdBaseTestClass):
        initiator.wait_for_bond_event(expected_init_bond_event)
        responder.wait_for_bond_event(expected_resp_bond_event)

    def _run_ssp_oob(self, initiator, responder, init_ui_response, resp_ui_response, expected_init_ui_event,
                     expected_resp_ui_event, expected_init_bond_event, expected_resp_bond_event, p192_oob_data,
                     p256_oob_data):
        initiator.enable_secure_simple_pairing()
        responder.enable_secure_simple_pairing()
        initiator.create_bond_out_of_band(responder.get_address(),
                                          common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS, p192_oob_data,
                                          p256_oob_data)
        self._verify_ssp_oob(initiator, responder, init_ui_response, resp_ui_response, expected_init_ui_event,
                             expected_resp_ui_event, expected_init_bond_event, expected_resp_bond_event, p192_oob_data,
                             p256_oob_data)

    # Verifies the events for the numeric comparion test
    def _verify_ssp_oob(self, initiator, responder, init_ui_response, resp_ui_response, expected_init_ui_event,
                        expected_resp_ui_event, expected_init_bond_event, expected_resp_bond_event, p192_oob_data,
                        p256_oob_data):
        responder.accept_oob_pairing(initiator.get_address())
        initiator.on_user_input(responder.get_address(), init_ui_response, expected_init_ui_event)
        initiator.wait_for_bond_event(expected_init_bond_event)
        responder.wait_for_bond_event(expected_resp_bond_event)

    def _run_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
        initiator.enable_secure_simple_pairing()
        responder.enable_secure_simple_pairing()
        initiator.create_bond(responder.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self._verify_ssp_passkey(initiator, responder, expected_init_bond_event, expected_resp_bond_event)

    def _verify_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
        responder.send_io_caps(initiator.get_address())
        passkey = initiator.wait_for_passkey(responder.get_address())
        responder.input_passkey(initiator.get_address(), passkey)
        initiator.wait_for_bond_event(expected_init_bond_event)
        responder.wait_for_bond_event(expected_resp_bond_event)

    def test_setup_teardown(self):
        """
            Make sure our setup and teardown is sane
@@ -388,3 +422,76 @@ class SecurityTest(GdBaseTestClass):
        assertThat(has192R).isTrue()
        assertThat(has256C).isTrue()
        assertThat(has256R).isTrue()

    def test_successful_dut_initiated_ssp_oob(self):
        dut_io_capability = IoCapabilities.NO_INPUT_NO_OUTPUT
        cert_io_capability = IoCapabilities.NO_INPUT_NO_OUTPUT
        dut_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        cert_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        cert_oob_present = OobDataPresent.P192_PRESENT
        self.dut_security.enable_secure_simple_pairing()
        self.dut_security.enable_secure_connections()
        self.cert_security.enable_secure_simple_pairing()
        self.cert_security.enable_secure_connections()
        self.dut_security.set_io_capabilities(dut_io_capability)
        self.dut_security.set_authentication_requirements(dut_auth_reqs)
        self.cert_security.set_io_capabilities(cert_io_capability)
        self.cert_security.set_authentication_requirements(cert_auth_reqs)
        init_ui_response = True
        resp_ui_response = True
        expected_init_ui_event = None  # None is auto accept
        expected_resp_ui_event = None  # None is auto accept
        expected_init_bond_event = BondMsgType.DEVICE_BONDED
        expected_resp_bond_event = None
        # get_oob_data returns a tuple of bytes (p192c,p192r,p256c,p256r)
        local_oob_data = self.cert_security.get_oob_data_from_controller(cert_oob_present)
        p192_oob_data = local_oob_data[0:2]
        p256_oob_data = local_oob_data[2:4]
        self._run_ssp_oob(
            initiator=self.dut_security,
            responder=self.cert_security,
            init_ui_response=init_ui_response,
            resp_ui_response=resp_ui_response,
            expected_init_ui_event=expected_init_ui_event,
            expected_resp_ui_event=expected_resp_ui_event,
            expected_init_bond_event=expected_init_bond_event,
            expected_resp_bond_event=expected_resp_bond_event,
            p192_oob_data=p192_oob_data,
            p256_oob_data=p256_oob_data)
        self.dut_security.remove_bond(self.cert_security.get_address(),
                                      common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.cert_security.remove_bond(self.dut_security.get_address(),
                                       common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()

    def test_successful_dut_initiated_ssp_keyboard(self):
        dut_io_capability = IoCapabilities.DISPLAY_YES_NO_IO_CAP
        dut_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        dut_oob_present = OobDataPresent.NOT_PRESENT
        cert_io_capability = IoCapabilities.KEYBOARD_ONLY
        cert_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        cert_oob_present = OobDataPresent.NOT_PRESENT
        self.dut_security.set_io_capabilities(dut_io_capability)
        self.dut_security.set_authentication_requirements(dut_auth_reqs)
        self.cert_security.set_io_capabilities(cert_io_capability)
        self.cert_security.set_authentication_requirements(cert_auth_reqs)

        self._run_ssp_passkey(
            initiator=self.dut_security,
            responder=self.cert_security,
            expected_init_bond_event=BondMsgType.DEVICE_BONDED,
            expected_resp_bond_event=BondMsgType.DEVICE_BONDED)

        self.dut_security.remove_bond(self.cert_security.get_address(),
                                      common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.cert_security.remove_bond(self.dut_security.get_address(),
                                       common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)

        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)

        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()
Loading