Loading system/gd/l2cap/classic/cert/cert.cc +18 −0 Original line number Diff line number Diff line Loading @@ -395,6 +395,24 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { FetchL2capLogResponse response; response.mutable_configuration_request()->set_signal_id(control_view.GetIdentifier()); response.mutable_configuration_request()->set_dcid(view.GetDestinationCid()); for (auto& option : view.GetConfig()) { if (option->type_ == ConfigurationOptionType::RETRANSMISSION_AND_FLOW_CONTROL) { auto config = RetransmissionAndFlowControlConfigurationOption::Specialize(option.get()); response.mutable_configuration_request()->mutable_retransmission_config()->set_mode( ChannelRetransmissionFlowControlMode::ERTM); response.mutable_configuration_request()->mutable_retransmission_config()->set_tx_window( config->tx_window_size_); response.mutable_configuration_request()->mutable_retransmission_config()->set_max_transmit( config->max_transmit_); response.mutable_configuration_request()->mutable_retransmission_config()->set_retransmit_timeout( config->retransmission_time_out_); response.mutable_configuration_request()->mutable_retransmission_config()->set_monitor_timeout( config->monitor_time_out_); response.mutable_configuration_request()->mutable_retransmission_config()->set_mps( config->maximum_pdu_size_); } } LogEvent(response); break; } Loading system/gd/l2cap/classic/cert/simple_l2cap_test.py +74 −0 Original line number Diff line number Diff line Loading @@ -457,3 +457,77 @@ class SimpleL2capTest(GdBaseTestClass): event_handler.execute(logs) assert len(disconnect_request) == 1, "No disconnect request received" assert disconnect_request[0] == (scid, self.scid_dcid_map[scid]), "Incorrect disconnect request received: scid %r, dcid %r" % (disconnect_request[0][0], disconnect_request[0][1]) def test_sent_rej_lost(self): """ L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] """ self._setup_link() signal_id = 3 scid = 0x0101 psm = 1 mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM self.tx_window = 1 self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode)) self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) info_response = [] def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest( dcid= self.scid_dcid_map[scid], signal_id=signal_id + 1, retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig( mode=mode ))) self.event_handler.on(is_connection_response, handle_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] if log.HasField("retransmission_config"): self.tx_window = log.retransmission_config.tx_window self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, )) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) self.event_handler.on(is_configuration_request, handle_configuration_request) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) data_received = [] self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=0, sar=0)) self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(self.tx_window - 1), sar=0)) def on_data_received(log): log = log.data_packet if (log.channel == scid): data_received.append(log.payload) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) self.event_handler.on(lambda log : log.HasField("data_packet"), on_data_received) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x05\x01' in data_received self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x81\x01' in data_received for i in range(1, self.tx_window): self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(i), sar=0)) time.sleep(0.1) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x01\x0a' in data_received Loading
system/gd/l2cap/classic/cert/cert.cc +18 −0 Original line number Diff line number Diff line Loading @@ -395,6 +395,24 @@ class L2capClassicModuleCertService : public L2capClassicModuleCert::Service { FetchL2capLogResponse response; response.mutable_configuration_request()->set_signal_id(control_view.GetIdentifier()); response.mutable_configuration_request()->set_dcid(view.GetDestinationCid()); for (auto& option : view.GetConfig()) { if (option->type_ == ConfigurationOptionType::RETRANSMISSION_AND_FLOW_CONTROL) { auto config = RetransmissionAndFlowControlConfigurationOption::Specialize(option.get()); response.mutable_configuration_request()->mutable_retransmission_config()->set_mode( ChannelRetransmissionFlowControlMode::ERTM); response.mutable_configuration_request()->mutable_retransmission_config()->set_tx_window( config->tx_window_size_); response.mutable_configuration_request()->mutable_retransmission_config()->set_max_transmit( config->max_transmit_); response.mutable_configuration_request()->mutable_retransmission_config()->set_retransmit_timeout( config->retransmission_time_out_); response.mutable_configuration_request()->mutable_retransmission_config()->set_monitor_timeout( config->monitor_time_out_); response.mutable_configuration_request()->mutable_retransmission_config()->set_mps( config->maximum_pdu_size_); } } LogEvent(response); break; } Loading
system/gd/l2cap/classic/cert/simple_l2cap_test.py +74 −0 Original line number Diff line number Diff line Loading @@ -457,3 +457,77 @@ class SimpleL2capTest(GdBaseTestClass): event_handler.execute(logs) assert len(disconnect_request) == 1, "No disconnect request received" assert disconnect_request[0] == (scid, self.scid_dcid_map[scid]), "Incorrect disconnect request received: scid %r, dcid %r" % (disconnect_request[0][0], disconnect_request[0][1]) def test_sent_rej_lost(self): """ L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] """ self._setup_link() signal_id = 3 scid = 0x0101 psm = 1 mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM self.tx_window = 1 self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode)) self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) info_response = [] def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest( dcid= self.scid_dcid_map[scid], signal_id=signal_id + 1, retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig( mode=mode ))) self.event_handler.on(is_connection_response, handle_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] if log.HasField("retransmission_config"): self.tx_window = log.retransmission_config.tx_window self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse( scid=dcid, signal_id=log.signal_id, )) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) self.event_handler.on(is_configuration_request, handle_configuration_request) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) data_received = [] self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=0, sar=0)) self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(self.tx_window - 1), sar=0)) def on_data_received(log): log = log.data_packet if (log.channel == scid): data_received.append(log.payload) self.cert_device.l2cap.StopFetchingL2capLog(l2cap_cert_pb2.StopFetchingL2capLogRequest()) self.event_handler.on(lambda log : log.HasField("data_packet"), on_data_received) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x05\x01' in data_received self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x81\x01' in data_received for i in range(1, self.tx_window): self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(i), sar=0)) time.sleep(0.1) logs = self.cert_device.l2cap.FetchL2capLog(l2cap_cert_pb2.FetchL2capLogRequest()) self.event_handler.execute(logs) assert b'\x01\x0a' in data_received