Loading system/gd/cert/matchers.py +1 −1 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionRespo class HciMatchers(object): @staticmethod def CommandComplete(opcode=None): def CommandComplete(opcode): return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode) @staticmethod Loading system/gd/cert/py_security.py +50 −8 Original line number Diff line number Diff line Loading @@ -31,7 +31,10 @@ from security.facade_pb2 import BondMsgType from security.facade_pb2 import SecurityPolicyMessage from security.facade_pb2 import IoCapabilities from security.facade_pb2 import IoCapabilityMessage from security.facade_pb2 import OobDataBondMessage from security.facade_pb2 import OobDataMessage from security.facade_pb2 import OobDataPresentMessage from security.facade_pb2 import UiMsgType from security.facade_pb2 import UiCallbackMsg from security.facade_pb2 import UiCallbackType Loading @@ -44,7 +47,7 @@ class PySecurity(Closable): _io_capabilities_name_lookup = { IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY", IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP", #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY", IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY", IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT", } Loading Loading @@ -78,6 +81,27 @@ class PySecurity(Closable): self._device.security.CreateBond( common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data): """ Triggers stack under test to create bond using Out of Band method """ logging.debug("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBondOutOfBand( OobDataBondMessage( address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type), p192_data=OobDataMessage( address=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=address), type=type), confirmation_value=bytes(bytearray(p192_oob_data[0])), random_value=bytes(bytearray(p192_oob_data[1]))), p256_data=OobDataMessage( address=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=address), type=type), confirmation_value=bytes(bytearray(p256_oob_data[0])), random_value=bytes(bytearray(p256_oob_data[1]))))) def remove_bond(self, address, type): """ Removes bond from stack under test Loading @@ -101,13 +125,6 @@ class PySecurity(Closable): auth_reqs, "ERROR")) self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs)) def set_oob_data(self, data_present): """ Set the Out-of-band data present flag for SSP pairing """ logging.info("DUT: setting OOB data present to '%s'" % data_present) self._device.security.SetOobDataPresent(OobDataPresentMessage(data_present=data_present)) def send_ui_callback(self, address, callback_type, b, uid): """ Send a callback from the UI as if the user pressed a button on the dialog Loading Loading @@ -141,6 +158,31 @@ class PySecurity(Closable): """ pass def accept_oob_pairing(self, cert_address, reply_boolean): """ Here we pass, but in cert we perform pairing flow tasks. This was added here in order to be more dynamic, but the stack under test will handle the pairing flow. """ pass def wait_for_passkey(self, cert_address): """ Respond to the UI event """ passkey = -1 def get_unique_id(event): if event.message_type == UiMsgType.DISPLAY_PASSKEY: nonlocal passkey passkey = event.numeric_value return True return False logging.debug("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_unique_id) return passkey def on_user_input(self, cert_address, reply_boolean, expected_ui_event): """ Respond to the UI event Loading system/gd/packet/parser/main.cc +24 −8 Original line number Diff line number Diff line Loading @@ -110,6 +110,8 @@ bool generate_cpp_headers_one_file(const Declarations& decls, const std::filesys auto gen_file = gen_path / (input_filename + ".h"); std::cout << "generating " << gen_file << std::endl; std::ofstream out_file; out_file.open(gen_file); if (!out_file.is_open()) { Loading Loading @@ -270,6 +272,7 @@ bool generate_pybind11_sources_one_file(const Declarations& decls, const std::fi std::vector<std::ofstream> out_file_shards(num_shards); for (size_t i = 0; i < out_file_shards.size(); i++) { auto filename = gen_path / (input_filename + "_python3_shard_" + std::to_string(i) + ".cc"); std::cout << "generating " << filename << std::endl; auto& out_file = out_file_shards[i]; out_file.open(filename); if (!out_file.is_open()) { Loading Loading @@ -416,11 +419,13 @@ int main(int argc, const char** argv) { std::string root_namespace = "bluetooth"; // Number of shards per output pybind11 cc file size_t num_shards = 1; bool generate_rust = false; std::queue<std::filesystem::path> input_files; const std::string arg_out = "--out="; const std::string arg_include = "--include="; const std::string arg_namespace = "--root_namespace="; const std::string arg_num_shards = "--num_shards="; const std::string arg_rust = "--rust"; for (int i = 1; i < argc; i++) { std::string arg = argv[i]; Loading @@ -432,6 +437,8 @@ int main(int argc, const char** argv) { root_namespace = arg.substr(arg_namespace.size()); } else if (arg.find(arg_num_shards) == 0) { num_shards = std::stoul(arg.substr(arg_num_shards.size())); } else if (arg.find(arg_rust) == 0) { generate_rust = true; } else { input_files.emplace(std::filesystem::current_path() / std::filesystem::path(arg)); } Loading @@ -442,21 +449,30 @@ int main(int argc, const char** argv) { return 1; } std::cout << "out dir: " << out_dir << std::endl; while (!input_files.empty()) { Declarations declarations; std::cout << "parsing: " << input_files.front() << std::endl; if (!parse_declarations_one_file(input_files.front(), &declarations)) { std::cerr << "Cannot parse " << input_files.front() << " correctly" << std::endl; return 2; } if (generate_rust) { std::cout << "generating rust" << std::endl; // TODO do fun things } else { std::cout << "generating c++ and pybind11" << std::endl; if (!generate_cpp_headers_one_file(declarations, input_files.front(), include_dir, out_dir, root_namespace)) { std::cerr << "Didn't generate cpp headers for " << input_files.front() << std::endl; return 3; } if (!generate_pybind11_sources_one_file(declarations, input_files.front(), include_dir, out_dir, root_namespace, num_shards)) { if (!generate_pybind11_sources_one_file( declarations, input_files.front(), include_dir, out_dir, root_namespace, num_shards)) { std::cerr << "Didn't generate pybind11 sources for " << input_files.front() << std::endl; return 4; } } input_files.pop(); } Loading system/gd/security/cert/cert_security.py +50 −25 Original line number Diff line number Diff line Loading @@ -69,7 +69,6 @@ class CertSecurity(PySecurity): _hci_event_stream = None _io_caps = hci_packets.IoCapability.DISPLAY_ONLY _remote_oob_data = None _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION _secure_connections_enabled = False Loading Loading @@ -131,13 +130,6 @@ class CertSecurity(PySecurity): auth_reqs, "ERROR")) self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING) def set_remote_oob_data(self, remote_data): """ Set the Out-of-band data for SSP pairing """ logging.info("Cert: setting OOB data present to '%s'" % remote_data) self._remote_oob_data = remote_data def get_oob_data_from_controller(self, pb_oob_data_type): """ Get the Out-of-band data for SSP pairing Loading @@ -146,7 +138,6 @@ class CertSecurity(PySecurity): :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value """ oob_data_type = self._oob_present_lookup[pb_oob_data_type] if (oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT): Loading Loading @@ -202,6 +193,43 @@ class CertSecurity(PySecurity): return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()), list(complete.GetR256())) def input_passkey(self, address, passkey): """ Pretend to answer the pairing dialog as a user """ logging.info("Cert: Waiting for PASSKEY request") assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST)) logging.info("Cert: Send user input passkey %d for %s" % (passkey, address)) peer = address.decode('utf-8') self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_STARTED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.CLEARED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ERASED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_COMPLETED), True) self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True) def send_ui_callback(self, address, callback_type, b, uid): """ Pretend to answer the pairing dailog as a user Loading Loading @@ -229,7 +257,17 @@ class CertSecurity(PySecurity): logging.info("Cert: Waiting for controller response") assertThat(self._hci_event_stream).emits( HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT)) self._secure_connections_enabled = True # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc) #self._secure_connections_enabled = True def send_io_caps(self, address): logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True) def accept_pairing(self, dut_address, reply_boolean): """ Loading @@ -239,13 +277,7 @@ class CertSecurity(PySecurity): assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest()) logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY") self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(dut_address.decode('utf8')), True) logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs), True) self.send_io_caps(dut_address) logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest()) logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean) Loading @@ -266,14 +298,7 @@ class CertSecurity(PySecurity): def accept_oob_pairing(self, dut_address): logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse()) logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( dut_address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True) assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete()) self.send_io_caps(dut_address) logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE") ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture() assertThat(self._hci_event_stream).emits(ssp_complete_capture) Loading system/gd/security/cert/le_security_test.py +24 −24 Original line number Diff line number Diff line Loading @@ -831,20 +831,20 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_cert_for_connection() if dut_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) if cert_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(dut_oob_flag) Loading Loading @@ -898,20 +898,20 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_dut_for_connection() if dut_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) if cert_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(dut_oob_flag) Loading Loading @@ -965,19 +965,19 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_dut_for_connection() oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(OOB_PRESENT) Loading Loading @@ -1036,19 +1036,19 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_cert_for_connection() oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(OOB_PRESENT) Loading Loading
system/gd/cert/matchers.py +1 −1 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionRespo class HciMatchers(object): @staticmethod def CommandComplete(opcode=None): def CommandComplete(opcode): return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode) @staticmethod Loading
system/gd/cert/py_security.py +50 −8 Original line number Diff line number Diff line Loading @@ -31,7 +31,10 @@ from security.facade_pb2 import BondMsgType from security.facade_pb2 import SecurityPolicyMessage from security.facade_pb2 import IoCapabilities from security.facade_pb2 import IoCapabilityMessage from security.facade_pb2 import OobDataBondMessage from security.facade_pb2 import OobDataMessage from security.facade_pb2 import OobDataPresentMessage from security.facade_pb2 import UiMsgType from security.facade_pb2 import UiCallbackMsg from security.facade_pb2 import UiCallbackType Loading @@ -44,7 +47,7 @@ class PySecurity(Closable): _io_capabilities_name_lookup = { IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY", IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP", #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY", IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY", IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT", } Loading Loading @@ -78,6 +81,27 @@ class PySecurity(Closable): self._device.security.CreateBond( common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)) def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data): """ Triggers stack under test to create bond using Out of Band method """ logging.debug("DUT: Creating OOB bond to '%s' from '%s'" % (str(address), str(self._device.address))) self._device.security.CreateBondOutOfBand( OobDataBondMessage( address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type), p192_data=OobDataMessage( address=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=address), type=type), confirmation_value=bytes(bytearray(p192_oob_data[0])), random_value=bytes(bytearray(p192_oob_data[1]))), p256_data=OobDataMessage( address=common.BluetoothAddressWithType( address=common.BluetoothAddress(address=address), type=type), confirmation_value=bytes(bytearray(p256_oob_data[0])), random_value=bytes(bytearray(p256_oob_data[1]))))) def remove_bond(self, address, type): """ Removes bond from stack under test Loading @@ -101,13 +125,6 @@ class PySecurity(Closable): auth_reqs, "ERROR")) self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs)) def set_oob_data(self, data_present): """ Set the Out-of-band data present flag for SSP pairing """ logging.info("DUT: setting OOB data present to '%s'" % data_present) self._device.security.SetOobDataPresent(OobDataPresentMessage(data_present=data_present)) def send_ui_callback(self, address, callback_type, b, uid): """ Send a callback from the UI as if the user pressed a button on the dialog Loading Loading @@ -141,6 +158,31 @@ class PySecurity(Closable): """ pass def accept_oob_pairing(self, cert_address, reply_boolean): """ Here we pass, but in cert we perform pairing flow tasks. This was added here in order to be more dynamic, but the stack under test will handle the pairing flow. """ pass def wait_for_passkey(self, cert_address): """ Respond to the UI event """ passkey = -1 def get_unique_id(event): if event.message_type == UiMsgType.DISPLAY_PASSKEY: nonlocal passkey passkey = event.numeric_value return True return False logging.debug("DUT: Waiting for expected UI event") assertThat(self._ui_event_stream).emits(get_unique_id) return passkey def on_user_input(self, cert_address, reply_boolean, expected_ui_event): """ Respond to the UI event Loading
system/gd/packet/parser/main.cc +24 −8 Original line number Diff line number Diff line Loading @@ -110,6 +110,8 @@ bool generate_cpp_headers_one_file(const Declarations& decls, const std::filesys auto gen_file = gen_path / (input_filename + ".h"); std::cout << "generating " << gen_file << std::endl; std::ofstream out_file; out_file.open(gen_file); if (!out_file.is_open()) { Loading Loading @@ -270,6 +272,7 @@ bool generate_pybind11_sources_one_file(const Declarations& decls, const std::fi std::vector<std::ofstream> out_file_shards(num_shards); for (size_t i = 0; i < out_file_shards.size(); i++) { auto filename = gen_path / (input_filename + "_python3_shard_" + std::to_string(i) + ".cc"); std::cout << "generating " << filename << std::endl; auto& out_file = out_file_shards[i]; out_file.open(filename); if (!out_file.is_open()) { Loading Loading @@ -416,11 +419,13 @@ int main(int argc, const char** argv) { std::string root_namespace = "bluetooth"; // Number of shards per output pybind11 cc file size_t num_shards = 1; bool generate_rust = false; std::queue<std::filesystem::path> input_files; const std::string arg_out = "--out="; const std::string arg_include = "--include="; const std::string arg_namespace = "--root_namespace="; const std::string arg_num_shards = "--num_shards="; const std::string arg_rust = "--rust"; for (int i = 1; i < argc; i++) { std::string arg = argv[i]; Loading @@ -432,6 +437,8 @@ int main(int argc, const char** argv) { root_namespace = arg.substr(arg_namespace.size()); } else if (arg.find(arg_num_shards) == 0) { num_shards = std::stoul(arg.substr(arg_num_shards.size())); } else if (arg.find(arg_rust) == 0) { generate_rust = true; } else { input_files.emplace(std::filesystem::current_path() / std::filesystem::path(arg)); } Loading @@ -442,21 +449,30 @@ int main(int argc, const char** argv) { return 1; } std::cout << "out dir: " << out_dir << std::endl; while (!input_files.empty()) { Declarations declarations; std::cout << "parsing: " << input_files.front() << std::endl; if (!parse_declarations_one_file(input_files.front(), &declarations)) { std::cerr << "Cannot parse " << input_files.front() << " correctly" << std::endl; return 2; } if (generate_rust) { std::cout << "generating rust" << std::endl; // TODO do fun things } else { std::cout << "generating c++ and pybind11" << std::endl; if (!generate_cpp_headers_one_file(declarations, input_files.front(), include_dir, out_dir, root_namespace)) { std::cerr << "Didn't generate cpp headers for " << input_files.front() << std::endl; return 3; } if (!generate_pybind11_sources_one_file(declarations, input_files.front(), include_dir, out_dir, root_namespace, num_shards)) { if (!generate_pybind11_sources_one_file( declarations, input_files.front(), include_dir, out_dir, root_namespace, num_shards)) { std::cerr << "Didn't generate pybind11 sources for " << input_files.front() << std::endl; return 4; } } input_files.pop(); } Loading
system/gd/security/cert/cert_security.py +50 −25 Original line number Diff line number Diff line Loading @@ -69,7 +69,6 @@ class CertSecurity(PySecurity): _hci_event_stream = None _io_caps = hci_packets.IoCapability.DISPLAY_ONLY _remote_oob_data = None _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION _secure_connections_enabled = False Loading Loading @@ -131,13 +130,6 @@ class CertSecurity(PySecurity): auth_reqs, "ERROR")) self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING) def set_remote_oob_data(self, remote_data): """ Set the Out-of-band data for SSP pairing """ logging.info("Cert: setting OOB data present to '%s'" % remote_data) self._remote_oob_data = remote_data def get_oob_data_from_controller(self, pb_oob_data_type): """ Get the Out-of-band data for SSP pairing Loading @@ -146,7 +138,6 @@ class CertSecurity(PySecurity): :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value """ oob_data_type = self._oob_present_lookup[pb_oob_data_type] if (oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT): Loading Loading @@ -202,6 +193,43 @@ class CertSecurity(PySecurity): return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()), list(complete.GetR256())) def input_passkey(self, address, passkey): """ Pretend to answer the pairing dialog as a user """ logging.info("Cert: Waiting for PASSKEY request") assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST)) logging.info("Cert: Send user input passkey %d for %s" % (passkey, address)) peer = address.decode('utf-8') self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_STARTED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.CLEARED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ERASED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.DIGIT_ENTERED), True) self._enqueue_hci_command( hci_packets.SendKeypressNotificationBuilder(peer, hci_packets.KeypressNotificationType.ENTRY_COMPLETED), True) self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True) def send_ui_callback(self, address, callback_type, b, uid): """ Pretend to answer the pairing dailog as a user Loading Loading @@ -229,7 +257,17 @@ class CertSecurity(PySecurity): logging.info("Cert: Waiting for controller response") assertThat(self._hci_event_stream).emits( HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT)) self._secure_connections_enabled = True # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc) #self._secure_connections_enabled = True def send_io_caps(self, address): logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True) def accept_pairing(self, dut_address, reply_boolean): """ Loading @@ -239,13 +277,7 @@ class CertSecurity(PySecurity): assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest()) logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY") self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(dut_address.decode('utf8')), True) logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( dut_address.decode('utf8'), self._io_caps, hci_packets.OobDataPresent.NOT_PRESENT, self._auth_reqs), True) self.send_io_caps(dut_address) logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest()) logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean) Loading @@ -266,14 +298,7 @@ class CertSecurity(PySecurity): def accept_oob_pairing(self, dut_address): logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse()) logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT self._enqueue_hci_command( hci_packets.IoCapabilityRequestReplyBuilder( dut_address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True) assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete()) self.send_io_caps(dut_address) logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE") ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture() assertThat(self._hci_event_stream).emits(ssp_complete_capture) Loading
system/gd/security/cert/le_security_test.py +24 −24 Original line number Diff line number Diff line Loading @@ -831,20 +831,20 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_cert_for_connection() if dut_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) if cert_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(dut_oob_flag) Loading Loading @@ -898,20 +898,20 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_dut_for_connection() if dut_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) if cert_oob_flag == LeOobDataFlag.PRESENT: oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(dut_oob_flag) Loading Loading @@ -965,19 +965,19 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_dut_for_connection() oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(OOB_PRESENT) Loading Loading @@ -1036,19 +1036,19 @@ class LeSecurityTest(GdBaseTestClass): self._prepare_cert_for_connection() oobdata = self.cert.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.cert.security.GetLeOutOfBandData(empty_proto.Empty()) self.dut.security.SetOutOfBandData( OobDataMessage( address=self.cert_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) oobdata = self.dut.security.GetOutOfBandData(empty_proto.Empty()) oobdata = self.dut.security.GetLeOutOfBandData(empty_proto.Empty()) self.cert.security.SetOutOfBandData( OobDataMessage( address=self.dut_address, le_sc_confirmation_value=oobdata.le_sc_confirmation_value, le_sc_random_value=oobdata.le_sc_random_value)) confirmation_value=oobdata.confirmation_value, random_value=oobdata.random_value)) self.dut.security.SetLeIoCapability(KEYBOARD_ONLY) self.dut.security.SetLeOobDataPresent(OOB_PRESENT) Loading