Loading system/gd/cert/captures.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from cert.capture import Capture from cert.matchers import L2capMatchers def ReadBdAddrCompleteCapture(): Loading @@ -42,3 +45,18 @@ def ConnectionCompleteCapture(): hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event))))) class L2capCaptures(object): @staticmethod def ConnectionResponse(scid): return Capture( L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response) @staticmethod def _extract_connection_response(packet): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame) system/gd/cert/matchers.py +5 −6 Original line number Diff line number Diff line Loading @@ -107,8 +107,7 @@ class L2capMatchers(object): return False if tx_seq is not None and frame.GetTxSeq() != tx_seq: return False if payload is not None and frame.GetPayload( ) != payload: # TODO(mylesgw) this doesn't work if payload is not None and frame.GetPayload().GetBytes() != payload: return False return True Loading @@ -134,7 +133,7 @@ class L2capMatchers(object): return l2cap_packets.ControlView(packet.GetPayload()) @staticmethod def _control_frame_with_code(packet, code): def control_frame_with_code(packet, code): frame = L2capMatchers._control_frame(packet) if frame is None or frame.GetCode() != code: return None Loading @@ -142,11 +141,11 @@ class L2capMatchers(object): @staticmethod def _is_control_frame_with_code(packet, code): return L2capMatchers._control_frame_with_code(packet, code) is not None return L2capMatchers.control_frame_with_code(packet, code) is not None @staticmethod def _is_matching_connection_response(packet, scid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) if frame is None: return False Loading @@ -157,7 +156,7 @@ class L2capMatchers(object): @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False Loading system/gd/l2cap/classic/cert/cert_l2cap.py +22 −7 Original line number Diff line number Diff line Loading @@ -21,16 +21,20 @@ from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, acl_stream, acl): def __init__(self, device, scid, dcid, acl_stream, acl): self._device = device self._scid = scid self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._our_acl_view = FilteringEventStream( Loading @@ -40,7 +44,17 @@ class CertL2capChannel(IEventStream): return self._our_acl_view.get_event_queue() def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._scid, packet) frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) def send_i_frame(self, tx_seq, req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) Loading Loading @@ -82,7 +96,7 @@ class CertL2cap(Closable): def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() self.control_channel = CertL2capChannel(self._device, 1, self.control_channel = CertL2capChannel(self._device, 1, 1, self.get_acl_stream(), self._acl) self.get_acl_stream().register_callback(self._handle_control_packet) Loading @@ -91,10 +105,11 @@ class CertL2cap(Closable): self.control_channel.send( l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) assertThat(self.control_channel).emits( L2capMatchers.ConnectionResponse(scid)) return CertL2capChannel(self._device, scid, self.get_acl_stream(), self._acl) response = L2capCaptures.ConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self.get_acl_stream(), self._acl) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): Loading system/gd/l2cap/classic/cert/l2cap_test.py +57 −77 Original line number Diff line number Diff line Loading @@ -32,6 +32,8 @@ from neighbor.facade import facade_pb2 as neighbor_facade from hci.facade import acl_manager_facade_pb2 as acl_manager_facade import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets, l2cap_packets from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import CommandCode from cert_l2cap import CertL2cap Loading Loading @@ -146,12 +148,11 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'abc' * 34) assertThat(self.cert_channel).emits( L2capMatchers.PartialData(b'abc' * 34)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc' * 34)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) # todo verify received? def test_basic_operation_request_connection(self): """ Loading Loading @@ -460,7 +461,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b'abc')) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) def test_explicitly_request_use_FCS(self): """ Loading Loading @@ -540,31 +542,28 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0, payload=b"abc")) # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) # todo: verify packet received? self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=1, payload=b"abc")) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 1, l2cap_packets.Final.NOT_SET, 2, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 2, l2cap_packets.Final.NOT_SET, 3, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=2, req_seq=3, payload=SAMPLE_PACKET) def test_receive_i_frames(self): """ Loading @@ -590,32 +589,32 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 3, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=3, req_seq=0, sar=SegmentationAndReassembly.START, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=4)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 4, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=4, req_seq=0, sar=SegmentationAndReassembly.CONTINUATION, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=5)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 5, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=5, req_seq=0, sar=SegmentationAndReassembly.END, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=6)) Loading Loading @@ -643,11 +642,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) Loading Loading @@ -682,11 +678,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'abc') self.dut_channel.send(b'def') # TODO: Besides checking TxSeq, we also want to chpacketpayload, once we can get it from packet view assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) assertThat(self.cert_channel).emitsNone( L2capMatchers.InformationFrame(tx_seq=1)) L2capMatchers.InformationFrame(tx_seq=1, payload=b'def')) s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1) Loading Loading @@ -722,24 +717,20 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'def') assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout assertThat(self.cert_channel).emitsNone( L2capMatchers.InformationFrame(tx_seq=1), L2capMatchers.InformationFrame(tx_seq=1, payload=b'abc'), timeout=timedelta(seconds=1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=1)) L2capMatchers.InformationFrame(tx_seq=1, payload=b'def')) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 1, l2cap_packets.Final.NOT_SET, 2, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) def test_transmit_s_frame_rr_with_poll_bit_set(self): """ Loading Loading @@ -971,10 +962,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) Loading Loading @@ -1041,17 +1030,13 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.cert_l2cap.get_dcid(scid) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame( s=l2cap_packets.SupervisoryFunction.REJECT)) Loading @@ -1065,11 +1050,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.SupervisoryFrame( req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE)) for i in range(1, ertm_tx_window_size): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) Loading Loading @@ -1214,10 +1196,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) Loading Loading
system/gd/cert/captures.py +18 −0 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from cert.capture import Capture from cert.matchers import L2capMatchers def ReadBdAddrCompleteCapture(): Loading @@ -42,3 +45,18 @@ def ConnectionCompleteCapture(): hci_packets.EventPacketView( bt_packets.PacketViewLittleEndian( list(packet.event))))) class L2capCaptures(object): @staticmethod def ConnectionResponse(scid): return Capture( L2capMatchers.ConnectionResponse(scid), L2capCaptures._extract_connection_response) @staticmethod def _extract_connection_response(packet): frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) return l2cap_packets.ConnectionResponseView(frame)
system/gd/cert/matchers.py +5 −6 Original line number Diff line number Diff line Loading @@ -107,8 +107,7 @@ class L2capMatchers(object): return False if tx_seq is not None and frame.GetTxSeq() != tx_seq: return False if payload is not None and frame.GetPayload( ) != payload: # TODO(mylesgw) this doesn't work if payload is not None and frame.GetPayload().GetBytes() != payload: return False return True Loading @@ -134,7 +133,7 @@ class L2capMatchers(object): return l2cap_packets.ControlView(packet.GetPayload()) @staticmethod def _control_frame_with_code(packet, code): def control_frame_with_code(packet, code): frame = L2capMatchers._control_frame(packet) if frame is None or frame.GetCode() != code: return None Loading @@ -142,11 +141,11 @@ class L2capMatchers(object): @staticmethod def _is_control_frame_with_code(packet, code): return L2capMatchers._control_frame_with_code(packet, code) is not None return L2capMatchers.control_frame_with_code(packet, code) is not None @staticmethod def _is_matching_connection_response(packet, scid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.CONNECTION_RESPONSE) if frame is None: return False Loading @@ -157,7 +156,7 @@ class L2capMatchers(object): @staticmethod def _is_matching_disconnection_response(packet, scid, dcid): frame = L2capMatchers._control_frame_with_code( frame = L2capMatchers.control_frame_with_code( packet, CommandCode.DISCONNECTION_RESPONSE) if frame is None: return False Loading
system/gd/l2cap/classic/cert/cert_l2cap.py +22 −7 Original line number Diff line number Diff line Loading @@ -21,16 +21,20 @@ from cert.truth import assertThat import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import l2cap_packets from bluetooth_packets_python3.l2cap_packets import CommandCode from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from cert.event_stream import FilteringEventStream from cert.event_stream import IEventStream from cert.matchers import L2capMatchers from cert.captures import L2capCaptures class CertL2capChannel(IEventStream): def __init__(self, device, scid, acl_stream, acl): def __init__(self, device, scid, dcid, acl_stream, acl): self._device = device self._scid = scid self._dcid = dcid self._acl_stream = acl_stream self._acl = acl self._our_acl_view = FilteringEventStream( Loading @@ -40,7 +44,17 @@ class CertL2capChannel(IEventStream): return self._our_acl_view.get_event_queue() def send(self, packet): frame = l2cap_packets.BasicFrameBuilder(self._scid, packet) frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) self._acl.send(frame.Serialize()) def send_i_frame(self, tx_seq, req_seq, f=Final.NOT_SET, sar=SegmentationAndReassembly.UNSEGMENTED, payload=None): frame = l2cap_packets.EnhancedInformationFrameBuilder( self._dcid, tx_seq, f, req_seq, sar, payload) self._acl.send(frame.Serialize()) Loading Loading @@ -82,7 +96,7 @@ class CertL2cap(Closable): def connect_acl(self, remote_addr): self._acl = self._acl_manager.initiate_connection(remote_addr) self._acl.wait_for_connection_complete() self.control_channel = CertL2capChannel(self._device, 1, self.control_channel = CertL2capChannel(self._device, 1, 1, self.get_acl_stream(), self._acl) self.get_acl_stream().register_callback(self._handle_control_packet) Loading @@ -91,10 +105,11 @@ class CertL2cap(Closable): self.control_channel.send( l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) assertThat(self.control_channel).emits( L2capMatchers.ConnectionResponse(scid)) return CertL2capChannel(self._device, scid, self.get_acl_stream(), self._acl) response = L2capCaptures.ConnectionResponse(scid) assertThat(self.control_channel).emits(response) return CertL2capChannel(self._device, scid, response.get().GetDestinationCid(), self.get_acl_stream(), self._acl) # prefer to use channel abstraction instead, if at all possible def send_acl(self, packet): Loading
system/gd/l2cap/classic/cert/l2cap_test.py +57 −77 Original line number Diff line number Diff line Loading @@ -32,6 +32,8 @@ from neighbor.facade import facade_pb2 as neighbor_facade from hci.facade import acl_manager_facade_pb2 as acl_manager_facade import bluetooth_packets_python3 as bt_packets from bluetooth_packets_python3 import hci_packets, l2cap_packets from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly from bluetooth_packets_python3.l2cap_packets import Final from bluetooth_packets_python3.l2cap_packets import CommandCode from cert_l2cap import CertL2cap Loading Loading @@ -146,12 +148,11 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'abc' * 34) assertThat(self.cert_channel).emits( L2capMatchers.PartialData(b'abc' * 34)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc' * 34)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) # todo verify received? def test_basic_operation_request_connection(self): """ Loading Loading @@ -460,7 +461,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b'abc')) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) def test_explicitly_request_use_FCS(self): """ Loading Loading @@ -540,31 +542,28 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0, payload=b"abc")) # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) # todo: verify packet received? self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=1, payload=b"abc")) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 1, l2cap_packets.Final.NOT_SET, 2, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) self.dut_channel.send(b'abc') assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc")) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 2, l2cap_packets.Final.NOT_SET, 3, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=2, req_seq=3, payload=SAMPLE_PACKET) def test_receive_i_frames(self): """ Loading @@ -590,32 +589,32 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 3, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=3, req_seq=0, sar=SegmentationAndReassembly.START, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=4)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 4, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=4, req_seq=0, sar=SegmentationAndReassembly.CONTINUATION, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=5)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 5, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=5, req_seq=0, sar=SegmentationAndReassembly.END, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=6)) Loading Loading @@ -643,11 +642,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) Loading Loading @@ -682,11 +678,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'abc') self.dut_channel.send(b'def') # TODO: Besides checking TxSeq, we also want to chpacketpayload, once we can get it from packet view assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) assertThat(self.cert_channel).emitsNone( L2capMatchers.InformationFrame(tx_seq=1)) L2capMatchers.InformationFrame(tx_seq=1, payload=b'def')) s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1) Loading Loading @@ -722,24 +717,20 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut_channel.send(b'def') assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) L2capMatchers.InformationFrame(tx_seq=0, payload=b'abc')) # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout assertThat(self.cert_channel).emitsNone( L2capMatchers.InformationFrame(tx_seq=1), L2capMatchers.InformationFrame(tx_seq=1, payload=b'abc'), timeout=timedelta(seconds=1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=1)) L2capMatchers.InformationFrame(tx_seq=1, payload=b'def')) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 1, l2cap_packets.Final.NOT_SET, 2, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) def test_transmit_s_frame_rr_with_poll_bit_set(self): """ Loading Loading @@ -971,10 +962,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) Loading Loading @@ -1041,17 +1030,13 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.cert_l2cap.get_dcid(scid) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=1)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame( s=l2cap_packets.SupervisoryFunction.REJECT)) Loading @@ -1065,11 +1050,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): L2capMatchers.SupervisoryFrame( req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE)) for i in range(1, ertm_tx_window_size): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, i, l2cap_packets.Final.NOT_SET, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.SupervisoryFrame(req_seq=i + 1)) Loading Loading @@ -1214,10 +1196,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) i_frame = l2cap_packets.EnhancedInformationFrameBuilder( dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET) self.cert_send_b_frame(i_frame) self.cert_channel.send_i_frame( tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) assertThat(self.cert_channel).emits( L2capMatchers.InformationFrame(tx_seq=0)) Loading