Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 238f69fe authored by Hansong Zhang's avatar Hansong Zhang Committed by Automerger Merge Worker
Browse files

Python Packet: Add RawBuilder am: 28c34695

Change-Id: Icbe7323338f952c5ab857bd5cf1249206786337c
parents d752a72f 28c34695
Loading
Loading
Loading
Loading
+6 −10
Original line number Diff line number Diff line
@@ -41,8 +41,8 @@ from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from l2cap.classic.cert.cert_l2cap import CertL2cap
from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode

# Assemble a sample packet. TODO: Use RawBuilder
SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
# Assemble a sample packet.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])


class L2capTest(GdBaseTestClass):
@@ -295,13 +295,12 @@ class L2capTest(GdBaseTestClass):
        """
        self._setup_link_from_cert()

        asserts.skip("Need to use packet builders (RawBuilder)")

        # TODO(hsz): Use packet builders with opcode 0xff, sid 0x1, size 0x0
        invalid_command_packet = b"\xff\x01\x00\x00"
        # Command code ff, Signal id 01, size 0000
        invalid_command_packet = bt_packets.RawBuilder([0xff, 0x01, 0x00, 0x00])
        self.cert_l2cap.get_control_channel().send(invalid_command_packet)

        assertThat(self.cert_channel).emits(L2capMatchers.CommandReject())
        assertThat(self.cert_l2cap.get_control_channel()).emits(
            L2capMatchers.CommandReject())

    def test_respond_with_1_2_features(self):
        """
@@ -434,9 +433,6 @@ class L2capTest(GdBaseTestClass):
        assertThat(cert_channel).emits(
            L2capMatchers.IFrame(tx_seq=0, payload=b"abc"))

        # Assemble a sample packet. TODO: Use RawBuilder
        SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)

        # todo: verify packet received?
        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)

+22 −23
Original line number Diff line number Diff line
@@ -36,8 +36,8 @@ from bluetooth_packets_python3 import hci_packets, l2cap_packets
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
from l2cap.le.cert.cert_le_l2cap import CertLeL2cap

# Assemble a sample packet. TODO: Use RawBuilder
SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
# Assemble a sample packet.
SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17])


class LeL2capTest(GdBaseTestClass):
@@ -164,7 +164,7 @@ class LeL2capTest(GdBaseTestClass):
        (dut_channel, cert_channel) = self._open_fixed_channel(4)
        cert_channel.send(SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

    def test_connect_from_dut_and_open_dynamic_channel(self):
        """
@@ -262,12 +262,12 @@ class LeL2capTest(GdBaseTestClass):
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        sdu_size_for_two_sample_packet = 12
        sdu_size_for_two_sample_packet = 8
        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet,
                                           SAMPLE_PACKET)
        cert_channel.send(SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17' * 2))

    def test_data_receiving(self):
        """
@@ -275,15 +275,15 @@ class LeL2capTest(GdBaseTestClass):
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

    def test_data_receiving_dut_is_master(self):
        (dut_channel, cert_channel) = self._set_link_from_dut_and_open_channel()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

    def test_multiple_channels_with_interleaved_data_streams(self):
        """
@@ -296,22 +296,21 @@ class LeL2capTest(GdBaseTestClass):
            signal_id=2, scid=0x0105, psm=0x35)
        (dut_channel_z, cert_channel_z) = self._open_channel_from_cert(
            signal_id=3, scid=0x0107, psm=0x37)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
        cert_channel_y.send_first_le_i_frame(4, SAMPLE_PACKET)
        # TODO: We should assert two events in order, but it got stuck
        assertThat(dut_channel_y).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'),
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'),
            at_least_times=3)
        assertThat(dut_channel_z).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'),
            L2capMatchers.PacketPayloadRawData(
                b'\x01\x01\x02\x00\x00\x00')).inOrder()
        cert_channel_z.send_first_le_i_frame(6, SAMPLE_PACKET)
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'),
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')).inOrder()
        cert_channel_z.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(dut_channel_z).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

    def test_reject_unknown_command_in_le_sigling_channel(self):
        """
@@ -351,9 +350,9 @@ class LeL2capTest(GdBaseTestClass):
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()
        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
        cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        assertThat(dut_channel).emits(
            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
            L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17'))

    def test_credit_based_connection_response_on_supported_le_psm(self):
        """
@@ -422,7 +421,7 @@ class LeL2capTest(GdBaseTestClass):
        # test without sending too many packets (may take too long).
        # This behavior is not expected for all Bluetooth stacks.
        for _ in range(min(credits + 1, 3)):
            cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
            cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET)
        self.cert_l2cap.verify_le_flow_control_credit(cert_channel)

    def test_disconnection_request(self):
+11 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include "packet/packet_view.h"
#include "packet/parser/checksum_type_checker.h"
#include "packet/parser/custom_type_checker.h"
#include "packet/raw_builder.h"

namespace py = pybind11;

@@ -57,10 +58,20 @@ using ::bluetooth::packet::kLittleEndian;
using ::bluetooth::packet::PacketBuilder;
using ::bluetooth::packet::PacketStruct;
using ::bluetooth::packet::PacketView;
using ::bluetooth::packet::RawBuilder;
using ::bluetooth::packet::parser::ChecksumTypeChecker;

PYBIND11_MODULE(bluetooth_packets_python3, m) {
  py::class_<BasePacketBuilder, std::shared_ptr<BasePacketBuilder>>(m, "BasePacketBuilder");
  py::class_<RawBuilder, BasePacketBuilder, std::shared_ptr<RawBuilder>>(m, "RawBuilder")
      .def(py::init([](std::vector<uint8_t> bytes) { return std::make_unique<RawBuilder>(bytes); }))
      .def("Serialize", [](RawBuilder& builder) {
        std::vector<uint8_t> packet;
        BitInserter it(packet);
        builder.Serialize(it);
        std::string result = std::string(packet.begin(), packet.end());
        return py::bytes(result);
      });
  py::class_<PacketBuilder<kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<kLittleEndian>>>(
      m, "PacketBuilderLittleEndian");
  py::class_<PacketBuilder<!kLittleEndian>, BasePacketBuilder, std::shared_ptr<PacketBuilder<!kLittleEndian>>>(