Loading system/gd/cert/matchers.py +58 −6 Original line number Diff line number Diff line Loading @@ -97,11 +97,15 @@ class L2capMatchers(object): @staticmethod def IFrame(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f) return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False) @staticmethod def IFrameWithFcs(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True) @staticmethod def IFrameStart(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f) return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False) @staticmethod def Data(payload): Loading Loading @@ -135,6 +139,10 @@ class L2capMatchers(object): def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def ExtractBasicFrameWithFcs(scid): return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, Loading @@ -149,6 +157,13 @@ class L2capMatchers(object): return l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _basic_frame_with_fcs(packet): if packet is None: return None return l2cap_packets.BasicFrameWithFcsView( bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _basic_frame_for(packet, scid): frame = L2capMatchers._basic_frame(packet) Loading @@ -156,6 +171,16 @@ class L2capMatchers(object): return None return frame @staticmethod def _basic_frame_with_fcs_for(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None frame = L2capMatchers._basic_frame_with_fcs(packet) if frame is None: return None return frame @staticmethod def _information_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) Loading @@ -163,6 +188,15 @@ class L2capMatchers(object): return None return l2cap_packets.EnhancedInformationFrameView(standard_frame) @staticmethod def _information_frame_with_fcs(packet): standard_frame = l2cap_packets.StandardFrameWithFcsView(packet) if standard_frame is None: return None if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: return None return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame) @staticmethod def _information_start_frame(packet): start_frame = L2capMatchers._information_frame(packet) Loading @@ -170,6 +204,14 @@ class L2capMatchers(object): return None return l2cap_packets.EnhancedInformationStartFrameView(start_frame) @staticmethod def _information_start_frame_with_fcs(packet): start_frame = L2capMatchers._information_frame_with_fcs(packet) if start_frame is None: return None return l2cap_packets.EnhancedInformationStartFrameWithFcsView( start_frame) @staticmethod def _supervisory_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) Loading @@ -178,7 +220,10 @@ class L2capMatchers(object): return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) @staticmethod def _is_matching_information_frame(packet, tx_seq, payload, f): def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False): if fcs: frame = L2capMatchers._information_frame_with_fcs(packet) else: frame = L2capMatchers._information_frame(packet) if frame is None: return False Loading @@ -191,7 +236,14 @@ class L2capMatchers(object): return True @staticmethod def _is_matching_information_start_frame(packet, tx_seq, payload, f): def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False): if fcs: frame = L2capMatchers._information_start_frame_with_fcs(packet) else: frame = L2capMatchers._information_start_frame(packet) if frame is None: return False Loading system/gd/l2cap/classic/cert/cert_l2cap.py +27 −8 Original line number Diff line number Diff line Loading @@ -34,13 +34,25 @@ from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs_enabled=False): self._device = device self._scid = scid self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self.fcs_enabled = fcs_enabled if fcs_enabled: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) else: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) Loading @@ -56,7 +68,12 @@ class CertL2capChannel(IEventStream): req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): payload=None, fcs=False): if fcs: frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) else: frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) Loading Loading @@ -129,6 +146,7 @@ class CertL2cap(Closable): self.basic_option = None self.ertm_option = None self.fcs_option = None self.fcs_enabled = False self.config_response_result = l2cap_packets.ConfigurationResponseResult.SUCCESS self.config_response_options = [] Loading Loading @@ -158,7 +176,7 @@ class CertL2cap(Closable): return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._acl, self.control_channel) self.control_channel, self.fcs_enabled) def verify_and_respond_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.ConnectionRequest(psm) Loading Loading @@ -189,7 +207,7 @@ class CertL2cap(Closable): channel = CertL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._acl, self.control_channel) self.control_channel, self.fcs_enabled) return channel # prefer to use channel abstraction instead, if at all possible Loading Loading @@ -223,6 +241,7 @@ class CertL2cap(Closable): def turn_on_fcs(self): self.fcs_option = l2cap_packets.FrameCheckSequenceOption() self.fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT self.fcs_enabled = True # more of a hack for the moment def ignore_config_and_connections(self): Loading system/gd/l2cap/classic/cert/l2cap_test.py +72 −5 Original line number Diff line number Diff line Loading @@ -467,8 +467,7 @@ class L2capTest(GdBaseTestClass): dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.PartialData( b"abc\x4f\xa3")) # TODO: Use packet parser L2capMatchers.IFrameWithFcs(payload=b"abc")) @metadata( pts_test_id="L2CAP/FOC/BV-03-C", Loading @@ -491,8 +490,77 @@ class L2capTest(GdBaseTestClass): dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.PartialData( b"abc\x4f\xa3")) # TODO: Use packet parser L2capMatchers.IFrameWithFcs(payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-01-C", pts_test_name="Sending I-Frames without FCS for ERTM") def test_sending_i_frames_without_fcs_for_ertm(self): """ Verify the IUT does not include the FCS in I-frames. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-02-C", pts_test_name="Receiving I-Frames without FCS for ERTM") def test_receiving_i_frames_without_fcs_for_ertm(self): """ Verify the IUT can handle I-frames that do not contain the FCS. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b"abc") assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-05-C", pts_test_name="Sending I-Frames with FCS for ERTM") def test_sending_i_frames_with_fcs_for_ertm(self): """ Verify the IUT does include the FCS in I-frames. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() self.cert_l2cap.turn_on_fcs() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-06-C", pts_test_name="Receiving I-Frames with FCS for ERTM") def test_aareceiving_i_frames_with_fcs_for_ertm(self): """ Verify the IUT can handle I-frames that do contain the FCS. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() self.cert_l2cap.turn_on_fcs() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b"abc") assertThat(cert_channel).emits( L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/ERM/BV-01-C", pts_test_name="Transmit I-frames") Loading @@ -512,7 +580,6 @@ class L2capTest(GdBaseTestClass): assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) # todo: verify packet received? cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) dut_channel.send(b'abc') Loading Loading
system/gd/cert/matchers.py +58 −6 Original line number Diff line number Diff line Loading @@ -97,11 +97,15 @@ class L2capMatchers(object): @staticmethod def IFrame(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f) return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False) @staticmethod def IFrameWithFcs(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True) @staticmethod def IFrameStart(tx_seq=None, payload=None, f=None): return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f) return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False) @staticmethod def Data(payload): Loading Loading @@ -135,6 +139,10 @@ class L2capMatchers(object): def ExtractBasicFrame(scid): return lambda packet: L2capMatchers._basic_frame_for(packet, scid) @staticmethod def ExtractBasicFrameWithFcs(scid): return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid) @staticmethod def InformationResponseExtendedFeatures(supports_ertm=None, supports_streaming=None, Loading @@ -149,6 +157,13 @@ class L2capMatchers(object): return l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _basic_frame_with_fcs(packet): if packet is None: return None return l2cap_packets.BasicFrameWithFcsView( bt_packets.PacketViewLittleEndian(list(packet.payload))) @staticmethod def _basic_frame_for(packet, scid): frame = L2capMatchers._basic_frame(packet) Loading @@ -156,6 +171,16 @@ class L2capMatchers(object): return None return frame @staticmethod def _basic_frame_with_fcs_for(packet, scid): frame = L2capMatchers._basic_frame(packet) if frame.GetChannelId() != scid: return None frame = L2capMatchers._basic_frame_with_fcs(packet) if frame is None: return None return frame @staticmethod def _information_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) Loading @@ -163,6 +188,15 @@ class L2capMatchers(object): return None return l2cap_packets.EnhancedInformationFrameView(standard_frame) @staticmethod def _information_frame_with_fcs(packet): standard_frame = l2cap_packets.StandardFrameWithFcsView(packet) if standard_frame is None: return None if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: return None return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame) @staticmethod def _information_start_frame(packet): start_frame = L2capMatchers._information_frame(packet) Loading @@ -170,6 +204,14 @@ class L2capMatchers(object): return None return l2cap_packets.EnhancedInformationStartFrameView(start_frame) @staticmethod def _information_start_frame_with_fcs(packet): start_frame = L2capMatchers._information_frame_with_fcs(packet) if start_frame is None: return None return l2cap_packets.EnhancedInformationStartFrameWithFcsView( start_frame) @staticmethod def _supervisory_frame(packet): standard_frame = l2cap_packets.StandardFrameView(packet) Loading @@ -178,7 +220,10 @@ class L2capMatchers(object): return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) @staticmethod def _is_matching_information_frame(packet, tx_seq, payload, f): def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False): if fcs: frame = L2capMatchers._information_frame_with_fcs(packet) else: frame = L2capMatchers._information_frame(packet) if frame is None: return False Loading @@ -191,7 +236,14 @@ class L2capMatchers(object): return True @staticmethod def _is_matching_information_start_frame(packet, tx_seq, payload, f): def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False): if fcs: frame = L2capMatchers._information_start_frame_with_fcs(packet) else: frame = L2capMatchers._information_start_frame(packet) if frame is None: return False Loading
system/gd/l2cap/classic/cert/cert_l2cap.py +27 −8 Original line number Diff line number Diff line Loading @@ -34,13 +34,25 @@ from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel): def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs_enabled=False): self._device = device self._scid = scid self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._control_channel = control_channel self.fcs_enabled = fcs_enabled if fcs_enabled: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) else: self._our_acl_view = FilteringEventStream( acl_stream, L2capMatchers.ExtractBasicFrame(scid)) Loading @@ -56,7 +68,12 @@ class CertL2capChannel(IEventStream): req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): payload=None, fcs=False): if fcs: frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) else: frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) Loading Loading @@ -129,6 +146,7 @@ class CertL2cap(Closable): self.basic_option = None self.ertm_option = None self.fcs_option = None self.fcs_enabled = False self.config_response_result = l2cap_packets.ConfigurationResponseResult.SUCCESS self.config_response_options = [] Loading Loading @@ -158,7 +176,7 @@ class CertL2cap(Closable): return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self._get_acl_stream(), self._acl, self.control_channel) self.control_channel, self.fcs_enabled) def verify_and_respond_open_channel_from_remote(self, psm=0x33): request = L2capCaptures.ConnectionRequest(psm) Loading Loading @@ -189,7 +207,7 @@ class CertL2cap(Closable): channel = CertL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._acl, self.control_channel) self.control_channel, self.fcs_enabled) return channel # prefer to use channel abstraction instead, if at all possible Loading Loading @@ -223,6 +241,7 @@ class CertL2cap(Closable): def turn_on_fcs(self): self.fcs_option = l2cap_packets.FrameCheckSequenceOption() self.fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT self.fcs_enabled = True # more of a hack for the moment def ignore_config_and_connections(self): Loading
system/gd/l2cap/classic/cert/l2cap_test.py +72 −5 Original line number Diff line number Diff line Loading @@ -467,8 +467,7 @@ class L2capTest(GdBaseTestClass): dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.PartialData( b"abc\x4f\xa3")) # TODO: Use packet parser L2capMatchers.IFrameWithFcs(payload=b"abc")) @metadata( pts_test_id="L2CAP/FOC/BV-03-C", Loading @@ -491,8 +490,77 @@ class L2capTest(GdBaseTestClass): dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.PartialData( b"abc\x4f\xa3")) # TODO: Use packet parser L2capMatchers.IFrameWithFcs(payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-01-C", pts_test_name="Sending I-Frames without FCS for ERTM") def test_sending_i_frames_without_fcs_for_ertm(self): """ Verify the IUT does not include the FCS in I-frames. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-02-C", pts_test_name="Receiving I-Frames without FCS for ERTM") def test_receiving_i_frames_without_fcs_for_ertm(self): """ Verify the IUT can handle I-frames that do not contain the FCS. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b"abc") assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-05-C", pts_test_name="Sending I-Frames with FCS for ERTM") def test_sending_i_frames_with_fcs_for_ertm(self): """ Verify the IUT does include the FCS in I-frames. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() self.cert_l2cap.turn_on_fcs() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b'abc') assertThat(cert_channel).emits( L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/OFS/BV-06-C", pts_test_name="Receiving I-Frames with FCS for ERTM") def test_aareceiving_i_frames_with_fcs_for_ertm(self): """ Verify the IUT can handle I-frames that do contain the FCS. """ self._setup_link_from_cert() self.cert_l2cap.turn_on_ertm() self.cert_l2cap.turn_on_fcs() (dut_channel, cert_channel) = self._open_channel( scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) dut_channel.send(b"abc") assertThat(cert_channel).emits( L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc")) @metadata( pts_test_id="L2CAP/ERM/BV-01-C", pts_test_name="Transmit I-frames") Loading @@ -512,7 +580,6 @@ class L2capTest(GdBaseTestClass): assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) # todo: verify packet received? cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) dut_channel.send(b'abc') Loading