Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c4a799be authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "L2CAP: Add test script for pts test"

parents 284bd474 446f340c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -85,3 +85,4 @@ class GdDevice(GdDeviceBase):
        self.hci_classic_security.command_complete_stream = EventStream(self.hci_classic_security.FetchCommandCompleteEvent)
        self.l2cap.packet_stream = EventStream(self.l2cap.FetchL2capData)
        self.l2cap.connection_complete_stream = EventStream(self.l2cap.FetchConnectionComplete)
        self.l2cap.connection_close_stream = EventStream(self.l2cap.FetchConnectionClose)
+39 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts.base_test import BaseTestClass

import importlib
import logging
import os
import signal
import sys
import subprocess

ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP')

sys.path.append(ANDROID_BUILD_TOP + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen')


class PTSBaseTestClass(BaseTestClass):
    def __init__(self, configs):
        BaseTestClass.__init__(self, configs)

        gd_devices = self.controller_configs.get("GdDevice")

        self.register_controller(
            importlib.import_module('cert.gd_device'),
            builtin=True)
+5 −0
Original line number Diff line number Diff line
#! /bin/bash

# For bluetooth_packets_python3
export PYTHONPATH=$PYTHONPATH:$ANDROID_BUILD_TOP/out/host/linux-x86/lib64
python3.8 `which act.py` -c $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/l2cap/pts/pts.json -tf $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd/l2cap/pts/pts_l2cap_testcase -tp $ANDROID_BUILD_TOP/packages/modules/Bluetooth/system/gd
+190 −0
Original line number Diff line number Diff line
from cert.pts_base_test import PTSBaseTestClass
from facade import common_pb2
from facade import rootservice_pb2 as facade_rootservice_pb2
from l2cap.classic import facade_pb2 as l2cap_facade_pb2
from google.protobuf import empty_pb2

from datetime import timedelta
import time
class PTSL2capTest(PTSBaseTestClass):
    def setup_test(self):
        self.device_under_test = self.gd_devices[0]

        self.device_under_test.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=facade_rootservice_pb2.BluetoothModule.Value('L2CAP'),
            )
        )

        self.device_under_test.wait_channel_ready()

        dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
        pts_address = self.controller_configs.get('pts_address').lower()
        self.device_under_test.address = dut_address

        self.dut_address = common_pb2.BluetoothAddress(
            address=self.device_under_test.address)
        self.pts_address = common_pb2.BluetoothAddress(
            address=str.encode(pts_address))

    def teardown_test(self):
        self.device_under_test.rootservice.StopStack(
            facade_rootservice_pb2.StopStackRequest()
        )

    def _pending_connection_complete(self, timeout=30):
        dut_connection_stream = self.device_under_test.l2cap.connection_complete_stream
        dut_connection_stream.subscribe()
        dut_connection_stream.assert_event_occurs(
          lambda device : device.remote.address == self.pts_address.address,
          timeout=timedelta(seconds=timeout)
        )
        dut_connection_stream.unsubscribe()

    def _pending_connection_close(self, timeout=30):
        dut_connection_close_stream = self.device_under_test.l2cap.connection_close_stream
        dut_connection_close_stream.subscribe()
        dut_connection_close_stream.assert_event_occurs(
          lambda device : device.remote.address == self.pts_address.address,
          timeout=timedelta(seconds=timeout)
        )
        dut_connection_close_stream.unsubscribe()

    def test_L2CAP_COS_CED_BV_01_C(self):
        """
        L2CAP/COS/CED/BV-01-C [Request Connection]
        Verify that the IUT is able to request the connection establishment for an L2CAP data channel and
        initiate the configuration procedure.
        """
        psm = 1
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm))
        self._pending_connection_complete()
        self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_03_C(self):
        """
        L2CAP/COS/CED/BV-03-C [Send Data]
        Verify that the IUT is able to send DATA.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'*34))
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_04_C(self):
        """
        L2CAP/COS/CED/BV-04-C [Disconnect]
        Verify that the IUT is able to disconnect the data channel.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        time.sleep(2)
        self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_05_C(self):
        """
        L2CAP/COS/CED/BV-05-C [Accept Connection]
        Verify that the IUT is able to disconnect the data channel.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_07_C(self):
        """
        L2CAP/COS/CED/BV-07-C [Accept Disconnect]
        Verify that the IUT is able to respond to the request to disconnect the data channel.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_08_C(self):
        """
        L2CAP/COS/CED/BV-08-C [Disconnect on Timeout]
        Verify that the IUT disconnects the data channel and shuts down this channel if no response occurs
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))

        time.sleep(120)

    def test_L2CAP_COS_CED_BV_09_C(self):
        """
        L2CAP/COS/CED/BV-09-C [Receive Multi-Command Packet]
        Verify that the IUT is able to receive more than one signaling command in one L2CAP packet.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BV_11_C(self):
        """
        L2CAP/COS/CED/BV-11-C [Configure MTU Size]
        Verify that the IUT is able to configure the supported MTU size
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        self._pending_connection_close()

    def test_L2CAP_COS_CED_BI_01_C(self):
        """
        L2CAP/COS/CED/BI-01-C [Reject Unknown Command]
        Verify that the IUT rejects an unknown signaling command.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_complete()
        time.sleep(5)

    def test_L2CAP_COS_CFD_BV_03_C(self):
        """
        L2CAP/COS/CFD/BV-03-C [Send Requested Options]
        Verify that the IUT can receive a configuration request with no options and send the requested
        options to the Lower Tester
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
        self._pending_connection_close()

    def test_L2CAP_COS_CFD_BV_08_C(self):
        """
        L2CAP/COS/CFD/BV-08-C [Non-blocking Config Response]
        Verify that the IUT does not block transmitting L2CAP_ConfigRsp while waiting for L2CAP_ConfigRsp
        from the Lower Tester.
        """
        psm = 1
        self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm))
        self._pending_connection_complete()
        self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
        self._pending_connection_close()


    def test_L2CAP_ERM_BI_01_C(self):
        """
        L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
        Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the S-frame [REJ] sent from the IUT
        is lost.
        """
        psm = 1
        self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
            psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM))
        self._pending_connection_complete()
        self._pending_connection_close(timeout=60)
+40 −0
Original line number Diff line number Diff line
@@ -64,6 +64,22 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
    return connection_complete_stream_.HandleRequest(context, request, writer);
  }

  class ConnectionCloseCallback : public grpc::GrpcEventStreamCallback<ConnectionCloseEvent, ConnectionCloseEvent> {
   public:
    void OnWriteResponse(ConnectionCloseEvent* response, const ConnectionCloseEvent& event) override {
      response->CopyFrom(event);
    }

  } connection_close_callback_;
  ::bluetooth::grpc::GrpcEventStream<ConnectionCloseEvent, ConnectionCloseEvent> connection_close_stream_{
      &connection_close_callback_};

  ::grpc::Status FetchConnectionClose(::grpc::ServerContext* context,
                                      const ::bluetooth::facade::EventStreamRequest* request,
                                      ::grpc::ServerWriter<classic::ConnectionCloseEvent>* writer) override {
    return connection_close_stream_.HandleRequest(context, request, writer);
  }

  ::grpc::Status Connect(::grpc::ServerContext* context, const facade::BluetoothAddress* request,
                         ::google::protobuf::Empty* response) override {
    auto fixed_channel_manager = l2cap_layer_->GetFixedChannelManager();
@@ -108,6 +124,17 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
    return ::grpc::Status::OK;
  }

  ::grpc::Status CloseChannel(::grpc::ServerContext* context,
                              const ::bluetooth::l2cap::classic::CloseChannelRequest* request,
                              ::google::protobuf::Empty* response) override {
    auto psm = request->psm();
    if (dynamic_channel_helper_map_.find(request->psm()) == dynamic_channel_helper_map_.end()) {
      return ::grpc::Status(::grpc::StatusCode::FAILED_PRECONDITION, "Psm not registered");
    }
    dynamic_channel_helper_map_[psm]->disconnect();
    return ::grpc::Status::OK;
  }

  ::grpc::Status FetchL2capData(::grpc::ServerContext* context, const ::bluetooth::facade::EventStreamRequest* request,
                                ::grpc::ServerWriter<classic::L2capPacket>* writer) override {
    return l2cap_stream_.HandleRequest(context, request, writer);
@@ -218,6 +245,10 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
          common::Bind(&L2capDynamicChannelHelper::on_connect_fail, common::Unretained(this)), handler_);
    }

    void disconnect() {
      channel_->Close();
    }

    void on_l2cap_service_registration_complete(DynamicChannelManager::RegistrationResult registration_result,
                                                std::unique_ptr<DynamicChannelService> service) {}

@@ -226,6 +257,15 @@ class L2capClassicModuleFacadeService : public L2capClassicModuleFacade::Service
      event.mutable_remote()->set_address(channel->GetDevice().ToString());
      facade_service_->connection_complete_stream_.OnIncomingEvent(event);
      channel_ = std::move(channel);
      channel_->RegisterOnCloseCallback(
          handler_, common::Bind(&L2capDynamicChannelHelper::on_close_callback, common::Unretained(this)));
    }

    void on_close_callback(hci::ErrorCode errorCode) {
      ConnectionCloseEvent event;
      event.mutable_remote()->set_address(channel_->GetDevice().ToString());
      event.set_reason(static_cast<uint32_t>(errorCode));
      facade_service_->connection_close_stream_.OnIncomingEvent(event);
    }

    void on_connect_fail(DynamicChannelManager::ConnectionResult result) {}
Loading