Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 943ef1aa authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "use cc_benchmark for buffer_transport_benchmark"

parents 597d2687 2f1ead99
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ cc_test {
    tags: ["optional"],
}

cc_test {
cc_benchmark {
    srcs: ["buffer_transport_benchmark.cpp"],
    static_libs: static_libraries,
    shared_libs: shared_libraries,
+136 −216
Original line number Diff line number Diff line
#include <android/native_window.h>
#include <base/logging.h>
#include <android-base/logging.h>
#include <benchmark/benchmark.h>
#include <binder/IPCThreadState.h>
#include <binder/IServiceManager.h>
#include <dvr/dvr_api.h>
#include <dvr/performance_client_api.h>
#include <gtest/gtest.h>
#include <gui/BufferItem.h>
#include <gui/BufferItemConsumer.h>
#include <gui/Surface.h>
#include <private/dvr/buffer_hub_queue_producer.h>
#include <utils/Trace.h>

#include <chrono>
#include <functional>
#include <mutex>
#include <iostream>
#include <thread>
#include <vector>

#include <poll.h>
#include <sys/wait.h>
#include <unistd.h>  // for pipe

// Use ALWAYS at the tag level. Control is performed manually during command
// line processing.
@@ -29,6 +29,7 @@

using namespace android;
using namespace android::dvr;
using ::benchmark::State;

static const String16 kBinderService = String16("bufferTransport");
static const uint32_t kBufferWidth = 100;
@@ -38,82 +39,13 @@ static const uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB;
static const uint64_t kBufferUsage =
    GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN;
static const int kMaxAcquiredImages = 1;
static const int kQueueDepth = 2;  // We are double buffering for this test.
static const size_t kMaxQueueCounts = 128;

static int gConcurrency = 1;  // 1 writer at a time
static int gIterations = 1000;  // 1K times
static int gSleepIntervalUs = 16 * 1000;  // 16ms

enum BufferTransportServiceCode {
  CREATE_BUFFER_QUEUE = IBinder::FIRST_CALL_TRANSACTION,
};

// A mininal cross process helper class based on a bidirectional pipe pair. This
// is used to signal that Binder-based BufferTransportService has finished
// initialization.
class Pipe {
 public:
  static std::tuple<Pipe, Pipe> CreatePipePair() {
    int a[2] = {-1, -1};
    int b[2] = {-1, -1};

    pipe(a);
    pipe(b);

    return std::make_tuple(Pipe(a[0], b[1]), Pipe(b[0], a[1]));
  }

  Pipe() = default;

  Pipe(Pipe&& other) {
    read_fd_ = other.read_fd_;
    write_fd_ = other.write_fd_;
    other.read_fd_ = 0;
    other.write_fd_ = 0;
  }

  Pipe& operator=(Pipe&& other) {
    Reset();
    read_fd_ = other.read_fd_;
    write_fd_ = other.write_fd_;
    other.read_fd_ = 0;
    other.write_fd_ = 0;
    return *this;
  }

  ~Pipe() { Reset(); }

  Pipe(const Pipe&) = delete;
  Pipe& operator=(const Pipe&) = delete;
  Pipe& operator=(const Pipe&&) = delete;

  bool IsValid() { return read_fd_ > 0 && write_fd_ > 0; }

  void Signal() {
    bool val = true;
    int error = write(write_fd_, &val, sizeof(val));
    ASSERT_GE(error, 0);
  };

  void Wait() {
    bool val = false;
    int error = read(read_fd_, &val, sizeof(val));
    ASSERT_GE(error, 0);
  }

  void Reset() {
    if (read_fd_)
      close(read_fd_);
    if (write_fd_)
      close(write_fd_);
  }

 private:
  int read_fd_ = -1;
  int write_fd_ = -1;
  Pipe(int read_fd, int write_fd) : read_fd_{read_fd}, write_fd_{write_fd} {}
};

// A binder services that minics a compositor that consumes buffers. It provides
// one Binder interface to create a new Surface for buffer producer to write
// into; while itself will carry out no-op buffer consuming by acquiring then
@@ -149,8 +81,6 @@ class BufferTransportService : public BBinder {
          buffer_item_consumer_(buffer_item_consumer) {}

    void onFrameAvailable(const BufferItem& /*item*/) override {
      std::unique_lock<std::mutex> autolock(service_->reader_mutex_);

      BufferItem buffer;
      status_t ret = 0;
      {
@@ -197,7 +127,6 @@ class BufferTransportService : public BBinder {
    sp<FrameListener> frame_listener_;
  };

  std::mutex reader_mutex_;
  std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
};

@@ -225,32 +154,19 @@ class BinderBufferTransport : public BufferTransport {
 public:
  BinderBufferTransport() {}

  ~BinderBufferTransport() {
    if (client_pipe_.IsValid()) {
      client_pipe_.Signal();
      LOG(INFO) << "Client signals service to shut down.";
    }
  }

  int Start() override {
    // Fork a process to run a binder server. The parent process will return
    // a pipe here, and we use the pipe to signal the binder server to exit.
    client_pipe_ = CreateBinderServer();

    // Wait until service is ready.
    LOG(INFO) << "Service is ready for client.";
    client_pipe_.Wait();
    return 0;
  }

  sp<Surface> CreateSurface() override {
    sp<IServiceManager> sm = defaultServiceManager();
    service_ = sm->getService(kBinderService);
    if (service_ == nullptr) {
      LOG(ERROR) << "Failed to set the benchmark service.";
      return nullptr;
      LOG(ERROR) << "Failed to get the benchmark service.";
      return -EIO;
    }

    LOG(INFO) << "Binder server is ready for client.";
    return 0;
  }

  sp<Surface> CreateSurface() override {
    Parcel data;
    Parcel reply;
    int error = service_->transact(CREATE_BUFFER_QUEUE, data, &reply);
@@ -283,38 +199,7 @@ class BinderBufferTransport : public BufferTransport {
  }

 private:
  static Pipe CreateBinderServer() {
    std::tuple<Pipe, Pipe> pipe_pair = Pipe::CreatePipePair();
    pid_t pid = fork();
    if (pid) {
      // parent, i.e. the client side.
      ProcessState::self()->startThreadPool();
      LOG(INFO) << "Binder server pid: " << pid;
      return std::move(std::get<0>(pipe_pair));
    } else {
      // child, i.e. the service side.
      Pipe service_pipe = std::move(std::get<1>(pipe_pair));

      ProcessState::self()->startThreadPool();
      sp<IServiceManager> sm = defaultServiceManager();
      sp<BufferTransportService> service = new BufferTransportService;
      sm->addService(kBinderService, service, false);

      LOG(INFO) << "Binder Service Running...";

      service_pipe.Signal();
      service_pipe.Wait();

      LOG(INFO) << "Service Exiting...";
      exit(EXIT_SUCCESS);

      /* never get here */
      return {};
    }
  }

  sp<IBinder> service_;
  Pipe client_pipe_;
};

// BufferHub/PDX-based buffer transport.
@@ -378,7 +263,6 @@ class BufferHubTransport : public BufferTransport {
        const int num_events = ret;
        for (int i = 0; i < num_events; i++) {
          uint32_t surface_index = events[i].data.u32;
          // LOG(INFO) << "!!! handle queue events index: " << surface_index;
          buffer_queues_[surface_index]->consumer_queue_->HandleQueueEvents();
        }
      }
@@ -390,8 +274,6 @@ class BufferHubTransport : public BufferTransport {
  }

  sp<Surface> CreateSurface() override {
    std::lock_guard<std::mutex> autolock(queue_mutex_);

    auto new_queue = std::make_shared<BufferQueueHolder>();
    if (new_queue->producer_ == nullptr) {
      LOG(ERROR) << "Failed to create buffer producer.";
@@ -472,8 +354,6 @@ class BufferHubTransport : public BufferTransport {
  std::atomic<bool> stopped_;
  std::thread reader_thread_;

  // Mutex to guard epoll_fd_ and buffer_queues_.
  std::mutex queue_mutex_;
  EpollFileDescriptor epoll_fd_;
  std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
};
@@ -486,11 +366,12 @@ enum TransportType {
// Main test suite, which supports two transport backend: 1) BinderBufferQueue,
// 2) BufferHubQueue. The test case drives the producer end of both transport
// backend by queuing buffers into the buffer queue by using ANativeWindow API.
class BufferTransportBenchmark
    : public ::testing::TestWithParam<TransportType> {
class BufferTransportBenchmark : public ::benchmark::Fixture {
 public:
  void SetUp() override {
    switch (GetParam()) {
  void SetUp(State& state) override {
    if (state.thread_index == 0) {
      const int transport = state.range(0);
      switch (transport) {
        case kBinderBufferTransport:
          transport_.reset(new BinderBufferTransport);
          break;
@@ -498,112 +379,142 @@ class BufferTransportBenchmark
          transport_.reset(new BufferHubTransport);
          break;
        default:
        FAIL() << "Unknown test case.";
          CHECK(false) << "Unknown test case.";
          break;
      }

      CHECK(transport_);
      const int ret = transport_->Start();
      CHECK_EQ(ret, 0);

      LOG(INFO) << "Transport backend running, transport=" << transport << ".";

      // Create surfaces for each thread.
      surfaces_.resize(state.threads);
      for (int i = 0; i < state.threads; i++) {
        // Common setup every thread needs.
        surfaces_[i] = transport_->CreateSurface();
        CHECK(surfaces_[i]);

        LOG(INFO) << "Surface initialized on thread " << i << ".";
      }
    }
  }

  void TearDown(State& state) override {
    if (state.thread_index == 0) {
      surfaces_.clear();
      transport_.reset();
      LOG(INFO) << "Tear down benchmark.";
    }
  }

 protected:
  void ProduceBuffers(sp<Surface> surface, int iterations, int sleep_usec) {
    ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
  std::unique_ptr<BufferTransport> transport_;
  std::vector<sp<Surface>> surfaces_;
};

BENCHMARK_DEFINE_F(BufferTransportBenchmark, Producers)(State& state) {
  ANativeWindow* window = nullptr;
  ANativeWindow_Buffer buffer;
  int32_t error = 0;

    for (int i = 0; i < iterations; i++) {
      usleep(sleep_usec);
  double total_gain_buffer_us = 0;
  double total_post_buffer_us = 0;
  int iterations = 0;

  while (state.KeepRunning()) {
    if (window == nullptr) {
      CHECK(surfaces_[state.thread_index]);
      window = static_cast<ANativeWindow*>(surfaces_[state.thread_index].get());

      // Lock buffers a couple time from the queue, so that we have the buffer
      // allocated.
      for (int i = 0; i < kQueueDepth; i++) {
        error = ANativeWindow_lock(window, &buffer,
                                   /*inOutDirtyBounds=*/nullptr);
        CHECK_EQ(error, 0);
        error = ANativeWindow_unlockAndPost(window);
        CHECK_EQ(error, 0);
      }
    }

    {
      ATRACE_NAME("GainBuffer");
      auto t1 = std::chrono::high_resolution_clock::now();
      error = ANativeWindow_lock(window, &buffer,
                                 /*inOutDirtyBounds=*/nullptr);
      auto t2 = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double, std::micro> delta_us = t2 - t1;
      total_gain_buffer_us += delta_us.count();
    }
      ASSERT_EQ(error, 0);
    CHECK_EQ(error, 0);

    {
      ATRACE_NAME("PostBuffer");
      auto t1 = std::chrono::high_resolution_clock::now();
      error = ANativeWindow_unlockAndPost(window);
      auto t2 = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double, std::micro> delta_us = t2 - t1;
      total_post_buffer_us += delta_us.count();
    }
      ASSERT_EQ(error, 0);
    }
  }
    CHECK_EQ(error, 0);

  std::unique_ptr<BufferTransport> transport_;
};
    iterations++;
  }

TEST_P(BufferTransportBenchmark, ContinuousLoad) {
  ASSERT_NE(transport_, nullptr);
  const int ret = transport_->Start();
  ASSERT_EQ(ret, 0);
  state.counters["gain_buffer_us"] = ::benchmark::Counter(
      total_gain_buffer_us / iterations, ::benchmark::Counter::kAvgThreads);
  state.counters["post_buffer_us"] = ::benchmark::Counter(
      total_post_buffer_us / iterations, ::benchmark::Counter::kAvgThreads);
  state.counters["producer_us"] = ::benchmark::Counter(
      (total_gain_buffer_us + total_post_buffer_us) / iterations,
      ::benchmark::Counter::kAvgThreads);
}

  LOG(INFO) << "Start Running.";
BENCHMARK_REGISTER_F(BufferTransportBenchmark, Producers)
    ->Unit(::benchmark::kMicrosecond)
    ->Ranges({{kBinderBufferTransport, kBufferHubTransport}})
    ->ThreadRange(1, 32);

  std::vector<std::thread> writer_threads;
  for (int i = 0; i < gConcurrency; i++) {
    std::thread writer_thread = std::thread([this]() {
      sp<Surface> surface = transport_->CreateSurface();
      ASSERT_NE(surface, nullptr);
static void runBinderServer() {
  ProcessState::self()->setThreadPoolMaxThreadCount(0);
  ProcessState::self()->startThreadPool();

      ASSERT_NO_FATAL_FAILURE(
          ProduceBuffers(surface, gIterations, gSleepIntervalUs));
  sp<IServiceManager> sm = defaultServiceManager();
  sp<BufferTransportService> service = new BufferTransportService;
  sm->addService(kBinderService, service, false);

      usleep(1000 * 100);
    });
  LOG(INFO) << "Binder server running...";

    writer_threads.push_back(std::move(writer_thread));
  while (true) {
    int stat, retval;
    retval = wait(&stat);
    if (retval == -1 && errno == ECHILD) {
      break;
    }

  for (auto& writer_thread : writer_threads) {
    writer_thread.join();
  }

  LOG(INFO) << "All done.";
};

INSTANTIATE_TEST_CASE_P(BufferTransportBenchmarkInstance,
                        BufferTransportBenchmark,
                        ::testing::ValuesIn({kBinderBufferTransport,
                                             kBufferHubTransport}));
  LOG(INFO) << "Service Exiting...";
}

// To run binder-based benchmark, use:
// adb shell buffer_transport_benchmark \
//   --gtest_filter="BufferTransportBenchmark.ContinuousLoad/0"
//   --benchmark_filter="BufferTransportBenchmark/ContinuousLoad/0/"
//
// To run bufferhub-based benchmark, use:
// adb shell buffer_transport_benchmark \
//   --gtest_filter="BufferTransportBenchmark.ContinuousLoad/1"
//   --benchmark_filter="BufferTransportBenchmark/ContinuousLoad/1/"
int main(int argc, char** argv) {
  bool tracing_enabled = false;

  // Parse arguments in addition to "--gtest_filter" paramters.
  // Parse arguments in addition to "--benchmark_filter" paramters.
  for (int i = 1; i < argc; i++) {
    if (std::string(argv[i]) == "--help") {
      std::cout << "Usage: binderThroughputTest [OPTIONS]" << std::endl;
      std::cout << "\t-c N: Specify number of concurrent writer threads, "
                   "(default: 1, max: 128)."
                << std::endl;
      std::cout << "\t-i N: Specify number of iterations, (default: 1000)."
                << std::endl;
      std::cout << "\t-s N: Specify sleep interval in usec, (default: 16000)."
                << std::endl;
      std::cout << "\t--trace: Enable systrace logging."
                << std::endl;
      return 0;
    }
    if (std::string(argv[i]) == "-c") {
      gConcurrency = atoi(argv[i + 1]);
      i++;
      continue;
    }
    if (std::string(argv[i]) == "-s") {
      gSleepIntervalUs = atoi(argv[i + 1]);
      i++;
      continue;
    }
    if (std::string(argv[i]) == "-i") {
      gIterations = atoi(argv[i + 1]);
      i++;
      continue;
    }
    if (std::string(argv[i]) == "--trace") {
      tracing_enabled = true;
      continue;
@@ -614,6 +525,15 @@ int main(int argc, char** argv) {
  atrace_setup();
  atrace_set_tracing_enabled(tracing_enabled);

  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
  pid_t pid = fork();
  if (pid == 0) {
    // parent, i.e. the client side.
    ProcessState::self()->startThreadPool();

    ::benchmark::Initialize(&argc, argv);
    ::benchmark::RunSpecifiedBenchmarks();
  } else {
    LOG(INFO) << "Benchmark process pid: " << pid;
    runBinderServer();
  }
}