Loading system/gd/cert/matchers.py +5 −1 Original line number Diff line number Diff line Loading @@ -144,7 +144,7 @@ class HciMatchers(object): @staticmethod def LogEventCode(): return lambda event: logging.info("Received event: %x" % hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode()) return lambda event: logging.info("Received event: %x" % hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode()) @staticmethod def LinkKeyRequest(): Loading Loading @@ -194,6 +194,10 @@ class HciMatchers(object): def RemoteOobDataRequest(): return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST) @staticmethod def PinCodeRequest(): return lambda event: HciMatchers.EventWithCode(EventCode.PIN_CODE_REQUEST) @staticmethod def LoopbackOf(packet): return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet)) Loading system/gd/cert/py_security.py +11 −10 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class PySecurity(Closable): _oob_data_event_stream = None def __init__(self, device): logging.debug("DUT: Init") logging.info("DUT: Init") self._device = device self._device.wait_channel_ready() self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty())) Loading @@ -80,7 +80,7 @@ class PySecurity(Closable): """ Triggers stack under test to create bond """ logging.debug("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) logging.info("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBond( common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) Loading @@ -89,7 +89,7 @@ class PySecurity(Closable): Triggers stack under test to create bond using Out of Band method """ logging.debug("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) logging.info("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBondOutOfBand( OobDataBondMessage( Loading @@ -116,7 +116,7 @@ class PySecurity(Closable): """ Set the IO Capabilities used for the DUT """ logging.debug("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get( logging.info("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get( io_capabilities, "ERROR")) self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities)) Loading @@ -124,7 +124,7 @@ class PySecurity(Closable): """ Establish authentication requirements for the stack """ logging.debug("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get( logging.info("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get( auth_reqs, "ERROR")) self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs)) Loading @@ -132,7 +132,7 @@ class PySecurity(Closable): """ Send a callback from the UI as if the user pressed a button on the dialog """ logging.debug("DUT: Sending user input response uid: %d; response: %s" % (uid, b)) logging.info("DUT: Sending user input response uid: %d; response: %s" % (uid, b)) self._device.security.SendUiCallback( UiCallbackMsg( message_type=callback_type, Loading Loading @@ -183,7 +183,7 @@ class PySecurity(Closable): return True return False logging.debug("DUT: Waiting for expected UI event") logging.info("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_passkey) return passkey Loading @@ -191,6 +191,7 @@ class PySecurity(Closable): """ Respond to the UI event """ logging.info("DUT: Inputting pin code: %s" % str(pin)) self.on_user_input( cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin) Loading @@ -210,7 +211,7 @@ class PySecurity(Closable): return True return False logging.debug("DUT: Waiting for expected UI event") logging.info("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_unique_id) callback_type = UiCallbackType.YES_NO if len(pin) == 0 else UiCallbackType.PIN self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin) Loading @@ -224,7 +225,7 @@ class PySecurity(Closable): is complete. For the DUT we need to wait for it, for Cert it isn't needed. """ logging.debug("DUT: Waiting for Bond Event") logging.info("DUT: Waiting for Bond Event: %s " % expected_bond_event) assertThat(self._bond_event_stream).emits( lambda event: event.message_type == expected_bond_event or logging.info("DUT: Actual Bond Event: %s" % event.message_type) ) Loading @@ -245,7 +246,7 @@ class PySecurity(Closable): The Address is expected to be returned """ logging.info("DUT: Waiting for Disconnect Event") assertThat(self._disconnect_event_stream).emits(lambda event: 1 == 1) assertThat(self._disconnect_event_stream).emits(lambda event: logging.info("event: %s" % event.address) or True) def enforce_security_policy(self, address, type, policy): """ Loading system/gd/hci/controller.cc +0 −8 Original line number Diff line number Diff line Loading @@ -42,7 +42,6 @@ struct Controller::impl { le_set_event_mask(kDefaultLeEventMask); set_event_mask(kDefaultEventMask); write_simple_pairing_mode(Enable::ENABLED); write_le_host_support(Enable::ENABLED); hci_->EnqueueCommand(ReadLocalNameBuilder::Create(), handler->BindOnceOn(this, &Controller::impl::read_local_name_complete_handler)); Loading Loading @@ -442,13 +441,6 @@ struct Controller::impl { this, &Controller::impl::check_status<SetEventMaskCompleteView>)); } void write_simple_pairing_mode(Enable enable) { std::unique_ptr<WriteSimplePairingModeBuilder> packet = WriteSimplePairingModeBuilder::Create(enable); hci_->EnqueueCommand( std::move(packet), module_.GetHandler()->BindOnceOn(this, &Controller::impl::check_status<WriteSimplePairingModeCompleteView>)); } void write_le_host_support(Enable enable) { std::unique_ptr<WriteLeHostSupportBuilder> packet = WriteLeHostSupportBuilder::Create(enable); hci_->EnqueueCommand( Loading system/gd/security/cert/cert_security.py +3 −1 Original line number Diff line number Diff line Loading @@ -237,10 +237,12 @@ class CertSecurity(PySecurity): """ Pretend to answer the pairing dialog as a user """ if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH: raise Exception("Pin code must be within range") logging.info("Cert: Waiting for PIN request") assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.PIN_CODE_REQUEST)) assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest()) logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address)) peer = address.decode('utf-8') pin_list = list(pin) Loading system/gd/security/cert/security_test.py +33 −0 Original line number Diff line number Diff line Loading @@ -171,6 +171,18 @@ class SecurityTest(GdBaseTestClass): initiator.wait_for_bond_event(expected_init_bond_event) responder.wait_for_bond_event(expected_resp_bond_event) def _run_pin(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event): initiator.create_bond(responder.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self._verify_pin(initiator, responder, expected_init_bond_event, expected_resp_bond_event) def _verify_pin(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event): pin = b'123456789A' logging.info("pin: %s" % pin) initiator.input_pin(responder.get_address(), pin) responder.input_pin(initiator.get_address(), pin) initiator.wait_for_bond_event(expected_init_bond_event) responder.wait_for_bond_event(expected_resp_bond_event) def test_setup_teardown(self): """ Make sure our setup and teardown is sane Loading Loading @@ -507,3 +519,24 @@ class SecurityTest(GdBaseTestClass): self.dut_security.wait_for_disconnect_event() self.cert_security.wait_for_disconnect_event() def test_successful_dut_initiated_pin(self): self.dut_security.set_io_capabilities(IoCapabilities.DISPLAY_YES_NO_IO_CAP) self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING) self._run_pin( initiator=self.dut_security, responder=self.cert_security, expected_init_bond_event=BondMsgType.DEVICE_BONDED, expected_resp_bond_event=BondMsgType.DEVICE_BONDED) self.dut_security.remove_bond(self.cert_security.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self.cert_security.remove_bond(self.dut_security.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED) self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED) self.dut_security.wait_for_disconnect_event() self.cert_security.wait_for_disconnect_event() Loading
system/gd/cert/matchers.py +5 −1 Original line number Diff line number Diff line Loading @@ -144,7 +144,7 @@ class HciMatchers(object): @staticmethod def LogEventCode(): return lambda event: logging.info("Received event: %x" % hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(event.event))).GetEventCode()) return lambda event: logging.info("Received event: %x" % hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode()) @staticmethod def LinkKeyRequest(): Loading Loading @@ -194,6 +194,10 @@ class HciMatchers(object): def RemoteOobDataRequest(): return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST) @staticmethod def PinCodeRequest(): return lambda event: HciMatchers.EventWithCode(EventCode.PIN_CODE_REQUEST) @staticmethod def LoopbackOf(packet): return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet)) Loading
system/gd/cert/py_security.py +11 −10 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class PySecurity(Closable): _oob_data_event_stream = None def __init__(self, device): logging.debug("DUT: Init") logging.info("DUT: Init") self._device = device self._device.wait_channel_ready() self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty())) Loading @@ -80,7 +80,7 @@ class PySecurity(Closable): """ Triggers stack under test to create bond """ logging.debug("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) logging.info("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBond( common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) Loading @@ -89,7 +89,7 @@ class PySecurity(Closable): Triggers stack under test to create bond using Out of Band method """ logging.debug("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) logging.info("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBondOutOfBand( OobDataBondMessage( Loading @@ -116,7 +116,7 @@ class PySecurity(Closable): """ Set the IO Capabilities used for the DUT """ logging.debug("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get( logging.info("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get( io_capabilities, "ERROR")) self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities)) Loading @@ -124,7 +124,7 @@ class PySecurity(Closable): """ Establish authentication requirements for the stack """ logging.debug("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get( logging.info("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get( auth_reqs, "ERROR")) self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs)) Loading @@ -132,7 +132,7 @@ class PySecurity(Closable): """ Send a callback from the UI as if the user pressed a button on the dialog """ logging.debug("DUT: Sending user input response uid: %d; response: %s" % (uid, b)) logging.info("DUT: Sending user input response uid: %d; response: %s" % (uid, b)) self._device.security.SendUiCallback( UiCallbackMsg( message_type=callback_type, Loading Loading @@ -183,7 +183,7 @@ class PySecurity(Closable): return True return False logging.debug("DUT: Waiting for expected UI event") logging.info("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_passkey) return passkey Loading @@ -191,6 +191,7 @@ class PySecurity(Closable): """ Respond to the UI event """ logging.info("DUT: Inputting pin code: %s" % str(pin)) self.on_user_input( cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin) Loading @@ -210,7 +211,7 @@ class PySecurity(Closable): return True return False logging.debug("DUT: Waiting for expected UI event") logging.info("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_unique_id) callback_type = UiCallbackType.YES_NO if len(pin) == 0 else UiCallbackType.PIN self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin) Loading @@ -224,7 +225,7 @@ class PySecurity(Closable): is complete. For the DUT we need to wait for it, for Cert it isn't needed. """ logging.debug("DUT: Waiting for Bond Event") logging.info("DUT: Waiting for Bond Event: %s " % expected_bond_event) assertThat(self._bond_event_stream).emits( lambda event: event.message_type == expected_bond_event or logging.info("DUT: Actual Bond Event: %s" % event.message_type) ) Loading @@ -245,7 +246,7 @@ class PySecurity(Closable): The Address is expected to be returned """ logging.info("DUT: Waiting for Disconnect Event") assertThat(self._disconnect_event_stream).emits(lambda event: 1 == 1) assertThat(self._disconnect_event_stream).emits(lambda event: logging.info("event: %s" % event.address) or True) def enforce_security_policy(self, address, type, policy): """ Loading
system/gd/hci/controller.cc +0 −8 Original line number Diff line number Diff line Loading @@ -42,7 +42,6 @@ struct Controller::impl { le_set_event_mask(kDefaultLeEventMask); set_event_mask(kDefaultEventMask); write_simple_pairing_mode(Enable::ENABLED); write_le_host_support(Enable::ENABLED); hci_->EnqueueCommand(ReadLocalNameBuilder::Create(), handler->BindOnceOn(this, &Controller::impl::read_local_name_complete_handler)); Loading Loading @@ -442,13 +441,6 @@ struct Controller::impl { this, &Controller::impl::check_status<SetEventMaskCompleteView>)); } void write_simple_pairing_mode(Enable enable) { std::unique_ptr<WriteSimplePairingModeBuilder> packet = WriteSimplePairingModeBuilder::Create(enable); hci_->EnqueueCommand( std::move(packet), module_.GetHandler()->BindOnceOn(this, &Controller::impl::check_status<WriteSimplePairingModeCompleteView>)); } void write_le_host_support(Enable enable) { std::unique_ptr<WriteLeHostSupportBuilder> packet = WriteLeHostSupportBuilder::Create(enable); hci_->EnqueueCommand( Loading
system/gd/security/cert/cert_security.py +3 −1 Original line number Diff line number Diff line Loading @@ -237,10 +237,12 @@ class CertSecurity(PySecurity): """ Pretend to answer the pairing dialog as a user """ if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH: raise Exception("Pin code must be within range") logging.info("Cert: Waiting for PIN request") assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.PIN_CODE_REQUEST)) assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest()) logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address)) peer = address.decode('utf-8') pin_list = list(pin) Loading
system/gd/security/cert/security_test.py +33 −0 Original line number Diff line number Diff line Loading @@ -171,6 +171,18 @@ class SecurityTest(GdBaseTestClass): initiator.wait_for_bond_event(expected_init_bond_event) responder.wait_for_bond_event(expected_resp_bond_event) def _run_pin(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event): initiator.create_bond(responder.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self._verify_pin(initiator, responder, expected_init_bond_event, expected_resp_bond_event) def _verify_pin(self, initiator, responder, expected_init_bond_event, expected_resp_bond_event): pin = b'123456789A' logging.info("pin: %s" % pin) initiator.input_pin(responder.get_address(), pin) responder.input_pin(initiator.get_address(), pin) initiator.wait_for_bond_event(expected_init_bond_event) responder.wait_for_bond_event(expected_resp_bond_event) def test_setup_teardown(self): """ Make sure our setup and teardown is sane Loading Loading @@ -507,3 +519,24 @@ class SecurityTest(GdBaseTestClass): self.dut_security.wait_for_disconnect_event() self.cert_security.wait_for_disconnect_event() def test_successful_dut_initiated_pin(self): self.dut_security.set_io_capabilities(IoCapabilities.DISPLAY_YES_NO_IO_CAP) self.dut_security.set_authentication_requirements(AuthenticationRequirements.DEDICATED_BONDING) self._run_pin( initiator=self.dut_security, responder=self.cert_security, expected_init_bond_event=BondMsgType.DEVICE_BONDED, expected_resp_bond_event=BondMsgType.DEVICE_BONDED) self.dut_security.remove_bond(self.cert_security.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self.cert_security.remove_bond(self.dut_security.get_address(), common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS) self.dut_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED) self.cert_security.wait_for_bond_event(BondMsgType.DEVICE_UNBONDED) self.dut_security.wait_for_disconnect_event() self.cert_security.wait_for_disconnect_event()