Loading system/blueberry/facade/security/facade.proto +18 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ service SecurityModuleFacade { rpc EnforceSecurityPolicy(SecurityPolicyMessage) returns (google.protobuf.Empty) {} rpc FetchEnforceSecurityPolicyEvents(google.protobuf.Empty) returns (stream EnforceSecurityPolicyMsg) {} rpc FetchDisconnectEvents(google.protobuf.Empty) returns (stream DisconnectMsg) {} rpc FetchAdvertisingCallbackEvents(google.protobuf.Empty) returns (stream AdvertisingCallbackMsg) {} } message OobDataMessage { Loading Loading @@ -177,3 +178,20 @@ message EnforceSecurityPolicyMsg { message DisconnectMsg { blueberry.facade.BluetoothAddressWithType address = 1; } enum AdvertisingCallbackMsgType { ADVERTISING_SET_STARTED = 0; OWN_ADDRESS_READ = 1; } enum AdvertisingSetStarted { NOT_STARTED = 0; STARTED = 1; } message AdvertisingCallbackMsg { AdvertisingCallbackMsgType message_type = 1; uint32 advertiser_id = 2; AdvertisingSetStarted advertising_started = 3; blueberry.facade.BluetoothAddress address = 4; } system/blueberry/tests/gd/cert/py_le_security.py +10 −0 Original line number Diff line number Diff line Loading @@ -45,6 +45,8 @@ class PyLeSecurity(Closable): self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty())) self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty())) self._helper_event_stream = EventStream(self._device.security.FetchHelperEvents(empty_proto.Empty())) self._advertising_callback_event_stream = EventStream( self._device.security.FetchAdvertisingCallbackEvents(empty_proto.Empty())) def get_ui_stream(self): return self._ui_event_stream Loading @@ -52,6 +54,9 @@ class PyLeSecurity(Closable): def get_bond_stream(self): return self._bond_event_stream def get_advertising_callback_event_stream(self): return self._advertising_callback_event_stream def wait_for_ui_event_passkey(self, timeout=timedelta(seconds=3)): display_passkey_capture = SecurityCaptures.DisplayPasskey() assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout) Loading Loading @@ -79,3 +84,8 @@ class PyLeSecurity(Closable): safeClose(self._helper_event_stream) else: logging.info("DUT: Helper Event Stream is None!") if self._advertising_callback_event_stream is not None: safeClose(self._advertising_callback_event_stream) else: logging.info("DUT: Advertising Callback Event Stream is None!") system/blueberry/tests/gd_sl4a/security/oob_pairing_sl4a_test.py +163 −4 Original line number Diff line number Diff line Loading @@ -19,20 +19,51 @@ import logging from google.protobuf import empty_pb2 as empty_proto from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_phys from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass from blueberry.tests.gd.cert.matchers import SecurityMatchers from blueberry.tests.gd.cert.py_le_security import PyLeSecurity from blueberry.tests.gd.cert.truth import assertThat from blueberry.facade import common_pb2 as common from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade.security.facade_pb2 import AdvertisingCallbackMsgType from blueberry.facade.security.facade_pb2 import BondMsgType from blueberry.facade.security.facade_pb2 import LeAuthRequirementsMessage from blueberry.facade.security.facade_pb2 import LeIoCapabilityMessage from blueberry.facade.security.facade_pb2 import LeOobDataPresentMessage from blueberry.facade.security.facade_pb2 import UiCallbackMsg from blueberry.facade.security.facade_pb2 import UiCallbackType from blueberry.facade.security.facade_pb2 import UiMsgType LeIoCapabilities = LeIoCapabilityMessage.LeIoCapabilities LeOobDataFlag = LeOobDataPresentMessage.LeOobDataFlag DISPLAY_ONLY = LeIoCapabilityMessage(capabilities=LeIoCapabilities.DISPLAY_ONLY) OOB_NOT_PRESENT = LeOobDataPresentMessage(data_present=LeOobDataFlag.NOT_PRESENT) class OobData: def __init__(self): pass address = None confirmation = None randomizer = None def __init__(self, address, confirmation, randomizer): self.address = address self.confirmation = confirmation self.randomizer = randomizer class OobPairingSl4aTest(GdSl4aBaseTestClass): # Events sent from SL4A SL4A_EVENT_GENERATED = "GeneratedOobData" SL4A_EVENT_ERROR = "ErrorOobData" SL4A_EVENT_BONDED = "Bonded" # Matches tBT_TRANSPORT # Used Strings because ints were causing gRPC problems Loading @@ -46,8 +77,10 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): def setup_test(self): super().setup_test() self.cert_security = PyLeSecurity(self.cert) def teardown_test(self): self.cert_security.close() super().teardown_test() def _generate_sl4a_oob_data(self, transport): Loading @@ -59,15 +92,79 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): logging.error("Failed to generate OOB data!") return None logging.info("Data received!") return OobData() logging.info(event_info["data"]) return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"], event_info["data"]["randomizer"]) def _generate_cert_oob_data(self, transport): if transport == self.TRANSPORT_LE: return self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) oob_data = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) # GetLeOutOfBandData adds null terminator to string in C code # (length 17) before passing back to python via gRPC where it is # converted back to bytes. Remove the null terminator for handling # in python test, since length is known to be 16 for # confirmation_value and random_value oob_data.confirmation_value = oob_data.confirmation_value[:-1] oob_data.random_value = oob_data.random_value[:-1] return oob_data return None def set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk): # Random static address below, no random resolvable address at this point random_address_bytes = "DD:34:02:05:5C:EE".encode() private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS), rotation_irk=irk) self.cert.security.SetLeInitiatorAddressPolicy(private_policy) # Bluetooth MAC address must be upper case return random_address_bytes.decode('utf-8').upper() def wait_for_own_address(self): own_address = common.BluetoothAddress() def get_address(event): if event.message_type == AdvertisingCallbackMsgType.OWN_ADDRESS_READ: nonlocal own_address own_address = event.address.address return True return False assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address) return own_address def wait_for_advertising_set_started(self): advertising_started = False def get_advertising_set_started(event): if event.message_type == AdvertisingCallbackMsgType.ADVERTISING_SET_STARTED: nonlocal advertising_started if event.advertising_started == 1: advertising_started = True return True return False assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started) return advertising_started def wait_for_yes_no_dialog(self): address_with_type = common.BluetoothAddressWithType() def get_address_with_type(event): if event.message_type == UiMsgType.DISPLAY_PAIRING_PROMPT: nonlocal address_with_type address_with_type = event.peer return True return False assertThat(self.cert_security.get_ui_stream()).emits(get_address_with_type) return address_with_type def test_sl4a_classic_generate_oob_data(self): oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_BREDR) logging.info("OOB data received") logging.info(oob_data) assertThat(oob_data).isNotNone() def test_sl4a_classic_generate_oob_data_twice(self): Loading @@ -81,3 +178,65 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): def test_cert_ble_generate_oob_data(self): oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE) assertThat(oob_data).isNotNone() def test_sl4a_create_bond_out_of_band(self): self.cert.security.SetLeIoCapability(DISPLAY_ONLY) self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT) self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1)) data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10] byteArrayObject = bytearray(data) irk = bytes(byteArrayObject) DEVICE_NAME = 'Im_The_CERT!' self.set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) extended_config = le_advertising_facade.ExtendedAdvertisingConfig( include_tx_power=True, connectable=True, legacy_pdus=True, advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"]) request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) self.wait_for_advertising_set_started() get_own_address_request = le_advertising_facade.GetOwnAddressRequest( advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request) advertising_address = self.wait_for_own_address() oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE) assertThat(oob_data).isNotNone() self.dut.sl4a.bluetoothCreateBondOutOfBand( advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(), oob_data.random_value.hex()) address_with_type = self.wait_for_yes_no_dialog() self.cert.security.SendUiCallback( UiCallbackMsg( message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=address_with_type)) assertThat(self.cert_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED)) try: bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout) except queue.Empty as error: logging.error("Failed to generate OOB data!") assertThat(bond_state).isNotNone() assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True) system/gd/security/facade.cc +73 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include "grpc/grpc_event_queue.h" #include "hci/address_with_type.h" #include "hci/le_address_manager.h" #include "hci/le_advertising_manager.h" #include "l2cap/classic/security_policy.h" #include "l2cap/le/l2cap_le_module.h" #include "os/handler.h" Loading Loading @@ -54,11 +55,20 @@ blueberry::facade::BluetoothAddressWithType ToFacadeAddressWithType(hci::Address } // namespace class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI { class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI, public hci::AdvertisingCallback { public: SecurityModuleFacadeService( SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler) : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler) { SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler, hci::LeAdvertisingManager* le_advertising_manager) : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler), le_advertising_manager_(le_advertising_manager) { security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_); security_module_->GetSecurityManager()->SetUserInterfaceHandler(this, security_handler_); Loading Loading @@ -224,6 +234,58 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ::grpc::ServerWriter<SecurityHelperMsg>* writer) override { return helper_events_.RunLoop(context, writer); } ::grpc::Status FetchAdvertisingCallbackEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<AdvertisingCallbackMsg>* writer) override { le_advertising_manager_->RegisterAdvertisingCallback(this); return advertising_callback_events_.RunLoop(context, writer); } void OnAdvertisingSetStarted(int reg_id, uint8_t advertiser_id, int8_t tx_power, AdvertisingStatus status) { AdvertisingCallbackMsg advertising_set_started; advertising_set_started.set_message_type(AdvertisingCallbackMsgType::ADVERTISING_SET_STARTED); advertising_set_started.set_advertising_started(AdvertisingSetStarted::STARTED); advertising_set_started.set_advertiser_id(advertiser_id); advertising_callback_events_.OnIncomingEvent(advertising_set_started); } void OnAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { // Not used yet } void OnAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnScanResponseDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnAdvertisingParametersUpdated(uint8_t advertiser_id, int8_t tx_power, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingParametersUpdated(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { // Not used yet } void OnOwnAddressRead(uint8_t advertiser_id, uint8_t address_type, Address address) { AdvertisingCallbackMsg get_own_address; get_own_address.set_message_type(AdvertisingCallbackMsgType::OWN_ADDRESS_READ); get_own_address.mutable_address()->set_address(address.ToString()); advertising_callback_events_.OnIncomingEvent(get_own_address); } ::grpc::Status SetIoCapability(::grpc::ServerContext* context, const IoCapabilityMessage* request, ::google::protobuf::Empty* response) override { security_module_->GetFacadeConfigurationApi()->SetIoCapability( Loading Loading @@ -532,6 +594,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public SecurityModule* security_module_; L2capLeModule* l2cap_le_module_; ::bluetooth::os::Handler* security_handler_; hci::LeAdvertisingManager* le_advertising_manager_; ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"}; ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"}; ::bluetooth::grpc::GrpcEventQueue<SecurityHelperMsg> helper_events_{"Events that don't fit any other category"}; Loading @@ -539,6 +602,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public "Enforce Security Policy Events"}; ::bluetooth::grpc::GrpcEventQueue<DisconnectMsg> disconnect_events_{"Disconnect events"}; ::bluetooth::grpc::GrpcEventQueue<OobDataBondMessage> oob_events_{"OOB Data events"}; ::bluetooth::grpc::GrpcEventQueue<AdvertisingCallbackMsg> advertising_callback_events_{"Advertising callback events"}; uint32_t unique_id{1}; std::map<uint32_t, common::OnceCallback<void(bool)>> user_yes_no_callbacks_; std::map<uint32_t, common::OnceCallback<void(uint32_t)>> user_passkey_callbacks_; Loading @@ -548,12 +612,16 @@ void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) const { ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list); list->add<SecurityModule>(); list->add<L2capLeModule>(); list->add<hci::LeAdvertisingManager>(); } void SecurityModuleFacadeModule::Start() { ::bluetooth::grpc::GrpcFacadeModule::Start(); service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler()); service_ = new SecurityModuleFacadeService( GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler(), GetDependency<hci::LeAdvertisingManager>()); } void SecurityModuleFacadeModule::Stop() { Loading system/gd/security/record/security_record_storage.cc +3 −3 Original line number Diff line number Diff line Loading @@ -114,10 +114,10 @@ void SecurityRecordStorage::SaveSecurityRecords(std::set<std::shared_ptr<record: } else if (!record->IsClassicLinkKeyValid() && record->remote_ltk) { mutation.Add(device.SetDeviceType(hci::DeviceType::LE)); } else { LOG_ERROR( "Cannot determine device type from security record for '%s'; dropping!", mutation.Add(device.SetDeviceType(hci::DeviceType::LE)); LOG_WARN( "Cannot determine device type from security record for '%s'; defaulting to LE", record->GetPseudoAddress()->ToString().c_str()); continue; } mutation.Commit(); SetClassicData(mutation, record, device); Loading Loading
system/blueberry/facade/security/facade.proto +18 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ service SecurityModuleFacade { rpc EnforceSecurityPolicy(SecurityPolicyMessage) returns (google.protobuf.Empty) {} rpc FetchEnforceSecurityPolicyEvents(google.protobuf.Empty) returns (stream EnforceSecurityPolicyMsg) {} rpc FetchDisconnectEvents(google.protobuf.Empty) returns (stream DisconnectMsg) {} rpc FetchAdvertisingCallbackEvents(google.protobuf.Empty) returns (stream AdvertisingCallbackMsg) {} } message OobDataMessage { Loading Loading @@ -177,3 +178,20 @@ message EnforceSecurityPolicyMsg { message DisconnectMsg { blueberry.facade.BluetoothAddressWithType address = 1; } enum AdvertisingCallbackMsgType { ADVERTISING_SET_STARTED = 0; OWN_ADDRESS_READ = 1; } enum AdvertisingSetStarted { NOT_STARTED = 0; STARTED = 1; } message AdvertisingCallbackMsg { AdvertisingCallbackMsgType message_type = 1; uint32 advertiser_id = 2; AdvertisingSetStarted advertising_started = 3; blueberry.facade.BluetoothAddress address = 4; }
system/blueberry/tests/gd/cert/py_le_security.py +10 −0 Original line number Diff line number Diff line Loading @@ -45,6 +45,8 @@ class PyLeSecurity(Closable): self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty())) self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty())) self._helper_event_stream = EventStream(self._device.security.FetchHelperEvents(empty_proto.Empty())) self._advertising_callback_event_stream = EventStream( self._device.security.FetchAdvertisingCallbackEvents(empty_proto.Empty())) def get_ui_stream(self): return self._ui_event_stream Loading @@ -52,6 +54,9 @@ class PyLeSecurity(Closable): def get_bond_stream(self): return self._bond_event_stream def get_advertising_callback_event_stream(self): return self._advertising_callback_event_stream def wait_for_ui_event_passkey(self, timeout=timedelta(seconds=3)): display_passkey_capture = SecurityCaptures.DisplayPasskey() assertThat(self._ui_event_stream).emits(display_passkey_capture, timeout=timeout) Loading Loading @@ -79,3 +84,8 @@ class PyLeSecurity(Closable): safeClose(self._helper_event_stream) else: logging.info("DUT: Helper Event Stream is None!") if self._advertising_callback_event_stream is not None: safeClose(self._advertising_callback_event_stream) else: logging.info("DUT: Advertising Callback Event Stream is None!")
system/blueberry/tests/gd_sl4a/security/oob_pairing_sl4a_test.py +163 −4 Original line number Diff line number Diff line Loading @@ -19,20 +19,51 @@ import logging from google.protobuf import empty_pb2 as empty_proto from bluetooth_packets_python3 import hci_packets from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_phys from blueberry.tests.gd_sl4a.lib.gd_sl4a_base_test import GdSl4aBaseTestClass from blueberry.tests.gd.cert.matchers import SecurityMatchers from blueberry.tests.gd.cert.py_le_security import PyLeSecurity from blueberry.tests.gd.cert.truth import assertThat from blueberry.facade import common_pb2 as common from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade from blueberry.facade.security.facade_pb2 import AdvertisingCallbackMsgType from blueberry.facade.security.facade_pb2 import BondMsgType from blueberry.facade.security.facade_pb2 import LeAuthRequirementsMessage from blueberry.facade.security.facade_pb2 import LeIoCapabilityMessage from blueberry.facade.security.facade_pb2 import LeOobDataPresentMessage from blueberry.facade.security.facade_pb2 import UiCallbackMsg from blueberry.facade.security.facade_pb2 import UiCallbackType from blueberry.facade.security.facade_pb2 import UiMsgType LeIoCapabilities = LeIoCapabilityMessage.LeIoCapabilities LeOobDataFlag = LeOobDataPresentMessage.LeOobDataFlag DISPLAY_ONLY = LeIoCapabilityMessage(capabilities=LeIoCapabilities.DISPLAY_ONLY) OOB_NOT_PRESENT = LeOobDataPresentMessage(data_present=LeOobDataFlag.NOT_PRESENT) class OobData: def __init__(self): pass address = None confirmation = None randomizer = None def __init__(self, address, confirmation, randomizer): self.address = address self.confirmation = confirmation self.randomizer = randomizer class OobPairingSl4aTest(GdSl4aBaseTestClass): # Events sent from SL4A SL4A_EVENT_GENERATED = "GeneratedOobData" SL4A_EVENT_ERROR = "ErrorOobData" SL4A_EVENT_BONDED = "Bonded" # Matches tBT_TRANSPORT # Used Strings because ints were causing gRPC problems Loading @@ -46,8 +77,10 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): def setup_test(self): super().setup_test() self.cert_security = PyLeSecurity(self.cert) def teardown_test(self): self.cert_security.close() super().teardown_test() def _generate_sl4a_oob_data(self, transport): Loading @@ -59,15 +92,79 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): logging.error("Failed to generate OOB data!") return None logging.info("Data received!") return OobData() logging.info(event_info["data"]) return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"], event_info["data"]["randomizer"]) def _generate_cert_oob_data(self, transport): if transport == self.TRANSPORT_LE: return self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) oob_data = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) # GetLeOutOfBandData adds null terminator to string in C code # (length 17) before passing back to python via gRPC where it is # converted back to bytes. Remove the null terminator for handling # in python test, since length is known to be 16 for # confirmation_value and random_value oob_data.confirmation_value = oob_data.confirmation_value[:-1] oob_data.random_value = oob_data.random_value[:-1] return oob_data return None def set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk): # Random static address below, no random resolvable address at this point random_address_bytes = "DD:34:02:05:5C:EE".encode() private_policy = le_initiator_address_facade.PrivacyPolicy( address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, address_with_type=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS), rotation_irk=irk) self.cert.security.SetLeInitiatorAddressPolicy(private_policy) # Bluetooth MAC address must be upper case return random_address_bytes.decode('utf-8').upper() def wait_for_own_address(self): own_address = common.BluetoothAddress() def get_address(event): if event.message_type == AdvertisingCallbackMsgType.OWN_ADDRESS_READ: nonlocal own_address own_address = event.address.address return True return False assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address) return own_address def wait_for_advertising_set_started(self): advertising_started = False def get_advertising_set_started(event): if event.message_type == AdvertisingCallbackMsgType.ADVERTISING_SET_STARTED: nonlocal advertising_started if event.advertising_started == 1: advertising_started = True return True return False assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started) return advertising_started def wait_for_yes_no_dialog(self): address_with_type = common.BluetoothAddressWithType() def get_address_with_type(event): if event.message_type == UiMsgType.DISPLAY_PAIRING_PROMPT: nonlocal address_with_type address_with_type = event.peer return True return False assertThat(self.cert_security.get_ui_stream()).emits(get_address_with_type) return address_with_type def test_sl4a_classic_generate_oob_data(self): oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_BREDR) logging.info("OOB data received") logging.info(oob_data) assertThat(oob_data).isNotNone() def test_sl4a_classic_generate_oob_data_twice(self): Loading @@ -81,3 +178,65 @@ class OobPairingSl4aTest(GdSl4aBaseTestClass): def test_cert_ble_generate_oob_data(self): oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE) assertThat(oob_data).isNotNone() def test_sl4a_create_bond_out_of_band(self): self.cert.security.SetLeIoCapability(DISPLAY_ONLY) self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT) self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1)) data = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10] byteArrayObject = bytearray(data) irk = bytes(byteArrayObject) DEVICE_NAME = 'Im_The_CERT!' self.set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk) gap_name = hci_packets.GapData() gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8')) gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) config = le_advertising_facade.AdvertisingConfig( advertisement=[gap_data], interval_min=512, interval_max=768, advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, channel_map=7, filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) extended_config = le_advertising_facade.ExtendedAdvertisingConfig( include_tx_power=True, connectable=True, legacy_pdus=True, advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"]) request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) self.wait_for_advertising_set_started() get_own_address_request = le_advertising_facade.GetOwnAddressRequest( advertiser_id=create_response.advertiser_id) self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request) advertising_address = self.wait_for_own_address() oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE) assertThat(oob_data).isNotNone() self.dut.sl4a.bluetoothCreateBondOutOfBand( advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(), oob_data.random_value.hex()) address_with_type = self.wait_for_yes_no_dialog() self.cert.security.SendUiCallback( UiCallbackMsg( message_type=UiCallbackType.PAIRING_PROMPT, boolean=True, unique_id=1, address=address_with_type)) assertThat(self.cert_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED)) try: bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout) except queue.Empty as error: logging.error("Failed to generate OOB data!") assertThat(bond_state).isNotNone() assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
system/gd/security/facade.cc +73 −5 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include "grpc/grpc_event_queue.h" #include "hci/address_with_type.h" #include "hci/le_address_manager.h" #include "hci/le_advertising_manager.h" #include "l2cap/classic/security_policy.h" #include "l2cap/le/l2cap_le_module.h" #include "os/handler.h" Loading Loading @@ -54,11 +55,20 @@ blueberry::facade::BluetoothAddressWithType ToFacadeAddressWithType(hci::Address } // namespace class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI { class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ISecurityManagerListener, public UI, public hci::AdvertisingCallback { public: SecurityModuleFacadeService( SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler) : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler) { SecurityModule* security_module, L2capLeModule* l2cap_le_module, ::bluetooth::os::Handler* security_handler, hci::LeAdvertisingManager* le_advertising_manager) : security_module_(security_module), l2cap_le_module_(l2cap_le_module), security_handler_(security_handler), le_advertising_manager_(le_advertising_manager) { security_module_->GetSecurityManager()->RegisterCallbackListener(this, security_handler_); security_module_->GetSecurityManager()->SetUserInterfaceHandler(this, security_handler_); Loading Loading @@ -224,6 +234,58 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public ::grpc::ServerWriter<SecurityHelperMsg>* writer) override { return helper_events_.RunLoop(context, writer); } ::grpc::Status FetchAdvertisingCallbackEvents( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<AdvertisingCallbackMsg>* writer) override { le_advertising_manager_->RegisterAdvertisingCallback(this); return advertising_callback_events_.RunLoop(context, writer); } void OnAdvertisingSetStarted(int reg_id, uint8_t advertiser_id, int8_t tx_power, AdvertisingStatus status) { AdvertisingCallbackMsg advertising_set_started; advertising_set_started.set_message_type(AdvertisingCallbackMsgType::ADVERTISING_SET_STARTED); advertising_set_started.set_advertising_started(AdvertisingSetStarted::STARTED); advertising_set_started.set_advertiser_id(advertiser_id); advertising_callback_events_.OnIncomingEvent(advertising_set_started); } void OnAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { // Not used yet } void OnAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnScanResponseDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnAdvertisingParametersUpdated(uint8_t advertiser_id, int8_t tx_power, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingParametersUpdated(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingDataSet(uint8_t advertiser_id, uint8_t status) { // Not used yet } void OnPeriodicAdvertisingEnabled(uint8_t advertiser_id, bool enable, uint8_t status) { // Not used yet } void OnOwnAddressRead(uint8_t advertiser_id, uint8_t address_type, Address address) { AdvertisingCallbackMsg get_own_address; get_own_address.set_message_type(AdvertisingCallbackMsgType::OWN_ADDRESS_READ); get_own_address.mutable_address()->set_address(address.ToString()); advertising_callback_events_.OnIncomingEvent(get_own_address); } ::grpc::Status SetIoCapability(::grpc::ServerContext* context, const IoCapabilityMessage* request, ::google::protobuf::Empty* response) override { security_module_->GetFacadeConfigurationApi()->SetIoCapability( Loading Loading @@ -532,6 +594,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public SecurityModule* security_module_; L2capLeModule* l2cap_le_module_; ::bluetooth::os::Handler* security_handler_; hci::LeAdvertisingManager* le_advertising_manager_; ::bluetooth::grpc::GrpcEventQueue<UiMsg> ui_events_{"UI events"}; ::bluetooth::grpc::GrpcEventQueue<BondMsg> bond_events_{"Bond events"}; ::bluetooth::grpc::GrpcEventQueue<SecurityHelperMsg> helper_events_{"Events that don't fit any other category"}; Loading @@ -539,6 +602,7 @@ class SecurityModuleFacadeService : public SecurityModuleFacade::Service, public "Enforce Security Policy Events"}; ::bluetooth::grpc::GrpcEventQueue<DisconnectMsg> disconnect_events_{"Disconnect events"}; ::bluetooth::grpc::GrpcEventQueue<OobDataBondMessage> oob_events_{"OOB Data events"}; ::bluetooth::grpc::GrpcEventQueue<AdvertisingCallbackMsg> advertising_callback_events_{"Advertising callback events"}; uint32_t unique_id{1}; std::map<uint32_t, common::OnceCallback<void(bool)>> user_yes_no_callbacks_; std::map<uint32_t, common::OnceCallback<void(uint32_t)>> user_passkey_callbacks_; Loading @@ -548,12 +612,16 @@ void SecurityModuleFacadeModule::ListDependencies(ModuleList* list) const { ::bluetooth::grpc::GrpcFacadeModule::ListDependencies(list); list->add<SecurityModule>(); list->add<L2capLeModule>(); list->add<hci::LeAdvertisingManager>(); } void SecurityModuleFacadeModule::Start() { ::bluetooth::grpc::GrpcFacadeModule::Start(); service_ = new SecurityModuleFacadeService(GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler()); service_ = new SecurityModuleFacadeService( GetDependency<SecurityModule>(), GetDependency<L2capLeModule>(), GetHandler(), GetDependency<hci::LeAdvertisingManager>()); } void SecurityModuleFacadeModule::Stop() { Loading
system/gd/security/record/security_record_storage.cc +3 −3 Original line number Diff line number Diff line Loading @@ -114,10 +114,10 @@ void SecurityRecordStorage::SaveSecurityRecords(std::set<std::shared_ptr<record: } else if (!record->IsClassicLinkKeyValid() && record->remote_ltk) { mutation.Add(device.SetDeviceType(hci::DeviceType::LE)); } else { LOG_ERROR( "Cannot determine device type from security record for '%s'; dropping!", mutation.Add(device.SetDeviceType(hci::DeviceType::LE)); LOG_WARN( "Cannot determine device type from security record for '%s'; defaulting to LE", record->GetPseudoAddress()->ToString().c_str()); continue; } mutation.Commit(); SetClassicData(mutation, record, device); Loading