Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9d8bd095 authored by Corey Tabaka's avatar Corey Tabaka
Browse files

Return pdx::Status<T> from BufferHubQueue::Dequeue.

Switch to using Status<T> to return buffers or meaningful errors
from BufferHubQueue::Dequeue. This enables determining whether an
error is normal (e.g. timeout) or abnormal (e.g. disconnect).

Bug: 36401174
Test: buffer_hub_queue-test passes.
Change-Id: Ifef5f737a5e737b70d19bdbffd7544a993438e1c
parent 8a4e6a90
Loading
Loading
Loading
Loading
+39 −34
Original line number Diff line number Diff line
@@ -121,7 +121,8 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
                                   count() == 0 ? timeout : 0);

    if (ret == 0) {
      ALOGD_IF(TRACE, "Wait on epoll returns nothing before timeout.");
      ALOGI_IF(TRACE,
               "BufferHubQueue::WaitForBuffers: No events before timeout.");
      return count() != 0;
    }

@@ -139,7 +140,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
    for (int i = 0; i < num_events; i++) {
      int64_t index = static_cast<int64_t>(events[i].data.u64);

      ALOGD_IF(TRACE, "New BufferHubQueue event %d: index=%" PRId64, i, index);
      ALOGD_IF(TRACE,
               "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
               index);

      if (is_buffer_event_index(index)) {
        HandleBufferEvent(static_cast<size_t>(index), events[i]);
@@ -296,14 +299,12 @@ void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
  available_buffers_.Append(std::move(buffer_info));
}

std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
                                                         size_t* slot,
                                                         void* meta,
                                                         LocalHandle* fence) {
Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
    int timeout, size_t* slot, void* meta, LocalHandle* fence) {
  ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);

  if (!WaitForBuffers(timeout))
    return nullptr;
    return ErrorStatus(ETIMEDOUT);

  std::shared_ptr<BufferHubBuffer> buf;
  BufferInfo& buffer_info = available_buffers_.Front();
@@ -321,7 +322,7 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,

  if (!buf) {
    ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
    return nullptr;
    return ErrorStatus(ENOBUFS);
  }

  if (meta) {
@@ -329,7 +330,7 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
              reinterpret_cast<uint8_t*>(meta));
  }

  return buf;
  return {std::move(buf)};
}

ProducerQueue::ProducerQueue(size_t meta_size)
@@ -420,9 +421,7 @@ int ProducerQueue::DetachBuffer(size_t slot) {
  auto status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
  if (!status) {
    ALOGE(
        "ProducerQueue::DetachBuffer failed to detach producer buffer through "
        "BufferHub, error: %s",
    ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
  }
@@ -430,17 +429,20 @@ int ProducerQueue::DetachBuffer(size_t slot) {
  return BufferHubQueue::DetachBuffer(slot);
}

std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(
Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
    int timeout, size_t* slot, LocalHandle* release_fence) {
  if (slot == nullptr || release_fence == nullptr) {
    ALOGE(
        "ProducerQueue::Dequeue: invalid parameter, slot=%p, release_fence=%p",
    ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
          slot, release_fence);
    return nullptr;
    return ErrorStatus(EINVAL);
  }

  auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
  return std::static_pointer_cast<BufferProducer>(buf);
  auto buffer_status =
      BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
  if (!buffer_status)
    return buffer_status.error_status();

  return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
}

int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
@@ -471,9 +473,7 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
Status<size_t> ConsumerQueue::ImportBuffers() {
  auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
  if (!status) {
    ALOGE(
        "ConsumerQueue::ImportBuffers failed to import consumer buffer through "
        "BufferBub, error: %s",
    ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
  }
@@ -484,8 +484,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

  auto buffer_handle_slots = status.take();
  for (auto& buffer_handle_slot : buffer_handle_slots) {
    ALOGD_IF(TRACE,
             "ConsumerQueue::ImportBuffers, new buffer, buffer_handle: %d",
    ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
             buffer_handle_slot.first.value());

    std::unique_ptr<BufferConsumer> buffer_consumer =
@@ -509,7 +508,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

    ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (ret < 0) {
      ALOGE("ConsumerQueue::ImportBuffers failed to add buffer, ret: %s",
      ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
            strerror(-ret));
      last_error = ret;
      continue;
@@ -530,7 +529,7 @@ int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
  return BufferHubQueue::AddBuffer(buf, slot);
}

std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    int timeout, size_t* slot, void* meta, size_t meta_size,
    LocalHandle* acquire_fence) {
  if (meta_size != meta_size_) {
@@ -538,19 +537,23 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
        "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
        "does not match metadata size (%zu) for the queue.",
        meta_size, meta_size_);
    return nullptr;
    return ErrorStatus(EINVAL);
  }

  if (slot == nullptr || acquire_fence == nullptr) {
    ALOGE(
        "ConsumerQueue::Dequeue: Invalid parameter, slot=%p, meta=%p, "
        "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
        "acquire_fence=%p",
        slot, meta, acquire_fence);
    return nullptr;
    return ErrorStatus(EINVAL);
  }

  auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
  return std::static_pointer_cast<BufferConsumer>(buf);
  auto buffer_status =
      BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
  if (!buffer_status)
    return buffer_status.error_status();

  return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
}

int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
@@ -569,7 +572,9 @@ Status<void> ConsumerQueue::OnBufferAllocated() {
    ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
    return ErrorStatus(ENOBUFS);
  } else {
    ALOGD_IF(TRACE, "Imported %zu consumer buffers.", status.get());
    ALOGD_IF(TRACE,
             "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
             status.get());
    return {};
  }
}
+3 −1
Original line number Diff line number Diff line
@@ -138,11 +138,13 @@ status_t BufferHubQueueProducer::dequeueBuffer(

  for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
    LocalHandle fence;
    buffer_producer =
    auto buffer_status  =
        core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot, &fence);
    if (!buffer_producer)
      return NO_MEMORY;

    buffer_producer = buffer_status.take();

    if (width == buffer_producer->width() &&
        height == buffer_producer->height() &&
        static_cast<uint32_t>(format) == buffer_producer->format()) {
+13 −11
Original line number Diff line number Diff line
@@ -129,8 +129,10 @@ class BufferHubQueue : public pdx::Client {
  // block. Specifying a timeout of -1 causes |Dequeue()| to block indefinitely,
  // while specifying a timeout equal to zero cause |Dequeue()| to return
  // immediately, even if no buffers are available.
  std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot,
                                           void* meta, LocalHandle* fence);
  pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue(int timeout,
                                                        size_t* slot,
                                                        void* meta,
                                                        LocalHandle* fence);

  // Wait for buffers to be released and re-add them to the queue.
  bool WaitForBuffers(int timeout);
@@ -341,8 +343,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // and caller should call Post() once it's done writing to release the buffer
  // to the consumer side.
  std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot,
                                          LocalHandle* release_fence);
  pdx::Status<std::shared_ptr<BufferProducer>> Dequeue(
      int timeout, size_t* slot, LocalHandle* release_fence);

 private:
  friend BASE;
@@ -393,17 +395,17 @@ class ConsumerQueue : public BufferHubQueue {
  // Dequeue() is done with the corect metadata type and size with those used
  // when the buffer is orignally created.
  template <typename Meta>
  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta,
                                          LocalHandle* acquire_fence) {
  pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
      int timeout, size_t* slot, Meta* meta, LocalHandle* acquire_fence) {
    return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
  }
  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
                                          LocalHandle* acquire_fence) {
  pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
      int timeout, size_t* slot, LocalHandle* acquire_fence) {
    return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
  }

  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
                                          size_t meta_size,
  pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
      int timeout, size_t* slot, void* meta, size_t meta_size,
      LocalHandle* acquire_fence);

 private:
+63 −32
Original line number Diff line number Diff line
@@ -64,12 +64,16 @@ TEST_F(BufferHubQueueTest, TestDequeue) {
  for (size_t i = 0; i < nb_dequeue_times; i++) {
    size_t slot;
    LocalHandle fence;
    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
    auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_TRUE(p1_status.ok());
    auto p1 = p1_status.take();
    ASSERT_NE(nullptr, p1);
    size_t mi = i;
    ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0);
    size_t mo;
    auto c1 = consumer_queue_->Dequeue(100, &slot, &mo, &fence);
    auto c1_status = consumer_queue_->Dequeue(100, &slot, &mo, &fence);
    ASSERT_TRUE(c1_status.ok());
    auto c1 = c1_status.take();
    ASSERT_NE(nullptr, c1);
    ASSERT_EQ(mi, mo);
    c1->Release(LocalHandle());
@@ -94,23 +98,27 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {
    ASSERT_EQ(consumer_queue_->count(), 0U);
    // Consumer queue does not import buffers until a dequeue is issued.
    ASSERT_EQ(consumer_queue_->capacity(), i);
    // Dequeue returns nullptr since no buffer is ready to consumer, but
    // Dequeue returns timeout since no buffer is ready to consumer, but
    // this implicitly triggers buffer import and bump up |capacity|.
    LocalHandle fence;
    auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_EQ(nullptr, consumer_null);
    auto status = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_FALSE(status.ok());
    ASSERT_EQ(ETIMEDOUT, status.error());
    ASSERT_EQ(consumer_queue_->capacity(), i + 1);
  }

  for (size_t i = 0; i < nb_buffer; i++) {
    LocalHandle fence;
    // First time, there is no buffer available to dequeue.
    auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_EQ(nullptr, buffer_null);
    auto consumer_status = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_FALSE(consumer_status.ok());
    ASSERT_EQ(ETIMEDOUT, consumer_status.error());

    // Make sure Producer buffer is Post()'ed so that it's ready to Accquire
    // in the consumer's Dequeue() function.
    auto producer = producer_queue_->Dequeue(0, &slot, &fence);
    auto producer_status = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_TRUE(producer_status.ok());
    auto producer = producer_status.take();
    ASSERT_NE(nullptr, producer);

    uint64_t seq_in = static_cast<uint64_t>(i);
@@ -118,7 +126,9 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {

    // Second time, the just |Post()|'ed buffer should be dequeued.
    uint64_t seq_out = 0;
    auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence);
    consumer_status = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence);
    ASSERT_TRUE(consumer_status.ok());
    auto consumer = consumer_status.take();
    ASSERT_NE(nullptr, consumer);
    ASSERT_EQ(seq_in, seq_out);
  }
@@ -140,11 +150,15 @@ TEST_F(BufferHubQueueTest, TestMetadata) {
  for (auto mi : ms) {
    size_t slot;
    LocalHandle fence;
    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
    auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_TRUE(p1_status.ok());
    auto p1 = p1_status.take();
    ASSERT_NE(nullptr, p1);
    ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
    TestMetadata mo;
    auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
    auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
    ASSERT_TRUE(c1_status.ok());
    auto c1 = c1_status.take();
    ASSERT_EQ(mi.a, mo.a);
    ASSERT_EQ(mi.b, mo.b);
    ASSERT_EQ(mi.c, mo.c);
@@ -159,14 +173,16 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) {
  int64_t mi = 3;
  size_t slot;
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_NE(nullptr, p1);
  ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);

  int32_t mo;
  // Acquire a buffer with mismatched metadata is not OK.
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_EQ(nullptr, c1);
  auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_FALSE(c1_status.ok());
}

TEST_F(BufferHubQueueTest, TestEnqueue) {
@@ -175,13 +191,15 @@ TEST_F(BufferHubQueueTest, TestEnqueue) {

  size_t slot;
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_NE(nullptr, p1);

  int64_t mo;
  producer_queue_->Enqueue(p1, slot);
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_EQ(nullptr, c1);
  auto c1_status = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_FALSE(c1_status.ok());
}

TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
@@ -190,13 +208,16 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
  size_t s1;
  AllocateBuffer();
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &s1, &fence);
  auto p1_status = producer_queue_->Dequeue(0, &s1, &fence);
  ASSERT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_NE(nullptr, p1);

  // producer queue is exhausted
  size_t s2;
  auto p2_null = producer_queue_->Dequeue(0, &s2, &fence);
  ASSERT_EQ(nullptr, p2_null);
  auto p2_status = producer_queue_->Dequeue(0, &s2, &fence);
  ASSERT_FALSE(p2_status.ok());
  ASSERT_EQ(ETIMEDOUT, p2_status.error());

  // dynamically add buffer.
  AllocateBuffer();
@@ -204,7 +225,9 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
  ASSERT_EQ(producer_queue_->capacity(), 2U);

  // now we can dequeue again
  auto p2 = producer_queue_->Dequeue(0, &s2, &fence);
  p2_status = producer_queue_->Dequeue(0, &s2, &fence);
  ASSERT_TRUE(p2_status.ok());
  auto p2 = p2_status.take();
  ASSERT_NE(nullptr, p2);
  ASSERT_EQ(producer_queue_->count(), 0U);
  // p1 and p2 should have different slot number
@@ -217,20 +240,24 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
  int64_t seq = 1;
  ASSERT_EQ(p1->Post(LocalHandle(), seq), 0);
  size_t cs1, cs2;
  auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq, &fence);
  auto c1_status = consumer_queue_->Dequeue(0, &cs1, &seq, &fence);
  ASSERT_TRUE(c1_status.ok());
  auto c1 = c1_status.take();
  ASSERT_NE(nullptr, c1);
  ASSERT_EQ(consumer_queue_->count(), 0U);
  ASSERT_EQ(consumer_queue_->capacity(), 2U);
  ASSERT_EQ(cs1, s1);

  ASSERT_EQ(p2->Post(LocalHandle(), seq), 0);
  auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq, &fence);
  auto c2_status = consumer_queue_->Dequeue(0, &cs2, &seq, &fence);
  ASSERT_TRUE(c2_status.ok());
  auto c2 = c2_status.take();
  ASSERT_NE(nullptr, c2);
  ASSERT_EQ(cs2, s2);
}

TEST_F(BufferHubQueueTest, TestUsageSetMask) {
  const int set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  const uint32_t set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  ASSERT_TRUE(CreateQueues<int64_t>(set_mask, 0, 0, 0));

  // When allocation, leave out |set_mask| from usage bits on purpose.
@@ -238,15 +265,17 @@ TEST_F(BufferHubQueueTest, TestUsageSetMask) {
  int ret = producer_queue_->AllocateBuffer(
      kBufferWidth, kBufferHeight, kBufferFormat, kBufferUsage & ~set_mask,
      kBufferSliceCount, &slot);
  ASSERT_EQ(ret, 0);
  ASSERT_EQ(0, ret);

  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_EQ(p1->usage() & set_mask, set_mask);
}

TEST_F(BufferHubQueueTest, TestUsageClearMask) {
  const int clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  const uint32_t clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  ASSERT_TRUE(CreateQueues<int64_t>(0, clear_mask, 0, 0));

  // When allocation, add |clear_mask| into usage bits on purpose.
@@ -254,15 +283,17 @@ TEST_F(BufferHubQueueTest, TestUsageClearMask) {
  int ret = producer_queue_->AllocateBuffer(
      kBufferWidth, kBufferHeight, kBufferFormat, kBufferUsage | clear_mask,
      kBufferSliceCount, &slot);
  ASSERT_EQ(ret, 0);
  ASSERT_EQ(0, ret);

  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_EQ(p1->usage() & clear_mask, 0);
  auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_EQ(0u, p1->usage() & clear_mask);
}

TEST_F(BufferHubQueueTest, TestUsageDenySetMask) {
  const int deny_set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  const uint32_t deny_set_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  ASSERT_TRUE(CreateQueues<int64_t>(0, 0, deny_set_mask, 0));

  // Now that |deny_set_mask| is illegal, allocation without those bits should
@@ -281,7 +312,7 @@ TEST_F(BufferHubQueueTest, TestUsageDenySetMask) {
}

TEST_F(BufferHubQueueTest, TestUsageDenyClearMask) {
  const int deny_clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  const uint32_t deny_clear_mask = GRALLOC_USAGE_SW_WRITE_OFTEN;
  ASSERT_TRUE(CreateQueues<int64_t>(0, 0, 0, deny_clear_mask));

  // Now that clearing |deny_clear_mask| is illegal (i.e. setting these bits are