Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 52786bb5 authored by Hansong Zhang's avatar Hansong Zhang
Browse files

L2capTest: Migrate basic ERTM test case

And fixed the logic in control packet handler

Test: cert/run_cert_facade_only.sh
Bug: 14987419
Change-Id: Ibd7592c749350a4ac694586cff2bb37295e4c398
parent b8d16a3b
Loading
Loading
Loading
Loading
+504 −35

File changed.

Preview size limit exceeded, changes collapsed.

+0 −221
Original line number Diff line number Diff line
@@ -358,187 +358,6 @@ class SimpleL2capTest(GdBaseTestClass):
                lambda log: is_connection_request(log) and log.connection_request.psm == psm
            )

    def test_query_for_1_2_features(self):
        """
        L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            self._register_callbacks(l2cap_log_stream)
            self._setup_link(l2cap_event_asserts)
            signal_id = 3
            self.cert_device.l2cap.SendInformationRequest(
                l2cap_cert_pb2.InformationRequest(
                    type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS,
                    signal_id=signal_id))
            l2cap_event_asserts.assert_event_occurs(
                lambda log : is_information_response(log) and \
                             log.information_response.signal_id == signal_id and \
                             log.information_response.type == l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS)

    def test_extended_feature_info_response_ertm(self):
        """
        L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced
        Retransmission Mode]
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)

            self._register_callbacks(l2cap_log_stream)
            self._setup_link(l2cap_event_asserts)
            signal_id = 3
            self.cert_device.l2cap.SendInformationRequest(
                l2cap_cert_pb2.InformationRequest(
                    type=l2cap_cert_pb2.InformationRequestType.
                    EXTENDED_FEATURES,
                    signal_id=signal_id))

            l2cap_event_asserts_alt.assert_event_occurs_at_most(
                is_information_response, 1)

            expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES
            expected_mask = 1 << 3
            l2cap_event_asserts.assert_event_occurs(
                lambda log : is_information_response(log) and \
                    log.information_response.signal_id == signal_id and \
                    log.information_response.type == expected_log_type and \
                    log.information_response.information_value & expected_mask == expected_mask)

    def test_extended_feature_info_response_fcs(self):
        """
        L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option]
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)

            self._register_callbacks(l2cap_log_stream)
            self._setup_link(l2cap_event_asserts)
            signal_id = 3
            self.cert_device.l2cap.SendInformationRequest(
                l2cap_cert_pb2.InformationRequest(
                    type=l2cap_cert_pb2.InformationRequestType.
                    EXTENDED_FEATURES,
                    signal_id=signal_id))

            l2cap_event_asserts_alt.assert_event_occurs_at_most(
                is_information_response, 1)

            expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES
            expected_mask = 1 << 5
            l2cap_event_asserts.assert_event_occurs(
                lambda log : is_information_response(log) and \
                    log.information_response.signal_id == signal_id and \
                    log.information_response.type == expected_log_type and \
                    log.information_response.information_value & expected_mask == expected_mask)

    def test_config_channel_not_use_FCS(self):
        """
        L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option]
        Verify the IUT can configure a channel to not use FCS in I/S-frames.
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            self._register_callbacks(l2cap_log_stream)
            self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
            scid = 0x0101
            self._setup_link(l2cap_event_asserts)
            self._open_channel(
                l2cap_event_asserts, scid=scid, mode=self.retransmission_mode)
            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))

            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, control_field=b'\x00\x00', payload=b'abc'))

    def test_explicitly_request_use_FCS(self):
        """
        L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used]
        Verify the IUT will include the FCS in I/S-frames if the Lower Tester explicitly requests that FCS
        should be used.
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
            self._register_callbacks(l2cap_log_stream)
            self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
            scid = 0x0101
            information_value = 1 << 3 | 1 << 5

            def handle_information_request(log):
                log = log.information_request
                self.cert_device.l2cap.SendInformationResponse(
                    l2cap_cert_pb2.InformationResponse(
                        type=log.type,
                        signal_id=log.signal_id,
                        information_value=information_value))

            l2cap_log_stream.unregister_callback(
                self.handle_information_request,
                matcher_fn=is_information_request)
            l2cap_log_stream.register_callback(
                handle_information_request, matcher_fn=is_information_request)

            def handle_connection_response(log):
                log = log.connection_response
                self.scid_dcid_map[log.scid] = log.dcid

            l2cap_log_stream.unregister_callback(
                self.handle_connection_response,
                matcher_fn=is_connection_response)
            l2cap_log_stream.register_callback(
                handle_connection_response, matcher_fn=is_connection_response)

            def handle_configuration_request(log):
                log = log.configuration_request
                if log.dcid not in self.scid_dcid_map:
                    return
                dcid = self.scid_dcid_map[log.dcid]
                self.cert_device.l2cap.SendConfigurationResponse(
                    l2cap_cert_pb2.ConfigurationResponse(
                        scid=dcid,
                        signal_id=log.signal_id,
                        retransmission_config=l2cap_cert_pb2.
                        ChannelRetransmissionFlowControlConfig(
                            mode=self.retransmission_mode)))
                self.cert_device.l2cap.SendConfigurationRequest(
                    l2cap_cert_pb2.ConfigurationRequest(
                        dcid=dcid,
                        signal_id=log.signal_id + 1,
                        retransmission_config=l2cap_cert_pb2.
                        ChannelRetransmissionFlowControlConfig(
                            mode=self.retransmission_mode),
                        fcs_config=l2cap_cert_pb2.FcsConfig.DEFAULT))

            l2cap_log_stream.unregister_callback(
                self.handle_configuration_request,
                matcher_fn=is_configuration_request)
            l2cap_log_stream.register_callback(
                handle_configuration_request,
                matcher_fn=is_configuration_request)

            self._setup_link(l2cap_event_asserts)
            self._open_channel(
                l2cap_event_asserts, scid=scid, mode=self.retransmission_mode)
            l2cap_event_asserts_alt.assert_event_occurs(
                lambda log: is_configuration_request(log) and \
                    log.configuration_request.fcs_config == l2cap_cert_pb2.FcsConfig.NO_FCS)
            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, payload=b'abc\x0f\xb6'))

    def test_implicitly_request_use_FCS(self):
        """
        L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used]
@@ -621,46 +440,6 @@ class SimpleL2capTest(GdBaseTestClass):
            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, control_field=b'\x81\x00', payload=b'\x75\xe8'))

    def test_transmit_i_frames(self):
        """
        L2CAP/ERM/BV-01-C [Transmit I-frames]
        """
        with EventCallbackStream(
                self.cert_device.l2cap.FetchL2capLog(
                    empty_pb2.Empty())) as l2cap_log_stream:
            l2cap_event_asserts = EventAsserts(l2cap_log_stream)
            l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
            self._register_callbacks(l2cap_log_stream)
            self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
            scid = 0x0101
            self._setup_link(l2cap_event_asserts)
            self._open_channel(
                l2cap_event_asserts, scid=scid, mode=self.retransmission_mode)
            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
            self.cert_device.l2cap.SendSFrame(
                l2cap_cert_pb2.SFrame(
                    channel=self.scid_dcid_map[scid], req_seq=1, s=0))
            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
            self.cert_device.l2cap.SendSFrame(
                l2cap_cert_pb2.SFrame(
                    channel=self.scid_dcid_map[scid], req_seq=2, s=0))
            self.device_under_test.l2cap.SendDynamicChannelPacket(
                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
            self.cert_device.l2cap.SendSFrame(
                l2cap_cert_pb2.SFrame(
                    channel=self.scid_dcid_map[scid], req_seq=3, s=0))

            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, payload=b'abc'))
            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, payload=b'abc'))
            l2cap_event_asserts.assert_event_occurs(
                lambda log: match_frame(log, scid=scid, payload=b'abc'))
            l2cap_event_asserts_alt.assert_event_occurs_at_most(
                lambda log: log.HasField("data_packet"), 3)

    def test_receive_i_frames(self):
        """
        L2CAP/ERM/BV-02-C [Receive I-Frames]
+26 −9
Original line number Diff line number Diff line
@@ -13,6 +13,8 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <condition_variable>
#include <cstdint>
#include <unordered_map>

@@ -72,7 +74,9 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not registered");
    }
    std::vector<uint8_t> packet(request->payload().begin(), request->payload().end());
    fixed_channel_helper_map_[request->channel()]->SendPacket(packet);
    if (!fixed_channel_helper_map_[request->channel()]->SendPacket(packet)) {
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open");
    }
    response->set_result_type(SendL2capPacketResultType::OK);
    return ::grpc::Status::OK;
  }
@@ -84,7 +88,9 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Psm not registered");
    }
    std::vector<uint8_t> packet(request->payload().begin(), request->payload().end());
    dynamic_channel_helper_map_[request->psm()]->SendPacket(packet);
    if (!dynamic_channel_helper_map_[request->psm()]->SendPacket(packet)) {
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Channel not open");
    }
    return ::grpc::Status::OK;
  }

@@ -211,13 +217,14 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      }
    }

    void SendPacket(const std::vector<uint8_t>& packet) {
    bool SendPacket(const std::vector<uint8_t>& packet) {
      if (channel_ == nullptr) {
        LOG_WARN("Channel is not open");
        return;
        return false;
      }
      channel_->GetQueueUpEnd()->RegisterEnqueue(
          handler_, common::Bind(&L2capFixedChannelHelper::enqueue_callback, common::Unretained(this), packet));
      return true;
    }

    void on_close_callback(hci::ErrorCode error_code) {
@@ -306,7 +313,11 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      ConnectionCompleteEvent event;
      event.mutable_remote()->set_address(channel->GetDevice().ToString());
      facade_service_->pending_connection_complete_.OnIncomingEvent(event);
      {
        std::unique_lock<std::mutex> lock(channel_open_cv_mutex_);
        channel_ = std::move(channel);
      }
      channel_open_cv_.notify_all();
      channel_->RegisterOnCloseCallback(
          facade_service_->facade_handler_,
          common::BindOnce(&L2capDynamicChannelHelper::on_close_callback, common::Unretained(this)));
@@ -322,7 +333,7 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service

    void on_close_callback(hci::ErrorCode error_code) {
      {
        std::unique_lock<std::mutex> lock(facade_service_->channel_map_mutex_);
        std::unique_lock<std::mutex> lock(channel_open_cv_mutex_);
        if (facade_service_->fetch_l2cap_data_) {
          channel_->GetQueueUpEnd()->UnregisterDequeue();
        }
@@ -345,13 +356,17 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      facade_service_->pending_l2cap_data_.OnIncomingEvent(l2cap_data);
    }

    void SendPacket(std::vector<uint8_t> packet) {
    bool SendPacket(std::vector<uint8_t> packet) {
      if (channel_ == nullptr) {
        std::unique_lock<std::mutex> lock(channel_open_cv_mutex_);
        if (!channel_open_cv_.wait_for(lock, std::chrono::seconds(1), [this] { return channel_ != nullptr; })) {
          LOG_WARN("Channel is not open");
        return;
          return false;
        }
      }
      channel_->GetQueueUpEnd()->RegisterEnqueue(
          handler_, common::Bind(&L2capDynamicChannelHelper::enqueue_callback, common::Unretained(this), packet));
      return true;
    }

    std::unique_ptr<packet::BasePacketBuilder> enqueue_callback(std::vector<uint8_t> packet) {
@@ -368,6 +383,8 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
    std::unique_ptr<DynamicChannelService> service_;
    std::unique_ptr<DynamicChannel> channel_ = nullptr;
    Psm psm_;
    std::condition_variable channel_open_cv_;
    std::mutex channel_open_cv_mutex_;
  };

  L2capClassicModule* l2cap_layer_;