Loading system/gd/cert/captures.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from cert.capture import Capture from cert.matchers import L2capMatchers def ReadBdAddrCompleteCapture(): Loading @@ -42,3 +45,18 @@ def ConnectionCompleteCapture(): hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event))))) class L2capCaptures(object): @staticmethod def ConnectionResponse(scid): return Capture( L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response) @staticmethod def _extract_connection_response(packet): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame) system/gd/cert/event_stream.py +4 −2 Original line number Diff line number Diff line Loading @@ -43,10 +43,12 @@ class FilteringEventStream(IEventStream): self.event_queue = SimpleQueue() self.stream = stream self.stream.register_callback(self.__event_callback, self.filter_fn) self.stream.register_callback( self.__event_callback, lambda packet: self.filter_fn(packet) is not None) def __event_callback(self, event): self.event_queue.put(event) self.event_queue.put(self.filter_fn(event)) def get_event_queue(self): return self.event_queue Loading system/gd/cert/matchers.py +77 −24 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType import logging class L2capMatchers(object): Loading Loading @@ -51,12 +53,32 @@ class L2capMatchers(object): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) @staticmethod def SupervisoryFrame(scid, req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, scid, req_seq, f, s, p) def SFrame(req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) @staticmethod def InformationFrame(scid, tx_seq=None, payload=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, scid, tx_seq, payload) def IFrame(tx_seq=None, payload=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload) @staticmethod def Data(payload): return lambda packet: packet.GetPayload().GetBytes() == payload # this is a hack - should be removed @staticmethod def PartialData(payload): return lambda packet: payload in packet.GetPayload().GetBytes() @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, supports_fcs=None, supports_fixed_channels=None): return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) @staticmethod def _basic_frame(packet): Loading @@ -66,40 +88,40 @@ class L2capMatchers(object): bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _information_frame(packet, scid): def _basic_frame_for(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None standard_frame = l2cap_packets.StandardFrameView(frame) return frame @staticmethod def _information_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: return None return l2cap_packets.EnhancedInformationFrameView(standard_frame) @staticmethod def _supervisory_frame(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None standard_frame = l2cap_packets.StandardFrameView(frame) def _supervisory_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME: return None return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) @staticmethod def _is_matching_information_frame(packet, scid, tx_seq, payload): frame = L2capMatchers._information_frame(packet, scid) def _is_matching_information_frame(packet, tx_seq, payload): frame = L2capMatchers._information_frame(packet) if frame is None: return False if tx_seq is not None and frame.GetTxSeq() != tx_seq: return False if payload is not None and frame.GetPayload( ) != payload: # TODO(mylesgw) this doesn't work if payload is not None and frame.GetPayload().GetBytes() != payload: return False return True @staticmethod def _is_matching_supervisory_frame(packet, scid, req_seq, f, s, p): frame = L2capMatchers._supervisory_frame(packet, scid) def _is_matching_supervisory_frame(packet, req_seq, f, s, p): frame = L2capMatchers._supervisory_frame(packet) if frame is None: return False if req_seq is not None and frame.GetReqSeq() != req_seq: Loading @@ -114,13 +136,12 @@ class L2capMatchers(object): @staticmethod def _control_frame(packet): frame = L2capMatchers._basic_frame(packet) if frame is None or frame.GetChannelId() != 1: if packet.GetChannelId() != 1: return None return l2cap_packets.ControlView(frame.GetPayload()) return l2cap_packets.ControlView(packet.GetPayload()) @staticmethod def _control_frame_with_code(packet, code): def control_frame_with_code(packet, code): frame = L2capMatchers._control_frame(packet) if frame is None or frame.GetCode() != code: return None Loading @@ -128,11 +149,11 @@ class L2capMatchers(object): @staticmethod def _is_control_frame_with_code(packet, code): return L2capMatchers._control_frame_with_code(packet, code) is not None return L2capMatchers.control_frame_with_code(packet, code) is not None @staticmethod def _is_matching_connection_response(packet, scid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) if frame is None: return False Loading @@ -143,10 +164,42 @@ class L2capMatchers(object): @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.INFORMATION_RESPONSE) if frame is None: return None response = l2cap_packets.InformationResponseView(frame) if response.GetInfoType() != info_type: return None return response @staticmethod def _is_matching_information_response_extended_features( packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels): frame = L2capMatchers._information_response_with_type( packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) if frame is None: return False features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) if supports_ertm is not None and features.GetEnhancedRetransmissionMode( ) != supports_ertm: return False if supports_streaming is not None and features.GetStreamingMode != supports_streaming: return False if supports_fcs is not None and features.GetFcsOption() != supports_fcs: return False if supports_fixed_channels is not None and features.GetFixedChannels( ) != supports_fixed_channels: return False return True system/gd/l2cap/classic/cert/cert_l2cap.py +103 −113 Original line number Diff line number Diff line Loading @@ -21,21 +21,75 @@ from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, acl_stream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): self._device = device self._scid = scid self._our_acl_view = acl_stream self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) def get_event_queue(self): return self._our_acl_view.get_event_queue() def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) def send_i_frame(self, tx_seq, req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET): frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( self._dcid, s, p, f, req_seq) self._acl.send(frame.Serialize()) def send_information_request(self, type): assertThat(self._scid).isEqualTo(1) signal_id = 3 information_request = l2cap_packets.InformationRequestBuilder( signal_id, type) self.send(information_request) def send_extended_features_request(self): self.send_information_request( InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) self._control_channel.send( l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid)) assertThat(self._control_channel).emits( L2capMatchers.DisconnectionResponse(self._scid, self._dcid)) class CertL2cap(Closable): Loading Loading @@ -64,8 +118,9 @@ class CertL2cap(Closable): } self.scid_to_dcid = {} self.ertm_tx_window_size = 10 self.ertm_max_transmit = 20 self.ertm_option = None self.fcs_option = None def close(self): self._acl_manager.close() Loading @@ -74,46 +129,49 @@ class CertL2cap(Closable): def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() self.get_acl_stream().register_callback(self._handle_control_packet) self.control_channel = CertL2capChannel( self._device, 1, 1, self._get_acl_stream(), self._acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def open_channel(self, signal_id, psm, scid): # what is the 1 here for? open_channel = l2cap_packets.BasicFrameBuilder( 1, l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) self.send_acl(open_channel) self.control_channel.send( l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) assertThat(self._acl).emits(L2capMatchers.ConnectionResponse(scid)) return CertL2capChannel(self._device, scid, self.get_acl_stream()) response = L2capCaptures.ConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._acl, self.control_channel) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) # temporary until clients migrated def get_acl_stream(self): return self._acl_manager.get_acl_stream() # temporary until clients migrated def get_acl(self): return self._acl def get_control_channel(self): return self.control_channel # temporary until clients migrated def get_dcid(self, scid): return self.scid_to_dcid[scid] def _get_acl_stream(self): return self._acl_manager.get_acl_stream() # more of a hack for the moment def turn_on_ertm(self, tx_window_size=10, max_transmit=20): self.ertm_tx_window_size = tx_window_size self.ertm_max_transmit = max_transmit self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_use_ertm self.ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) self.ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC self.ertm_option.tx_window_size = tx_window_size self.ertm_option.max_transmit = max_transmit self.ertm_option.retransmission_time_out = 2000 self.ertm_option.monitor_time_out = 12000 self.ertm_option.maximum_pdu_size = 1010 # more of a hack for the moment def turn_on_ertm_and_fcs(self): self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_use_ertm_and_fcs def turn_on_fcs(self): self.fcs_option = l2cap_packets.FrameCheckSequenceOption() self.fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT # more of a hack for the moment def ignore_config_and_connections(self): Loading Loading @@ -144,9 +202,7 @@ class CertL2cap(Closable): sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS, l2cap_packets.ConnectionResponseStatus. NO_FURTHER_INFORMATION_AVAILABLE) connection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, connection_response) self.send_acl(connection_response_l2cap) self.control_channel.send(connection_response) return True def _on_connection_response_default(self, l2cap_control_view): Loading @@ -157,72 +213,15 @@ class CertL2cap(Closable): dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, []) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) return True def _on_connection_response_use_ertm(self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid # FIXME: This doesn't work! ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC ertm_option.tx_window_size = self.ertm_tx_window_size ertm_option.max_transmit = self.ertm_max_transmit ertm_option.retransmission_time_out = 2000 ertm_option.monitor_time_out = 12000 ertm_option.maximum_pdu_size = 1010 options = [ertm_option] config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) return True def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid # FIXME: This doesn't work! ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC ertm_option.tx_window_size = self.ertm_tx_window_size ertm_option.max_transmit = self.ertm_max_transmit ertm_option.retransmission_time_out = 2000 ertm_option.monitor_time_out = 12000 ertm_option.maximum_pdu_size = 1010 fcs_option = l2cap_packets.FrameCheckSequenceOption() fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT options = [ertm_option, fcs_option] options = [] if self.ertm_option is not None: options.append(self.ertm_option) if self.fcs_option is not None: options.append(self.fcs_option) config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) self.control_channel.send(config_request) return True def _on_connection_response_configuration_request_with_unknown_options_and_hint( Loading Loading @@ -258,9 +257,7 @@ class CertL2cap(Closable): config_response = l2cap_packets.ConfigurationResponseBuilder( sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.SUCCESS, []) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.send_acl(config_response_l2cap) self.control_channel.send(config_response) def _on_configuration_request_unacceptable_parameters( self, l2cap_control_view): Loading @@ -281,9 +278,7 @@ class CertL2cap(Closable): sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS, [mtu_opt, fcs_opt, rfc_opt]) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.send_acl(config_response_l2cap) self.control_channel.send(config_response) def _on_configuration_response_default(self, l2cap_control_view): configuration_response = l2cap_packets.ConfigurationResponseView( Loading @@ -298,9 +293,7 @@ class CertL2cap(Closable): dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, disconnection_response) self.send_acl(disconnection_response_l2cap) self.control_channel.send(disconnection_response) def _on_disconnection_response_default(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( Loading @@ -314,21 +307,18 @@ class CertL2cap(Closable): if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: response = l2cap_packets.InformationResponseFixedChannelsBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return def _on_information_response_default(self, l2cap_control_view): Loading system/gd/l2cap/classic/cert/l2cap_test.py +313 −798 File changed.Preview size limit exceeded, changes collapsed. Show changes Loading
system/gd/cert/captures.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from cert.capture import Capture from cert.matchers import L2capMatchers def ReadBdAddrCompleteCapture(): Loading @@ -42,3 +45,18 @@ def ConnectionCompleteCapture(): hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event))))) class L2capCaptures(object): @staticmethod def ConnectionResponse(scid): return Capture( L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response) @staticmethod def _extract_connection_response(packet): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame)
system/gd/cert/event_stream.py +4 −2 Original line number Diff line number Diff line Loading @@ -43,10 +43,12 @@ class FilteringEventStream(IEventStream): self.event_queue = SimpleQueue() self.stream = stream self.stream.register_callback(self.__event_callback, self.filter_fn) self.stream.register_callback( self.__event_callback, lambda packet: self.filter_fn(packet) is not None) def __event_callback(self, event): self.event_queue.put(event) self.event_queue.put(self.filter_fn(event)) def get_event_queue(self): return self.event_queue Loading
system/gd/cert/matchers.py +77 −24 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType import logging class L2capMatchers(object): Loading Loading @@ -51,12 +53,32 @@ class L2capMatchers(object): return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) @staticmethod def SupervisoryFrame(scid, req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, scid, req_seq, f, s, p) def SFrame(req_seq=None, f=None, s=None, p=None): return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) @staticmethod def InformationFrame(scid, tx_seq=None, payload=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, scid, tx_seq, payload) def IFrame(tx_seq=None, payload=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload) @staticmethod def Data(payload): return lambda packet: packet.GetPayload().GetBytes() == payload # this is a hack - should be removed @staticmethod def PartialData(payload): return lambda packet: payload in packet.GetPayload().GetBytes() @staticmethod def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, supports_fcs=None, supports_fixed_channels=None): return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) @staticmethod def _basic_frame(packet): Loading @@ -66,40 +88,40 @@ class L2capMatchers(object): bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _information_frame(packet, scid): def _basic_frame_for(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None standard_frame = l2cap_packets.StandardFrameView(frame) return frame @staticmethod def _information_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: return None return l2cap_packets.EnhancedInformationFrameView(standard_frame) @staticmethod def _supervisory_frame(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None standard_frame = l2cap_packets.StandardFrameView(frame) def _supervisory_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME: return None return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) @staticmethod def _is_matching_information_frame(packet, scid, tx_seq, payload): frame = L2capMatchers._information_frame(packet, scid) def _is_matching_information_frame(packet, tx_seq, payload): frame = L2capMatchers._information_frame(packet) if frame is None: return False if tx_seq is not None and frame.GetTxSeq() != tx_seq: return False if payload is not None and frame.GetPayload( ) != payload: # TODO(mylesgw) this doesn't work if payload is not None and frame.GetPayload().GetBytes() != payload: return False return True @staticmethod def _is_matching_supervisory_frame(packet, scid, req_seq, f, s, p): frame = L2capMatchers._supervisory_frame(packet, scid) def _is_matching_supervisory_frame(packet, req_seq, f, s, p): frame = L2capMatchers._supervisory_frame(packet) if frame is None: return False if req_seq is not None and frame.GetReqSeq() != req_seq: Loading @@ -114,13 +136,12 @@ class L2capMatchers(object): @staticmethod def _control_frame(packet): frame = L2capMatchers._basic_frame(packet) if frame is None or frame.GetChannelId() != 1: if packet.GetChannelId() != 1: return None return l2cap_packets.ControlView(frame.GetPayload()) return l2cap_packets.ControlView(packet.GetPayload()) @staticmethod def _control_frame_with_code(packet, code): def control_frame_with_code(packet, code): frame = L2capMatchers._control_frame(packet) if frame is None or frame.GetCode() != code: return None Loading @@ -128,11 +149,11 @@ class L2capMatchers(object): @staticmethod def _is_control_frame_with_code(packet, code): return L2capMatchers._control_frame_with_code(packet, code) is not None return L2capMatchers.control_frame_with_code(packet, code) is not None @staticmethod def _is_matching_connection_response(packet, scid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) if frame is None: return False Loading @@ -143,10 +164,42 @@ class L2capMatchers(object): @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False response = l2cap_packets.DisconnectionResponseView(frame) return response.GetSourceCid() == scid and response.GetDestinationCid( ) == dcid @staticmethod def _information_response_with_type(packet, info_type): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.INFORMATION_RESPONSE) if frame is None: return None response = l2cap_packets.InformationResponseView(frame) if response.GetInfoType() != info_type: return None return response @staticmethod def _is_matching_information_response_extended_features( packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels): frame = L2capMatchers._information_response_with_type( packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) if frame is None: return False features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) if supports_ertm is not None and features.GetEnhancedRetransmissionMode( ) != supports_ertm: return False if supports_streaming is not None and features.GetStreamingMode != supports_streaming: return False if supports_fcs is not None and features.GetFcsOption() != supports_fcs: return False if supports_fixed_channels is not None and features.GetFixedChannels( ) != supports_fixed_channels: return False return True
system/gd/l2cap/classic/cert/cert_l2cap.py +103 −113 Original line number Diff line number Diff line Loading @@ -21,21 +21,75 @@ from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction from bluetooth_packets_python3.l2cap_packets import Poll from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, acl_stream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): self._device = device self._scid = scid self._our_acl_view = acl_stream self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) def get_event_queue(self): return self._our_acl_view.get_event_queue() def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) def send_i_frame(self, tx_seq, req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET): frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( self._dcid, s, p, f, req_seq) self._acl.send(frame.Serialize()) def send_information_request(self, type): assertThat(self._scid).isEqualTo(1) signal_id = 3 information_request = l2cap_packets.InformationRequestBuilder( signal_id, type) self.send(information_request) def send_extended_features_request(self): self.send_information_request( InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) def disconnect_and_verify(self): assertThat(self._scid).isNotEqualTo(1) self._control_channel.send( l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid)) assertThat(self._control_channel).emits( L2capMatchers.DisconnectionResponse(self._scid, self._dcid)) class CertL2cap(Closable): Loading Loading @@ -64,8 +118,9 @@ class CertL2cap(Closable): } self.scid_to_dcid = {} self.ertm_tx_window_size = 10 self.ertm_max_transmit = 20 self.ertm_option = None self.fcs_option = None def close(self): self._acl_manager.close() Loading @@ -74,46 +129,49 @@ class CertL2cap(Closable): def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() self.get_acl_stream().register_callback(self._handle_control_packet) self.control_channel = CertL2capChannel( self._device, 1, 1, self._get_acl_stream(), self._acl, control_channel=None) self._get_acl_stream().register_callback(self._handle_control_packet) def open_channel(self, signal_id, psm, scid): # what is the 1 here for? open_channel = l2cap_packets.BasicFrameBuilder( 1, l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) self.send_acl(open_channel) self.control_channel.send( l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) assertThat(self._acl).emits(L2capMatchers.ConnectionResponse(scid)) return CertL2capChannel(self._device, scid, self.get_acl_stream()) response = L2capCaptures.ConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._acl, self.control_channel) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): self._acl.send(packet.Serialize()) # temporary until clients migrated def get_acl_stream(self): return self._acl_manager.get_acl_stream() # temporary until clients migrated def get_acl(self): return self._acl def get_control_channel(self): return self.control_channel # temporary until clients migrated def get_dcid(self, scid): return self.scid_to_dcid[scid] def _get_acl_stream(self): return self._acl_manager.get_acl_stream() # more of a hack for the moment def turn_on_ertm(self, tx_window_size=10, max_transmit=20): self.ertm_tx_window_size = tx_window_size self.ertm_max_transmit = max_transmit self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_use_ertm self.ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) self.ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC self.ertm_option.tx_window_size = tx_window_size self.ertm_option.max_transmit = max_transmit self.ertm_option.retransmission_time_out = 2000 self.ertm_option.monitor_time_out = 12000 self.ertm_option.maximum_pdu_size = 1010 # more of a hack for the moment def turn_on_ertm_and_fcs(self): self.control_table[ CommandCode. CONNECTION_RESPONSE] = self._on_connection_response_use_ertm_and_fcs def turn_on_fcs(self): self.fcs_option = l2cap_packets.FrameCheckSequenceOption() self.fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT # more of a hack for the moment def ignore_config_and_connections(self): Loading Loading @@ -144,9 +202,7 @@ class CertL2cap(Closable): sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS, l2cap_packets.ConnectionResponseStatus. NO_FURTHER_INFORMATION_AVAILABLE) connection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, connection_response) self.send_acl(connection_response_l2cap) self.control_channel.send(connection_response) return True def _on_connection_response_default(self, l2cap_control_view): Loading @@ -157,72 +213,15 @@ class CertL2cap(Closable): dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, []) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) return True def _on_connection_response_use_ertm(self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid # FIXME: This doesn't work! ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC ertm_option.tx_window_size = self.ertm_tx_window_size ertm_option.max_transmit = self.ertm_max_transmit ertm_option.retransmission_time_out = 2000 ertm_option.monitor_time_out = 12000 ertm_option.maximum_pdu_size = 1010 options = [ertm_option] config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) return True def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view): connection_response_view = l2cap_packets.ConnectionResponseView( l2cap_control_view) sid = connection_response_view.GetIdentifier() scid = connection_response_view.GetSourceCid() dcid = connection_response_view.GetDestinationCid() self.scid_to_dcid[scid] = dcid # FIXME: This doesn't work! ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( ) ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC ertm_option.tx_window_size = self.ertm_tx_window_size ertm_option.max_transmit = self.ertm_max_transmit ertm_option.retransmission_time_out = 2000 ertm_option.monitor_time_out = 12000 ertm_option.maximum_pdu_size = 1010 fcs_option = l2cap_packets.FrameCheckSequenceOption() fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT options = [ertm_option, fcs_option] options = [] if self.ertm_option is not None: options.append(self.ertm_option) if self.fcs_option is not None: options.append(self.fcs_option) config_request = l2cap_packets.ConfigurationRequestBuilder( sid + 1, dcid, l2cap_packets.Continuation.END, options) config_request_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_request) self.send_acl(config_request_l2cap) self.control_channel.send(config_request) return True def _on_connection_response_configuration_request_with_unknown_options_and_hint( Loading Loading @@ -258,9 +257,7 @@ class CertL2cap(Closable): config_response = l2cap_packets.ConfigurationResponseBuilder( sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.SUCCESS, []) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.send_acl(config_response_l2cap) self.control_channel.send(config_response) def _on_configuration_request_unacceptable_parameters( self, l2cap_control_view): Loading @@ -281,9 +278,7 @@ class CertL2cap(Closable): sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END, l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS, [mtu_opt, fcs_opt, rfc_opt]) config_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, config_response) self.send_acl(config_response_l2cap) self.control_channel.send(config_response) def _on_configuration_response_default(self, l2cap_control_view): configuration_response = l2cap_packets.ConfigurationResponseView( Loading @@ -298,9 +293,7 @@ class CertL2cap(Closable): dcid = disconnection_request.GetDestinationCid() disconnection_response = l2cap_packets.DisconnectionResponseBuilder( sid, dcid, scid) disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder( 1, disconnection_response) self.send_acl(disconnection_response_l2cap) self.control_channel.send(disconnection_response) def _on_disconnection_response_default(self, l2cap_control_view): disconnection_response = l2cap_packets.DisconnectionResponseView( Loading @@ -314,21 +307,18 @@ class CertL2cap(Closable): if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: response = l2cap_packets.InformationResponseFixedChannelsBuilder( sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) self.send_acl(response_l2cap) self.control_channel.send(response) return def _on_information_response_default(self, l2cap_control_view): Loading
system/gd/l2cap/classic/cert/l2cap_test.py +313 −798 File changed.Preview size limit exceeded, changes collapsed. Show changes