Loading system/gd/cert/all_cert_testcases +1 −0 Original line number Diff line number Diff line Loading @@ -12,3 +12,4 @@ L2capTest LeL2capTest DualL2capTest LeSecurityTest L2capPerformanceTest system/gd/cert/performance_test_logger.py 0 → 100644 +71 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime class PerformanceTestLogger(object): """ A helper class to log time points and intervals """ def __init__(self): self.base_timepoint = datetime.now() # We use a dictionary of a list of timepoints self.start_interval_points = {} self.end_interval_points = {} self.single_points = {} def log_single_point(self, label=""): if label not in self.single_points: self.single_points[label] = [] self.single_points[label].append(datetime.now()) def start_interval(self, label=""): if label not in self.start_interval_points: self.start_interval_points[label] = [] self.start_interval_points[label].append(datetime.now()) def end_interval(self, label=""): if label not in self.end_interval_points: self.end_interval_points[label] = [] self.end_interval_points[label].append(datetime.now()) def _check_interval_label(self, label): if label not in self.start_interval_points or label not in self.end_interval_points: raise KeyError("label %s doesn't exist" % label) if len(self.start_interval_points[label]) != len(self.end_interval_points[label]): raise KeyError("label %s doesn't have correct start and end log" % label) def get_duration_of_intervals(self, label): """ Return the list of duration of the intervals with specified label. """ self._check_interval_label(label) intervals = [] for i in range(len(self.start_interval_points[label])): interval = self.end_interval_points[label][i] - self.start_interval_points[label][i] intervals.append(interval.seconds * 1000 + interval.microseconds / 1000) return intervals def dump_intervals(self): """ Gives an iterator of (iterator of label, start, end) over all labels """ for label in self.start_interval_points: self._check_interval_label(label) yield ((label, self.start_interval_points[label][i], self.end_interval_points[label][i]) for i in range(len(self.start_interval_points[label]))) system/gd/l2cap/classic/cert/l2cap_performance_test.py 0 → 100644 +149 −0 Original line number Diff line number Diff line # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime, timedelta from bluetooth_packets_python3 import RawBuilder from cert.matchers import L2capMatchers from cert.truth import assertThat from cert.performance_test_logger import PerformanceTestLogger from l2cap.classic.cert.cert_l2cap import CertL2cap from l2cap.classic.cert.l2cap_test import L2capTestBase from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode from bluetooth_packets_python3.l2cap_packets import FcsType from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction class L2capPerformanceTest(L2capTestBase): def setup_test(self): super().setup_test() self.performance_test_logger = PerformanceTestLogger() def teardown_test(self): super().teardown_test() def _basic_mode_tx(self, mtu, packets): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() self.performance_test_logger.start_interval("TX") for _ in range(packets): dut_channel.send(b'a' * mtu) assertThat(cert_channel).emits( L2capMatchers.Data(b'a' * mtu), at_least_times=packets, timeout=timedelta(seconds=60)) self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) def _basic_mode_rx(self, mtu, packets): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() self.performance_test_logger.start_interval("RX") data = b"a" * mtu data_packet = RawBuilder([x for x in data]) for _ in range(packets): cert_channel.send(data_packet) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(data), at_least_times=packets, timeout=timedelta(seconds=60)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) def _ertm_mode_tx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size # For ERTM TX test, we have to do it sequentially because cert needs to ack self._setup_link_from_cert() config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) (dut_channel, cert_channel) = self._open_channel_from_cert( mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS, req_config_options=config, rsp_config_options=config) self.performance_test_logger.start_interval("TX") for i in range(packets): dut_channel.send(b'a' * mtu) if i % tx_window_size == tx_window_size - 1: assertThat(cert_channel).emits(L2capMatchers.IFrame(payload=b'a' * mtu), at_least_times=tx_window_size) cert_channel.send_s_frame(req_seq=(i + 1) % 64, s=SupervisoryFunction.RECEIVER_READY) self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) def _ertm_mode_rx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size self._setup_link_from_cert() config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) (dut_channel, cert_channel) = self._open_channel_from_cert( mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS, req_config_options=config, rsp_config_options=config) data = b"a" * mtu data_packet = RawBuilder([x for x in data]) self.performance_test_logger.start_interval("RX") for i in range(packets): cert_channel.send_i_frame(tx_seq=i % 64, req_seq=0, payload=data_packet) if i % tx_window_size == (tx_window_size - 1): assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=(i + 1) % 64)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) def test_basic_mode_tx_672_100(self): self._basic_mode_tx(672, 100) def test_basic_mode_tx_100_100(self): self._basic_mode_tx(100, 100) def test_ertm_mode_tx_672_100(self): self._ertm_mode_tx(672, 100) def test_basic_mode_rx_672_100(self): self._basic_mode_rx(672, 100) def test_ertm_mode_rx_672_100(self): self._ertm_mode_rx(672, 100) def test_basic_mode_end_to_end_latency(self): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() data = b"a" * 100 data_packet = RawBuilder([x for x in data]) for i in range(100): self.performance_test_logger.start_interval("RX") cert_channel.send(data_packet) assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX") mean = sum(duration) / len(duration) self.log.info("Mean: %d" % mean) system/gd/l2cap/classic/cert/l2cap_test.py +8 −3 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ SAMPLE_PACKET_DATA = b"\x19\x26\x08\x17" SAMPLE_PACKET = RawBuilder([x for x in SAMPLE_PACKET_DATA]) class L2capTest(GdBaseTestClass): class L2capTestBase(GdBaseTestClass): def setup_class(self): super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES') Loading Loading @@ -122,6 +122,9 @@ class L2capTest(GdBaseTestClass): return (dut_channel, cert_channel) class L2capTest(L2capTestBase): def test_connect_dynamic_channel_and_send_data(self): self._setup_link_from_cert() Loading Loading @@ -768,7 +771,8 @@ class L2capTest(GdBaseTestClass): dut_channel.set_traffic_paused(True) buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() # Allow 1 additional packet in channel queue buffer buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 for i in range(buffer_size): cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) Loading Loading @@ -1035,7 +1039,8 @@ class L2capTest(GdBaseTestClass): dut_channel.set_traffic_paused(True) buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() # Allow 1 additional packet in channel queue buffer buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 for i in range(buffer_size): cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) Loading system/gd/l2cap/internal/enhanced_retransmission_mode_channel_data_controller.cc +1 −1 Original line number Diff line number Diff line Loading @@ -966,7 +966,7 @@ std::unique_ptr<packet::BasePacketBuilder> ErtmController::GetNextPacket() { void ErtmController::stage_for_reassembly(SegmentationAndReassembly sar, uint16_t sdu_size, const packet::PacketView<kLittleEndian>& payload) { // If EnqueueBuffer has more than 1 packets, we claim LocalBusy, until queue is empty constexpr size_t kEnqueueBufferBusyThreshold = 2; constexpr size_t kEnqueueBufferBusyThreshold = 3; switch (sar) { case SegmentationAndReassembly::UNSEGMENTED: if (sar_state_ != SegmentationAndReassembly::END) { Loading Loading
system/gd/cert/all_cert_testcases +1 −0 Original line number Diff line number Diff line Loading @@ -12,3 +12,4 @@ L2capTest LeL2capTest DualL2capTest LeSecurityTest L2capPerformanceTest
system/gd/cert/performance_test_logger.py 0 → 100644 +71 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime class PerformanceTestLogger(object): """ A helper class to log time points and intervals """ def __init__(self): self.base_timepoint = datetime.now() # We use a dictionary of a list of timepoints self.start_interval_points = {} self.end_interval_points = {} self.single_points = {} def log_single_point(self, label=""): if label not in self.single_points: self.single_points[label] = [] self.single_points[label].append(datetime.now()) def start_interval(self, label=""): if label not in self.start_interval_points: self.start_interval_points[label] = [] self.start_interval_points[label].append(datetime.now()) def end_interval(self, label=""): if label not in self.end_interval_points: self.end_interval_points[label] = [] self.end_interval_points[label].append(datetime.now()) def _check_interval_label(self, label): if label not in self.start_interval_points or label not in self.end_interval_points: raise KeyError("label %s doesn't exist" % label) if len(self.start_interval_points[label]) != len(self.end_interval_points[label]): raise KeyError("label %s doesn't have correct start and end log" % label) def get_duration_of_intervals(self, label): """ Return the list of duration of the intervals with specified label. """ self._check_interval_label(label) intervals = [] for i in range(len(self.start_interval_points[label])): interval = self.end_interval_points[label][i] - self.start_interval_points[label][i] intervals.append(interval.seconds * 1000 + interval.microseconds / 1000) return intervals def dump_intervals(self): """ Gives an iterator of (iterator of label, start, end) over all labels """ for label in self.start_interval_points: self._check_interval_label(label) yield ((label, self.start_interval_points[label][i], self.end_interval_points[label][i]) for i in range(len(self.start_interval_points[label])))
system/gd/l2cap/classic/cert/l2cap_performance_test.py 0 → 100644 +149 −0 Original line number Diff line number Diff line # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime, timedelta from bluetooth_packets_python3 import RawBuilder from cert.matchers import L2capMatchers from cert.truth import assertThat from cert.performance_test_logger import PerformanceTestLogger from l2cap.classic.cert.cert_l2cap import CertL2cap from l2cap.classic.cert.l2cap_test import L2capTestBase from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode from bluetooth_packets_python3.l2cap_packets import FcsType from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction class L2capPerformanceTest(L2capTestBase): def setup_test(self): super().setup_test() self.performance_test_logger = PerformanceTestLogger() def teardown_test(self): super().teardown_test() def _basic_mode_tx(self, mtu, packets): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() self.performance_test_logger.start_interval("TX") for _ in range(packets): dut_channel.send(b'a' * mtu) assertThat(cert_channel).emits( L2capMatchers.Data(b'a' * mtu), at_least_times=packets, timeout=timedelta(seconds=60)) self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) def _basic_mode_rx(self, mtu, packets): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() self.performance_test_logger.start_interval("RX") data = b"a" * mtu data_packet = RawBuilder([x for x in data]) for _ in range(packets): cert_channel.send(data_packet) assertThat(dut_channel).emits( L2capMatchers.PacketPayloadRawData(data), at_least_times=packets, timeout=timedelta(seconds=60)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) def _ertm_mode_tx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size # For ERTM TX test, we have to do it sequentially because cert needs to ack self._setup_link_from_cert() config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) (dut_channel, cert_channel) = self._open_channel_from_cert( mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS, req_config_options=config, rsp_config_options=config) self.performance_test_logger.start_interval("TX") for i in range(packets): dut_channel.send(b'a' * mtu) if i % tx_window_size == tx_window_size - 1: assertThat(cert_channel).emits(L2capMatchers.IFrame(payload=b'a' * mtu), at_least_times=tx_window_size) cert_channel.send_s_frame(req_seq=(i + 1) % 64, s=SupervisoryFunction.RECEIVER_READY) self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) def _ertm_mode_rx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size self._setup_link_from_cert() config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) (dut_channel, cert_channel) = self._open_channel_from_cert( mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS, req_config_options=config, rsp_config_options=config) data = b"a" * mtu data_packet = RawBuilder([x for x in data]) self.performance_test_logger.start_interval("RX") for i in range(packets): cert_channel.send_i_frame(tx_seq=i % 64, req_seq=0, payload=data_packet) if i % tx_window_size == (tx_window_size - 1): assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=(i + 1) % 64)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) def test_basic_mode_tx_672_100(self): self._basic_mode_tx(672, 100) def test_basic_mode_tx_100_100(self): self._basic_mode_tx(100, 100) def test_ertm_mode_tx_672_100(self): self._ertm_mode_tx(672, 100) def test_basic_mode_rx_672_100(self): self._basic_mode_rx(672, 100) def test_ertm_mode_rx_672_100(self): self._ertm_mode_rx(672, 100) def test_basic_mode_end_to_end_latency(self): self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() data = b"a" * 100 data_packet = RawBuilder([x for x in data]) for i in range(100): self.performance_test_logger.start_interval("RX") cert_channel.send(data_packet) assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX") mean = sum(duration) / len(duration) self.log.info("Mean: %d" % mean)
system/gd/l2cap/classic/cert/l2cap_test.py +8 −3 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ SAMPLE_PACKET_DATA = b"\x19\x26\x08\x17" SAMPLE_PACKET = RawBuilder([x for x in SAMPLE_PACKET_DATA]) class L2capTest(GdBaseTestClass): class L2capTestBase(GdBaseTestClass): def setup_class(self): super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES') Loading Loading @@ -122,6 +122,9 @@ class L2capTest(GdBaseTestClass): return (dut_channel, cert_channel) class L2capTest(L2capTestBase): def test_connect_dynamic_channel_and_send_data(self): self._setup_link_from_cert() Loading Loading @@ -768,7 +771,8 @@ class L2capTest(GdBaseTestClass): dut_channel.set_traffic_paused(True) buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() # Allow 1 additional packet in channel queue buffer buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 for i in range(buffer_size): cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) Loading Loading @@ -1035,7 +1039,8 @@ class L2capTest(GdBaseTestClass): dut_channel.set_traffic_paused(True) buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() # Allow 1 additional packet in channel queue buffer buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 for i in range(buffer_size): cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) Loading
system/gd/l2cap/internal/enhanced_retransmission_mode_channel_data_controller.cc +1 −1 Original line number Diff line number Diff line Loading @@ -966,7 +966,7 @@ std::unique_ptr<packet::BasePacketBuilder> ErtmController::GetNextPacket() { void ErtmController::stage_for_reassembly(SegmentationAndReassembly sar, uint16_t sdu_size, const packet::PacketView<kLittleEndian>& payload) { // If EnqueueBuffer has more than 1 packets, we claim LocalBusy, until queue is empty constexpr size_t kEnqueueBufferBusyThreshold = 2; constexpr size_t kEnqueueBufferBusyThreshold = 3; switch (sar) { case SegmentationAndReassembly::UNSEGMENTED: if (sar_state_ != SegmentationAndReassembly::END) { Loading