Loading system/gd/cert/performance_test_logger.py +1 −1 Original line number Diff line number Diff line Loading @@ -58,7 +58,7 @@ class PerformanceTestLogger(object): intervals = [] for i in range(len(self.start_interval_points[label])): interval = self.end_interval_points[label][i] - self.start_interval_points[label][i] intervals.append(interval.seconds * 1000 + interval.microseconds / 1000) intervals.append(interval) return intervals def dump_intervals(self): Loading system/gd/cert/truth.py +11 −0 Original line number Diff line number Diff line Loading @@ -130,10 +130,21 @@ class BooleanSubject(ObjectSubject): assert_false(self._value, "") class TimeDeltaSubject(ObjectSubject): def __init__(self, value): super().__init__(value) def isWithin(self, time_bound): assert_true(self._value < time_bound, "") def assertThat(subject): if type(subject) is bool: return BooleanSubject(subject) elif isinstance(subject, IEventStream): return EventStreamSubject(subject) elif isinstance(subject, timedelta): return TimeDeltaSubject(subject) else: return ObjectSubject(subject) system/gd/l2cap/classic/cert/l2cap_performance_test.py +47 −9 Original line number Diff line number Diff line Loading @@ -36,6 +36,9 @@ class L2capPerformanceTest(L2capTestBase): super().teardown_test() def _basic_mode_tx(self, mtu, packets): """ Send the specified number of packets and return the time interval in ms. """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() Loading @@ -47,7 +50,28 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) return duration def _basic_mode_tx_fixed_interval(self, mtu, interval=timedelta(seconds=10), batch_size=20): """ Send packets as much as possible over a certain interval, and return the number of packets sent """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() start_time = datetime.now() end_time = start_time + interval packets_sent = 0 while datetime.now() < end_time: for _ in range(batch_size): dut_channel.send(b'a' * mtu) packets_sent += batch_size assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * mtu), at_least_times=batch_size) return packets_sent def _basic_mode_rx(self, mtu, packets): self._setup_link_from_cert() Loading @@ -63,9 +87,12 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) def _ertm_mode_tx(self, mtu, packets, tx_window_size=10): """ Send the specified number of packets and return the time interval in ms. """ # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size # For ERTM TX test, we have to do it sequentially because cert needs to ack Loading @@ -89,7 +116,9 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) return duration def _ertm_mode_rx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size Loading @@ -115,16 +144,19 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) def test_basic_mode_tx_672_100(self): self._basic_mode_tx(672, 100) duration = self._basic_mode_tx(672, 100) assertThat(duration).isWithin(timedelta(seconds=2)) def test_basic_mode_tx_100_100(self): self._basic_mode_tx(100, 100) duration = self._basic_mode_tx(100, 100) assertThat(duration).isWithin(timedelta(seconds=2)) def test_ertm_mode_tx_672_100(self): self._ertm_mode_tx(672, 100) duration = self._ertm_mode_tx(672, 100) assertThat(duration).isWithin(timedelta(seconds=5)) def test_basic_mode_rx_672_100(self): self._basic_mode_rx(672, 100) Loading @@ -145,5 +177,11 @@ class L2capPerformanceTest(L2capTestBase): assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX") mean = sum(duration) / len(duration) self.log.info("Mean: %d" % mean) mean = sum(duration, timedelta()) / len(duration) self.log.info("Mean: %s" % str(mean)) def test_basic_mode_number_of_packets_10_seconds_672(self): number_packets = self._basic_mode_tx_fixed_interval(672) # Requiring that 500 packets (20ms period on average) are sent self.log.info("Packets sent: %d" % number_packets) assertThat(number_packets > 500).isTrue() Loading
system/gd/cert/performance_test_logger.py +1 −1 Original line number Diff line number Diff line Loading @@ -58,7 +58,7 @@ class PerformanceTestLogger(object): intervals = [] for i in range(len(self.start_interval_points[label])): interval = self.end_interval_points[label][i] - self.start_interval_points[label][i] intervals.append(interval.seconds * 1000 + interval.microseconds / 1000) intervals.append(interval) return intervals def dump_intervals(self): Loading
system/gd/cert/truth.py +11 −0 Original line number Diff line number Diff line Loading @@ -130,10 +130,21 @@ class BooleanSubject(ObjectSubject): assert_false(self._value, "") class TimeDeltaSubject(ObjectSubject): def __init__(self, value): super().__init__(value) def isWithin(self, time_bound): assert_true(self._value < time_bound, "") def assertThat(subject): if type(subject) is bool: return BooleanSubject(subject) elif isinstance(subject, IEventStream): return EventStreamSubject(subject) elif isinstance(subject, timedelta): return TimeDeltaSubject(subject) else: return ObjectSubject(subject)
system/gd/l2cap/classic/cert/l2cap_performance_test.py +47 −9 Original line number Diff line number Diff line Loading @@ -36,6 +36,9 @@ class L2capPerformanceTest(L2capTestBase): super().teardown_test() def _basic_mode_tx(self, mtu, packets): """ Send the specified number of packets and return the time interval in ms. """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() Loading @@ -47,7 +50,28 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) return duration def _basic_mode_tx_fixed_interval(self, mtu, interval=timedelta(seconds=10), batch_size=20): """ Send packets as much as possible over a certain interval, and return the number of packets sent """ self._setup_link_from_cert() (dut_channel, cert_channel) = self._open_channel_from_cert() start_time = datetime.now() end_time = start_time + interval packets_sent = 0 while datetime.now() < end_time: for _ in range(batch_size): dut_channel.send(b'a' * mtu) packets_sent += batch_size assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * mtu), at_least_times=batch_size) return packets_sent def _basic_mode_rx(self, mtu, packets): self._setup_link_from_cert() Loading @@ -63,9 +87,12 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) def _ertm_mode_tx(self, mtu, packets, tx_window_size=10): """ Send the specified number of packets and return the time interval in ms. """ # Make sure that number of packets is a multiple of tx_window_size packets = packets // tx_window_size * tx_window_size # For ERTM TX test, we have to do it sequentially because cert needs to ack Loading @@ -89,7 +116,9 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("TX") duration = self.performance_test_logger.get_duration_of_intervals("TX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) return duration def _ertm_mode_rx(self, mtu, packets, tx_window_size=10): # Make sure that number of packets is a multiple of tx_window_size Loading @@ -115,16 +144,19 @@ class L2capPerformanceTest(L2capTestBase): self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX")[0] self.log.info("Duration: %d" % duration) self.log.info("Duration: %s" % str(duration)) def test_basic_mode_tx_672_100(self): self._basic_mode_tx(672, 100) duration = self._basic_mode_tx(672, 100) assertThat(duration).isWithin(timedelta(seconds=2)) def test_basic_mode_tx_100_100(self): self._basic_mode_tx(100, 100) duration = self._basic_mode_tx(100, 100) assertThat(duration).isWithin(timedelta(seconds=2)) def test_ertm_mode_tx_672_100(self): self._ertm_mode_tx(672, 100) duration = self._ertm_mode_tx(672, 100) assertThat(duration).isWithin(timedelta(seconds=5)) def test_basic_mode_rx_672_100(self): self._basic_mode_rx(672, 100) Loading @@ -145,5 +177,11 @@ class L2capPerformanceTest(L2capTestBase): assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(data)) self.performance_test_logger.end_interval("RX") duration = self.performance_test_logger.get_duration_of_intervals("RX") mean = sum(duration) / len(duration) self.log.info("Mean: %d" % mean) mean = sum(duration, timedelta()) / len(duration) self.log.info("Mean: %s" % str(mean)) def test_basic_mode_number_of_packets_10_seconds_672(self): number_packets = self._basic_mode_tx_fixed_interval(672) # Requiring that 500 packets (20ms period on average) are sent self.log.info("Packets sent: %d" % number_packets) assertThat(number_packets > 500).isTrue()