Loading system/gd/l2cap/classic/cert/l2cap_test.py +6 −10 Original line number Diff line number Diff line Loading @@ -41,8 +41,8 @@ from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from l2cap.classic.cert.cert_l2cap import CertL2cap from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # Assemble a sample packet. SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) class L2capTest(GdBaseTestClass): Loading Loading @@ -295,13 +295,12 @@ class L2capTest(GdBaseTestClass): """ self._setup_link_from_cert() asserts.skip("Need to use packet builders (RawBuilder)") # TODO(hsz): Use packet builders with opcode 0xff, sid 0x1, size 0x0 invalid_command_packet = b"\xff\x01\x00\x00" # Command code ff, Signal id 01, size 0000 invalid_command_packet = bt_packets.RawBuilder([0xff, 0x01, 0x00, 0x00]) self.cert_l2cap.get_control_channel().send(invalid_command_packet) assertThat(self.cert_channel).emits(L2capMatchers.CommandReject()) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.CommandReject()) def test_respond_with_1_2_features(self): """ Loading Loading @@ -434,9 +433,6 @@ class L2capTest(GdBaseTestClass): assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # todo: verify packet received? cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) Loading system/gd/l2cap/le/cert/le_l2cap_test.py +22 −23 Original line number Diff line number Diff line Loading @@ -36,8 +36,8 @@ from bluetooth_packets_python3 import hci_packets, l2cap_packets from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult from l2cap.le.cert.cert_le_l2cap import CertLeL2cap # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # Assemble a sample packet. SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) class LeL2capTest(GdBaseTestClass): Loading Loading @@ -164,7 +164,7 @@ class LeL2capTest(GdBaseTestClass): (dut_channel, cert_channel) = self._open_fixed_channel(4) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_connect_from_dut_and_open_dynamic_channel(self): """ Loading Loading @@ -262,12 +262,12 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() sdu_size_for_two_sample_packet = 12 sdu_size_for_two_sample_packet = 8 cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2)) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17' * 2)) def test_data_receiving(self): """ Loading @@ -275,15 +275,15 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_data_receiving_dut_is_master(self): (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_multiple_channels_with_interleaved_data_streams(self): """ Loading @@ -296,22 +296,21 @@ class LeL2capTest(GdBaseTestClass): signal_id=2, scid=0x0105, psm=0x35) (dut_channel_z, cert_channel_z) = self._open_channel_from_cert( signal_id=3, scid=0x0107, psm=0x37) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) # TODO: We should assert two events in order, but it got stuck assertThat(dut_channel_y).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), at_least_times=3) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData( b'\x01\x01\x02\x00\x00\x00')).inOrder() cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')).inOrder() cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_reject_unknown_command_in_le_sigling_channel(self): """ Loading Loading @@ -351,9 +350,9 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_credit_based_connection_response_on_supported_le_psm(self): """ Loading Loading @@ -422,7 +421,7 @@ class LeL2capTest(GdBaseTestClass): # test without sending too many packets (may take too long). # This behavior is not expected for all Bluetooth stacks. for _ in range(min(credits + 1, 3)): cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) self.cert_l2cap.verify_le_flow_control_credit(cert_channel) def test_disconnection_request(self): Loading system/gd/packet/python3_module.cc +11 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ #include "packet/packet_view.h" #include "packet/parser/checksum_type_checker.h" #include "packet/parser/custom_type_checker.h" #include "packet/raw_builder.h" namespace py = pybind11; Loading Loading @@ -57,10 +58,20 @@ using ::bluetooth::packet::kLittleEndian; using ::bluetooth::packet::PacketBuilder; using ::bluetooth::packet::PacketStruct; using ::bluetooth::packet::PacketView; using ::bluetooth::packet::RawBuilder; using ::bluetooth::packet::parser::ChecksumTypeChecker; PYBIND11_MODULE(bluetooth_packets_python3, m) { py::class_<BasePacketBuilder, std::shared_ptr<BasePacketBuilder>>(m, "BasePacketBuilder"); py::class_<RawBuilder, BasePacketBuilder, std::shared_ptr<RawBuilder>>(m, "RawBuilder") .def(py::init([](std::vector<uint8_t> bytes) { return std::make_unique<RawBuilder>(bytes); })) .def("Serialize", [](RawBuilder& builder) { std::vector<uint8_t> packet; BitInserter it(packet); builder.Serialize(it); std::string result = std::string(packet.begin(), packet.end()); return py::bytes(result); }); py::class_<PacketBuilder<kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<kLittleEndian>>>( m, "PacketBuilderLittleEndian"); py::class_<PacketBuilder<!kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<!kLittleEndian>>>( Loading Loading
system/gd/l2cap/classic/cert/l2cap_test.py +6 −10 Original line number Diff line number Diff line Loading @@ -41,8 +41,8 @@ from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType from l2cap.classic.cert.cert_l2cap import CertL2cap from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # Assemble a sample packet. SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) class L2capTest(GdBaseTestClass): Loading Loading @@ -295,13 +295,12 @@ class L2capTest(GdBaseTestClass): """ self._setup_link_from_cert() asserts.skip("Need to use packet builders (RawBuilder)") # TODO(hsz): Use packet builders with opcode 0xff, sid 0x1, size 0x0 invalid_command_packet = b"\xff\x01\x00\x00" # Command code ff, Signal id 01, size 0000 invalid_command_packet = bt_packets.RawBuilder([0xff, 0x01, 0x00, 0x00]) self.cert_l2cap.get_control_channel().send(invalid_command_packet) assertThat(self.cert_channel).emits(L2capMatchers.CommandReject()) assertThat(self.cert_l2cap.get_control_channel()).emits( L2capMatchers.CommandReject()) def test_respond_with_1_2_features(self): """ Loading Loading @@ -434,9 +433,6 @@ class L2capTest(GdBaseTestClass): assertThat(cert_channel).emits( L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # todo: verify packet received? cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) Loading
system/gd/l2cap/le/cert/le_l2cap_test.py +22 −23 Original line number Diff line number Diff line Loading @@ -36,8 +36,8 @@ from bluetooth_packets_python3 import hci_packets, l2cap_packets from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult from l2cap.le.cert.cert_le_l2cap import CertLeL2cap # Assemble a sample packet. TODO: Use RawBuilder SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) # Assemble a sample packet. SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) class LeL2capTest(GdBaseTestClass): Loading Loading @@ -164,7 +164,7 @@ class LeL2capTest(GdBaseTestClass): (dut_channel, cert_channel) = self._open_fixed_channel(4) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_connect_from_dut_and_open_dynamic_channel(self): """ Loading Loading @@ -262,12 +262,12 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() sdu_size_for_two_sample_packet = 12 sdu_size_for_two_sample_packet = 8 cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet, SAMPLE_PACKET) cert_channel.send(SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2)) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17' * 2)) def test_data_receiving(self): """ Loading @@ -275,15 +275,15 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_data_receiving_dut_is_master(self): (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_multiple_channels_with_interleaved_data_streams(self): """ Loading @@ -296,22 +296,21 @@ class LeL2capTest(GdBaseTestClass): signal_id=2, scid=0x0105, psm=0x35) (dut_channel_z, cert_channel_z) = self._open_channel_from_cert( signal_id=3, scid=0x0107, psm=0x37) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET) # TODO: We should assert two events in order, but it got stuck assertThat(dut_channel_y).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), at_least_times=3) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'), L2capMatchers.PacketPayloadRawData( b'\x01\x01\x02\x00\x00\x00')).inOrder() cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'), L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')).inOrder() cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel_z).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_reject_unknown_command_in_le_sigling_channel(self): """ Loading Loading @@ -351,9 +350,9 @@ class LeL2capTest(GdBaseTestClass): """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_dut() cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00')) L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) def test_credit_based_connection_response_on_supported_le_psm(self): """ Loading Loading @@ -422,7 +421,7 @@ class LeL2capTest(GdBaseTestClass): # test without sending too many packets (may take too long). # This behavior is not expected for all Bluetooth stacks. for _ in range(min(credits + 1, 3)): cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET) cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) self.cert_l2cap.verify_le_flow_control_credit(cert_channel) def test_disconnection_request(self): Loading
system/gd/packet/python3_module.cc +11 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ #include "packet/packet_view.h" #include "packet/parser/checksum_type_checker.h" #include "packet/parser/custom_type_checker.h" #include "packet/raw_builder.h" namespace py = pybind11; Loading Loading @@ -57,10 +58,20 @@ using ::bluetooth::packet::kLittleEndian; using ::bluetooth::packet::PacketBuilder; using ::bluetooth::packet::PacketStruct; using ::bluetooth::packet::PacketView; using ::bluetooth::packet::RawBuilder; using ::bluetooth::packet::parser::ChecksumTypeChecker; PYBIND11_MODULE(bluetooth_packets_python3, m) { py::class_<BasePacketBuilder, std::shared_ptr<BasePacketBuilder>>(m, "BasePacketBuilder"); py::class_<RawBuilder, BasePacketBuilder, std::shared_ptr<RawBuilder>>(m, "RawBuilder") .def(py::init([](std::vector<uint8_t> bytes) { return std::make_unique<RawBuilder>(bytes); })) .def("Serialize", [](RawBuilder& builder) { std::vector<uint8_t> packet; BitInserter it(packet); builder.Serialize(it); std::string result = std::string(packet.begin(), packet.end()); return py::bytes(result); }); py::class_<PacketBuilder<kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<kLittleEndian>>>( m, "PacketBuilderLittleEndian"); py::class_<PacketBuilder<!kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<!kLittleEndian>>>( Loading