Loading system/gd/cert/matchers.py +8 −0 Original line number Diff line number Diff line Loading @@ -129,6 +129,10 @@ class L2capMatchers(object): def ConfigurationRequestWithErtm(): return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet) @staticmethod def ConfigurationRequestView(dcid): return lambda request_view: request_view.GetDestinationCid() == dcid @staticmethod def DisconnectionRequest(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid) Loading @@ -137,6 +141,10 @@ class L2capMatchers(object): def DisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) @staticmethod def EchoResponse(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE) @staticmethod def CommandReject(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) Loading system/gd/cert/py_l2cap.py +28 −14 Original line number Diff line number Diff line Loading @@ -27,11 +27,18 @@ from cert.matchers import L2capMatchers from facade import common_pb2 as common class PyL2capChannel(object): class PyL2capChannel(IEventStream): def __init__(self, device, psm): def __init__(self, device, psm, l2cap_stream): self._device = device self._psm = psm self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap.SendDynamicChannelPacket( Loading @@ -49,13 +56,14 @@ class _ClassicConnectionResponseFutureWrapper(object): create the corresponding PyL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, psm): def __init__(self, grpc_response_future, device, psm, l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._psm = psm self._l2cap_stream = l2cap_stream def get_channel(self): return PyL2capChannel(self._device, self._psm) return PyL2capChannel(self._device, self._psm, self._l2cap_stream) class PyL2cap(Closable): Loading @@ -63,9 +71,11 @@ class PyL2cap(Closable): def __init__(self, device, cert_address): self._device = device self._cert_address = cert_address self._l2cap_stream = EventStream( self._device.l2cap.FetchL2capData(empty_proto.Empty())) def close(self): pass safeClose(self._l2cap_stream) def register_dynamic_channel( self, Loading @@ -74,7 +84,7 @@ class PyL2cap(Closable): self._device.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=mode)) return PyL2capChannel(self._device, psm) return PyL2capChannel(self._device, psm, self._l2cap_stream) def connect_dynamic_channel_to_cert( self, Loading @@ -89,8 +99,8 @@ class PyL2cap(Closable): l2cap_facade_pb2.OpenChannelRequest( psm=psm, remote=self._cert_address, mode=mode)) return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm) return _ClassicConnectionResponseFutureWrapper( response_future, self._device, psm, self._l2cap_stream) class PyLeL2capFixedChannel(IEventStream): Loading Loading @@ -148,7 +158,8 @@ class _CreditBasedConnectionResponseFutureWrapper(object): create the corresponding PyLeL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._cert_address = cert_address Loading @@ -162,8 +173,8 @@ class _CreditBasedConnectionResponseFutureWrapper(object): def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): Loading @@ -188,7 +199,8 @@ class PyLeL2cap(Closable): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, cert_address, psm=0x33): """ Loading @@ -196,10 +208,12 @@ class PyLeL2cap(Closable): """ self.register_coc(cert_address, psm) response_future = self._device.l2cap_le.OpenDynamicChannel.future( l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address)) l2cap_le_facade_pb2.OpenDynamicChannelRequest( psm=psm, remote=cert_address)) return _CreditBasedConnectionResponseFutureWrapper( response_future, self._device, cert_address, psm, self._le_l2cap_stream) response_future, self._device, cert_address, psm, self._le_l2cap_stream) def update_connection_parameter(self, conn_interval_min=0x10, Loading system/gd/l2cap/classic/cert/cert_l2cap.py +95 −78 Original line number Diff line number Diff line Loading @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage from cert.closable import Closable from cert.closable import safeClose Loading @@ -34,29 +36,7 @@ from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capControlChannelBehaviors(object): def __init__(self, parent): self.on_packet_behavior = SingleArgumentBehavior( lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) def on_packet(self, matcher): return self.on_packet_behavior.begin(matcher) class CertReplyStage(ReplyStage): def __init__(self, parent): self.parent = parent def send_packet(self): self._commit(lambda packet: self._send_packet(packet)) return self def _send_packet(self, packet): self.parent._control_channel.send(packet) class CertL2capChannel(IEventStream, IHasBehaviors): class CertL2capChannel(IEventStream): def __init__(self, device, Loading @@ -72,7 +52,8 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self.control_behaviors = CertL2capControlChannelBehaviors(self) self._config_rsp_received = False self._config_rsp_sent = False if fcs == l2cap_packets.FcsType.DEFAULT: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) Loading @@ -80,12 +61,12 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) def get_behaviors(self): return self.control_behaviors def get_event_queue(self): return self._our_acl_view.get_event_queue() def is_configured(self): return self._config_rsp_received and self._config_rsp_sent def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) Loading @@ -97,7 +78,7 @@ class CertL2capChannel(IEventStream, IHasBehaviors): sar=SegmentationAndReassembly.UNSEGMENTED, payload=None, fcs=False): if fcs: if fcs == l2cap_packets.FcsType.DEFAULT: frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) else: Loading @@ -114,16 +95,19 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._dcid, s, p, f, req_seq) self._acl.send(frame.Serialize()) def config_request_for_me(self): return L2capMatchers.ConfigurationRequest(self._scid) def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END): assertThat(self._scid).isNotEqualTo(1) request = l2cap_packets.ConfigurationRequestBuilder( 2, self._dcid, l2cap_packets.Continuation.END, options) sid, self._dcid, continuation, options) self._control_channel.send(request) def send_information_request(self, type): def _send_information_request(self, type): assertThat(self._scid).isEqualTo(1) signal_id = 3 information_request = l2cap_packets.InformationRequestBuilder( Loading @@ -131,7 +115,7 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self.send(information_request) def send_extended_features_request(self): self.send_information_request( self._send_information_request( InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) def verify_configuration_request_and_respond( Loading @@ -146,9 +130,22 @@ class CertL2capChannel(IEventStream, IHasBehaviors): sid, self._dcid, l2cap_packets.Continuation.END, result, options) self._control_channel.send(config_response) def verify_configuration_response(self): def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): sid = request.GetIdentifier() if options is None: options = [] config_response = l2cap_packets.ConfigurationResponseBuilder( sid, self._dcid, l2cap_packets.Continuation.END, result, options) self._control_channel.send(config_response) self._config_rsp_sent = True def verify_configuration_response( self, result=ConfigurationResponseResult.SUCCESS): assertThat(self._control_channel).emits( L2capMatchers.ConfigurationResponse()) L2capMatchers.ConfigurationResponse(result)) def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) Loading @@ -164,7 +161,39 @@ class CertL2capChannel(IEventStream, IHasBehaviors): L2capMatchers.DisconnectionRequest(self._dcid, self._scid)) class CertL2cap(Closable): class CertL2capControlChannelBehaviors(object): def __init__(self, parent): self.on_config_req_behavior = SingleArgumentBehavior( lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) def on_config_req(self, matcher): return self.on_config_req_behavior.begin(matcher) class CertReplyStage(ReplyStage): def __init__(self, parent): self.parent = parent def send_configuration_response( self, result=ConfigurationResponseResult.SUCCESS, options=None): self._commit(lambda request: self._send_configuration_response(request, result, options)) return self def _send_configuration_response( self, request, result=ConfigurationResponseResult.SUCCESS, options=None): dcid = request.GetDestinationCid() if dcid not in self.parent.scid_to_channel: logging.warning("Received config request with unknown dcid") return self.parent.scid_to_channel[dcid].send_configuration_response( request, result, options) class CertL2cap(Closable, IHasBehaviors): def __init__(self, device): self._device = device Loading @@ -188,16 +217,22 @@ class CertL2cap(Closable): self._on_information_response_default } self.scid_to_dcid = {} self.scid_to_channel = {} self.support_ertm = True self.support_fcs = True self._control_behaviors = CertL2capControlChannelBehaviors(self) self._control_behaviors.on_config_req_behavior.set_default( self._send_configuration_response_default) def close(self): self._acl_manager.close() safeClose(self._acl) def get_behaviors(self): return self._control_behaviors def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() Loading Loading @@ -234,11 +269,8 @@ class CertL2cap(Closable): sid = request.get().GetIdentifier() dcid = request.get().GetSourceCid() if scid is None or scid in self.scid_to_dcid: if scid is None or scid in self.scid_to_channel: scid = dcid self.scid_to_dcid[scid] = dcid channel = CertL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._acl, self.control_channel, fcs) Loading Loading @@ -266,45 +298,23 @@ class CertL2cap(Closable): def claim_ertm_unsupported(self): self.support_ertm = False def turn_on_ertm(self, tx_window_size=10, max_transmit=20, mps=1010): pass # more of a hack for the moment def reply_with_unknown_options_and_hint(self): self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_unknown_options_and_hint def _on_connection_response_default(self, l2cap_control_view): pass def _on_connection_response_configuration_request_with_unknown_options_and_hint( self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid mtu_opt = l2cap_packets.MtuConfigurationOption() mtu_opt.mtu = 0x1234 mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT options = [mtu_opt] config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) byte_array = bytearray(config_request_l2cap.Serialize()) ## Modify configuration option type to be a unknown byte_array[12] |= 0x7f self._acl.send(bytes(byte_array)) return True def _on_configuration_request_default(self, l2cap_control_view): pass request = l2cap_packets.ConfigurationRequestView(l2cap_control_view) dcid = request.GetDestinationCid() if dcid not in self.scid_to_channel: logging.warning("Received config request with unknown dcid") return self._control_behaviors.on_config_req_behavior.run(request) def _send_configuration_response_default(self, captured_request_view): dcid = captured_request_view.GetDestinationCid() if dcid not in self.scid_to_channel: return self.scid_to_channel[dcid].send_configuration_response( captured_request_view) @staticmethod def config_option_basic_explicit(mtu=642): Loading @@ -327,7 +337,8 @@ class CertL2cap(Closable): max_transmit=10, mps=1010, tx_window_size=10, monitor_time_out=2000): monitor_time_out=2000, retransmission_time_out=1000): result = [] mtu_opt = l2cap_packets.MtuConfigurationOption() mtu_opt.mtu = mtu Loading @@ -341,7 +352,7 @@ class CertL2cap(Closable): rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION rfc_opt.tx_window_size = tx_window_size rfc_opt.max_transmit = max_transmit rfc_opt.retransmission_time_out = 1000 rfc_opt.retransmission_time_out = retransmission_time_out rfc_opt.monitor_time_out = monitor_time_out rfc_opt.maximum_pdu_size = mps result.append(rfc_opt) Loading @@ -356,7 +367,14 @@ class CertL2cap(Closable): return CertL2cap.config_option_ertm(mps=mps) def _on_configuration_response_default(self, l2cap_control_view): pass response = l2cap_packets.ConfigurationResponseView(l2cap_control_view) scid = response.GetSourceCid() if scid not in self.scid_to_channel: logging.warning("Received config request with unknown dcid") return result = response.GetResult() if result == ConfigurationResponseResult.SUCCESS: self.scid_to_channel[scid]._config_rsp_received = True def _on_disconnection_request_default(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( Loading Loading @@ -406,4 +424,3 @@ class CertL2cap(Closable): fn = self.control_table.get(l2cap_control_view.GetCode()) if fn is not None: fn(l2cap_control_view) return system/gd/l2cap/classic/cert/l2cap_test.py +174 −157 File changed.Preview size limit exceeded, changes collapsed. Show changes system/gd/l2cap/classic/facade.cc +1 −1 Original line number Diff line number Diff line Loading @@ -194,7 +194,7 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service auto packet = channel_->GetQueueUpEnd()->TryDequeue(); std::string data = std::string(packet->begin(), packet->end()); L2capPacket l2cap_data; // l2cap_data.set_channel(cid_); l2cap_data.set_psm(psm_); l2cap_data.set_payload(data); facade_service_->pending_l2cap_data_.OnIncomingEvent(l2cap_data); } Loading Loading
system/gd/cert/matchers.py +8 −0 Original line number Diff line number Diff line Loading @@ -129,6 +129,10 @@ class L2capMatchers(object): def ConfigurationRequestWithErtm(): return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet) @staticmethod def ConfigurationRequestView(dcid): return lambda request_view: request_view.GetDestinationCid() == dcid @staticmethod def DisconnectionRequest(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid) Loading @@ -137,6 +141,10 @@ class L2capMatchers(object): def DisconnectionResponse(scid, dcid): return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) @staticmethod def EchoResponse(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE) @staticmethod def CommandReject(): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) Loading
system/gd/cert/py_l2cap.py +28 −14 Original line number Diff line number Diff line Loading @@ -27,11 +27,18 @@ from cert.matchers import L2capMatchers from facade import common_pb2 as common class PyL2capChannel(object): class PyL2capChannel(IEventStream): def __init__(self, device, psm): def __init__(self, device, psm, l2cap_stream): self._device = device self._psm = psm self._le_l2cap_stream = l2cap_stream self._our_le_l2cap_view = FilteringEventStream( self._le_l2cap_stream, L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) def get_event_queue(self): return self._our_le_l2cap_view.get_event_queue() def send(self, payload): self._device.l2cap.SendDynamicChannelPacket( Loading @@ -49,13 +56,14 @@ class _ClassicConnectionResponseFutureWrapper(object): create the corresponding PyL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, psm): def __init__(self, grpc_response_future, device, psm, l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._psm = psm self._l2cap_stream = l2cap_stream def get_channel(self): return PyL2capChannel(self._device, self._psm) return PyL2capChannel(self._device, self._psm, self._l2cap_stream) class PyL2cap(Closable): Loading @@ -63,9 +71,11 @@ class PyL2cap(Closable): def __init__(self, device, cert_address): self._device = device self._cert_address = cert_address self._l2cap_stream = EventStream( self._device.l2cap.FetchL2capData(empty_proto.Empty())) def close(self): pass safeClose(self._l2cap_stream) def register_dynamic_channel( self, Loading @@ -74,7 +84,7 @@ class PyL2cap(Closable): self._device.l2cap.SetDynamicChannel( l2cap_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, retransmission_mode=mode)) return PyL2capChannel(self._device, psm) return PyL2capChannel(self._device, psm, self._l2cap_stream) def connect_dynamic_channel_to_cert( self, Loading @@ -89,8 +99,8 @@ class PyL2cap(Closable): l2cap_facade_pb2.OpenChannelRequest( psm=psm, remote=self._cert_address, mode=mode)) return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm) return _ClassicConnectionResponseFutureWrapper( response_future, self._device, psm, self._l2cap_stream) class PyLeL2capFixedChannel(IEventStream): Loading Loading @@ -148,7 +158,8 @@ class _CreditBasedConnectionResponseFutureWrapper(object): create the corresponding PyLeL2capDynamicChannel object later """ def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): self._grpc_response_future = grpc_response_future self._device = device self._cert_address = cert_address Loading @@ -162,8 +173,8 @@ class _CreditBasedConnectionResponseFutureWrapper(object): def get_channel(self): assertThat(self.get_status()).isEqualTo( l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) class PyLeL2cap(Closable): Loading @@ -188,7 +199,8 @@ class PyLeL2cap(Closable): self._device.l2cap_le.SetDynamicChannel( l2cap_le_facade_pb2.SetEnableDynamicChannelRequest( psm=psm, enable=True)) return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) def connect_coc_to_cert(self, cert_address, psm=0x33): """ Loading @@ -196,10 +208,12 @@ class PyLeL2cap(Closable): """ self.register_coc(cert_address, psm) response_future = self._device.l2cap_le.OpenDynamicChannel.future( l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address)) l2cap_le_facade_pb2.OpenDynamicChannelRequest( psm=psm, remote=cert_address)) return _CreditBasedConnectionResponseFutureWrapper( response_future, self._device, cert_address, psm, self._le_l2cap_stream) response_future, self._device, cert_address, psm, self._le_l2cap_stream) def update_connection_parameter(self, conn_interval_min=0x10, Loading
system/gd/l2cap/classic/cert/cert_l2cap.py +95 −78 Original line number Diff line number Diff line Loading @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage from cert.closable import Closable from cert.closable import safeClose Loading @@ -34,29 +36,7 @@ from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capControlChannelBehaviors(object): def __init__(self, parent): self.on_packet_behavior = SingleArgumentBehavior( lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) def on_packet(self, matcher): return self.on_packet_behavior.begin(matcher) class CertReplyStage(ReplyStage): def __init__(self, parent): self.parent = parent def send_packet(self): self._commit(lambda packet: self._send_packet(packet)) return self def _send_packet(self, packet): self.parent._control_channel.send(packet) class CertL2capChannel(IEventStream, IHasBehaviors): class CertL2capChannel(IEventStream): def __init__(self, device, Loading @@ -72,7 +52,8 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self.control_behaviors = CertL2capControlChannelBehaviors(self) self._config_rsp_received = False self._config_rsp_sent = False if fcs == l2cap_packets.FcsType.DEFAULT: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) Loading @@ -80,12 +61,12 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) def get_behaviors(self): return self.control_behaviors def get_event_queue(self): return self._our_acl_view.get_event_queue() def is_configured(self): return self._config_rsp_received and self._config_rsp_sent def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) Loading @@ -97,7 +78,7 @@ class CertL2capChannel(IEventStream, IHasBehaviors): sar=SegmentationAndReassembly.UNSEGMENTED, payload=None, fcs=False): if fcs: if fcs == l2cap_packets.FcsType.DEFAULT: frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) else: Loading @@ -114,16 +95,19 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self._dcid, s, p, f, req_seq) self._acl.send(frame.Serialize()) def config_request_for_me(self): return L2capMatchers.ConfigurationRequest(self._scid) def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END): assertThat(self._scid).isNotEqualTo(1) request = l2cap_packets.ConfigurationRequestBuilder( 2, self._dcid, l2cap_packets.Continuation.END, options) sid, self._dcid, continuation, options) self._control_channel.send(request) def send_information_request(self, type): def _send_information_request(self, type): assertThat(self._scid).isEqualTo(1) signal_id = 3 information_request = l2cap_packets.InformationRequestBuilder( Loading @@ -131,7 +115,7 @@ class CertL2capChannel(IEventStream, IHasBehaviors): self.send(information_request) def send_extended_features_request(self): self.send_information_request( self._send_information_request( InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) def verify_configuration_request_and_respond( Loading @@ -146,9 +130,22 @@ class CertL2capChannel(IEventStream, IHasBehaviors): sid, self._dcid, l2cap_packets.Continuation.END, result, options) self._control_channel.send(config_response) def verify_configuration_response(self): def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): sid = request.GetIdentifier() if options is None: options = [] config_response = l2cap_packets.ConfigurationResponseBuilder( sid, self._dcid, l2cap_packets.Continuation.END, result, options) self._control_channel.send(config_response) self._config_rsp_sent = True def verify_configuration_response( self, result=ConfigurationResponseResult.SUCCESS): assertThat(self._control_channel).emits( L2capMatchers.ConfigurationResponse()) L2capMatchers.ConfigurationResponse(result)) def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) Loading @@ -164,7 +161,39 @@ class CertL2capChannel(IEventStream, IHasBehaviors): L2capMatchers.DisconnectionRequest(self._dcid, self._scid)) class CertL2cap(Closable): class CertL2capControlChannelBehaviors(object): def __init__(self, parent): self.on_config_req_behavior = SingleArgumentBehavior( lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) def on_config_req(self, matcher): return self.on_config_req_behavior.begin(matcher) class CertReplyStage(ReplyStage): def __init__(self, parent): self.parent = parent def send_configuration_response( self, result=ConfigurationResponseResult.SUCCESS, options=None): self._commit(lambda request: self._send_configuration_response(request, result, options)) return self def _send_configuration_response( self, request, result=ConfigurationResponseResult.SUCCESS, options=None): dcid = request.GetDestinationCid() if dcid not in self.parent.scid_to_channel: logging.warning("Received config request with unknown dcid") return self.parent.scid_to_channel[dcid].send_configuration_response( request, result, options) class CertL2cap(Closable, IHasBehaviors): def __init__(self, device): self._device = device Loading @@ -188,16 +217,22 @@ class CertL2cap(Closable): self._on_information_response_default } self.scid_to_dcid = {} self.scid_to_channel = {} self.support_ertm = True self.support_fcs = True self._control_behaviors = CertL2capControlChannelBehaviors(self) self._control_behaviors.on_config_req_behavior.set_default( self._send_configuration_response_default) def close(self): self._acl_manager.close() safeClose(self._acl) def get_behaviors(self): return self._control_behaviors def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() Loading Loading @@ -234,11 +269,8 @@ class CertL2cap(Closable): sid = request.get().GetIdentifier() dcid = request.get().GetSourceCid() if scid is None or scid in self.scid_to_dcid: if scid is None or scid in self.scid_to_channel: scid = dcid self.scid_to_dcid[scid] = dcid channel = CertL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._acl, self.control_channel, fcs) Loading Loading @@ -266,45 +298,23 @@ class CertL2cap(Closable): def claim_ertm_unsupported(self): self.support_ertm = False def turn_on_ertm(self, tx_window_size=10, max_transmit=20, mps=1010): pass # more of a hack for the moment def reply_with_unknown_options_and_hint(self): self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_unknown_options_and_hint def _on_connection_response_default(self, l2cap_control_view): pass def _on_connection_response_configuration_request_with_unknown_options_and_hint( self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid mtu_opt = l2cap_packets.MtuConfigurationOption() mtu_opt.mtu = 0x1234 mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT options = [mtu_opt] config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) byte_array = bytearray(config_request_l2cap.Serialize()) ## Modify configuration option type to be a unknown byte_array[12] |= 0x7f self._acl.send(bytes(byte_array)) return True def _on_configuration_request_default(self, l2cap_control_view): pass request = l2cap_packets.ConfigurationRequestView(l2cap_control_view) dcid = request.GetDestinationCid() if dcid not in self.scid_to_channel: logging.warning("Received config request with unknown dcid") return self._control_behaviors.on_config_req_behavior.run(request) def _send_configuration_response_default(self, captured_request_view): dcid = captured_request_view.GetDestinationCid() if dcid not in self.scid_to_channel: return self.scid_to_channel[dcid].send_configuration_response( captured_request_view) @staticmethod def config_option_basic_explicit(mtu=642): Loading @@ -327,7 +337,8 @@ class CertL2cap(Closable): max_transmit=10, mps=1010, tx_window_size=10, monitor_time_out=2000): monitor_time_out=2000, retransmission_time_out=1000): result = [] mtu_opt = l2cap_packets.MtuConfigurationOption() mtu_opt.mtu = mtu Loading @@ -341,7 +352,7 @@ class CertL2cap(Closable): rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION rfc_opt.tx_window_size = tx_window_size rfc_opt.max_transmit = max_transmit rfc_opt.retransmission_time_out = 1000 rfc_opt.retransmission_time_out = retransmission_time_out rfc_opt.monitor_time_out = monitor_time_out rfc_opt.maximum_pdu_size = mps result.append(rfc_opt) Loading @@ -356,7 +367,14 @@ class CertL2cap(Closable): return CertL2cap.config_option_ertm(mps=mps) def _on_configuration_response_default(self, l2cap_control_view): pass response = l2cap_packets.ConfigurationResponseView(l2cap_control_view) scid = response.GetSourceCid() if scid not in self.scid_to_channel: logging.warning("Received config request with unknown dcid") return result = response.GetResult() if result == ConfigurationResponseResult.SUCCESS: self.scid_to_channel[scid]._config_rsp_received = True def _on_disconnection_request_default(self, l2cap_control_view): disconnection_request = l2cap_packets.DisconnectionRequestView( Loading Loading @@ -406,4 +424,3 @@ class CertL2cap(Closable): fn = self.control_table.get(l2cap_control_view.GetCode()) if fn is not None: fn(l2cap_control_view) return
system/gd/l2cap/classic/cert/l2cap_test.py +174 −157 File changed.Preview size limit exceeded, changes collapsed. Show changes
system/gd/l2cap/classic/facade.cc +1 −1 Original line number Diff line number Diff line Loading @@ -194,7 +194,7 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service auto packet = channel_->GetQueueUpEnd()->TryDequeue(); std::string data = std::string(packet->begin(), packet->end()); L2capPacket l2cap_data; // l2cap_data.set_channel(cid_); l2cap_data.set_psm(psm_); l2cap_data.set_payload(data); facade_service_->pending_l2cap_data_.OnIncomingEvent(l2cap_data); } Loading