Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 12065e7f authored by Myles Watson's avatar Myles Watson
Browse files

Security: Add a keyboard bonding test

Bug: 162984360
Tag: #gd-refactor
Test: cert/run --host SecurityTest
Change-Id: I98b7e16fb8790b001b79e9c802e45b9f180bb203
parent 5b4f65e3
Loading
Loading
Loading
Loading
+20 −2
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ from security.facade_pb2 import IoCapabilityMessage
from security.facade_pb2 import OobDataBondMessage
from security.facade_pb2 import OobDataMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiMsgType
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType

@@ -46,7 +47,7 @@ class PySecurity(Closable):
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

@@ -157,7 +158,7 @@ class PySecurity(Closable):
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean, p192_data, p256_data):
    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
@@ -165,6 +166,23 @@ class PySecurity(Closable):
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Respond to the UI event
        """
        passkey = -1

        def get_unique_id(event):
            if event.message_type == UiMsgType.DISPLAY_PASSKEY:
                nonlocal passkey
                passkey = event.numeric_value
                return True
            return False

        logging.debug("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        return passkey

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event):
        """
            Respond to the UI event
+48 −16
Original line number Diff line number Diff line
@@ -193,6 +193,43 @@ class CertSecurity(PySecurity):
            return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                    list(complete.GetR256()))

    def input_passkey(self, address, passkey):
        """
            Pretend to answer the pairing dialog as a user
        """
        logging.info("Cert: Waiting for PASSKEY request")
        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST))
        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
        peer = address.decode('utf-8')
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_STARTED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.CLEARED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ERASED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True)
        self._enqueue_hci_command(
            hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_COMPLETED),
            True)
        self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True)

    def send_ui_callback(self, address, callback_type, b, uid):
        """
            Pretend to answer the pairing dailog as a user
@@ -223,6 +260,15 @@ class CertSecurity(PySecurity):
        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
        #self._secure_connections_enabled = True

    def send_io_caps(self, address):
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)

    def accept_pairing(self, dut_address, reply_boolean):
        """
            Here we handle the pairing events at the HCI level
@@ -231,13 +277,7 @@ class CertSecurity(PySecurity):
        assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest())
        logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY")
        self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(dut_address.decode('utf8')), True)
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs),
            True)
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
@@ -258,15 +298,7 @@ class CertSecurity(PySecurity):
    def accept_oob_pairing(self, dut_address):
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                dut_address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.IO_CAPABILITY_REQUEST_REPLY))
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
+42 −0
Original line number Diff line number Diff line
@@ -158,6 +158,19 @@ class SecurityTest(GdBaseTestClass):
        initiator.wait_for_bond_event(expected_init_bond_event)
        responder.wait_for_bond_event(expected_resp_bond_event)

    def _run_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
        initiator.enable_secure_simple_pairing()
        responder.enable_secure_simple_pairing()
        initiator.create_bond(responder.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self._verify_ssp_passkey(initiator, responder, expected_init_bond_event, expected_resp_bond_event)

    def _verify_ssp_passkey(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event):
        responder.send_io_caps(initiator.get_address())
        passkey = initiator.wait_for_passkey(responder.get_address())
        responder.input_passkey(initiator.get_address(), passkey)
        initiator.wait_for_bond_event(expected_init_bond_event)
        responder.wait_for_bond_event(expected_resp_bond_event)

    def test_setup_teardown(self):
        """
            Make sure our setup and teardown is sane
@@ -453,3 +466,32 @@ class SecurityTest(GdBaseTestClass):
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()

    def test_successful_dut_initiated_ssp_keyboard(self):
        dut_io_capability = IoCapabilities.DISPLAY_YES_NO_IO_CAP
        dut_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        dut_oob_present = OobDataPresent.NOT_PRESENT
        cert_io_capability = IoCapabilities.KEYBOARD_ONLY
        cert_auth_reqs = AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
        cert_oob_present = OobDataPresent.NOT_PRESENT
        self.dut_security.set_io_capabilities(dut_io_capability)
        self.dut_security.set_authentication_requirements(dut_auth_reqs)
        self.cert_security.set_io_capabilities(cert_io_capability)
        self.cert_security.set_authentication_requirements(cert_auth_reqs)

        self._run_ssp_passkey(
            initiator=self.dut_security,
            responder=self.cert_security,
            expected_init_bond_event=BondMsgType.DEVICE_BONDED,
            expected_resp_bond_event=BondMsgType.DEVICE_BONDED)

        self.dut_security.remove_bond(self.cert_security.get_address(),
                                      common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
        self.cert_security.remove_bond(self.dut_security.get_address(),
                                       common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)

        self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)
        self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED)

        self.dut_security.wait_for_disconnect_event()
        self.cert_security.wait_for_disconnect_event()
+7 −5
Original line number Diff line number Diff line
@@ -153,6 +153,7 @@ void ClassicPairingHandler::OnReceive(hci::PinCodeRequestView packet) {
  ASSERT(packet.IsValid());
  LOG_INFO("Received: %s", hci::EventCodeText(packet.GetEventCode()).c_str());
  ASSERT_LOG(GetRecord()->GetPseudoAddress()->GetAddress() == packet.GetBdAddr(), "Address mismatch");
  NotifyUiDisplayPasskeyInput();
}

void ClassicPairingHandler::OnReceive(hci::LinkKeyRequestView packet) {
@@ -349,6 +350,7 @@ void ClassicPairingHandler::OnReceive(hci::UserPasskeyNotificationView packet) {
  ASSERT(packet.IsValid());
  LOG_INFO("Received: %s", hci::EventCodeText(packet.GetEventCode()).c_str());
  ASSERT_LOG(GetRecord()->GetPseudoAddress()->GetAddress() == packet.GetBdAddr(), "Address mismatch");
  NotifyUiDisplayPasskey(packet.GetPasskey());
}

void ClassicPairingHandler::OnReceive(hci::KeypressNotificationView packet) {
@@ -357,19 +359,19 @@ void ClassicPairingHandler::OnReceive(hci::KeypressNotificationView packet) {
  LOG_INFO("Notification Type: %s", hci::KeypressNotificationTypeText(packet.GetNotificationType()).c_str());
  switch (packet.GetNotificationType()) {
    case hci::KeypressNotificationType::ENTRY_STARTED:
      // Get ready to keep track of key input
      // Tell the UI to highlight the first digit
      break;
    case hci::KeypressNotificationType::DIGIT_ENTERED:
      // Append digit to key
      // Tell the UI to move one digit to the right
      break;
    case hci::KeypressNotificationType::DIGIT_ERASED:
      // erase last digit from key
      // Tell the UI to move back one digit
      break;
    case hci::KeypressNotificationType::CLEARED:
      // erase all digits from key
      // Tell the UI to highlight the first digit again
      break;
    case hci::KeypressNotificationType::ENTRY_COMPLETED:
      // set full key to security record
      // Tell the UI to hide the dialog
      break;
  }
}