Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b7ca5dee authored by Corey Tabaka's avatar Corey Tabaka
Browse files

Clean up BufferHubQueue API and internal bookkeeping.

- Simplify buffer hangup accounting.
- Add extra checks to gracefully handle the epoll set and slots array
  being out of sync.
- Add tests for detaching buffers.
- Switch to using Status<T> for all return/error values.
- Fix minor bug in BufferHubQueueProducer from earlier Status<T>
  return value change.

Bug: 36401174
Test: buffer_hub_queue-test passes.
Change-Id: If7f86a45cc048dc77daa2ede56585d3f882dd24f
parent 5d67165b
Loading
Loading
Loading
Loading
+246 −183
Original line number Diff line number Diff line
@@ -23,34 +23,61 @@

using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
using android::pdx::LocalHandle;
using android::pdx::Status;

namespace android {
namespace dvr {

namespace {

// Polls an fd for the given events.
Status<int> PollEvents(int fd, short events) {
  const int kTimeoutMs = 0;
  pollfd pfd{fd, events, 0};
  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
  if (count < 0) {
    return ErrorStatus(errno);
  } else if (count == 0) {
    return ErrorStatus(ETIMEDOUT);
  } else {
    return {pfd.revents};
  }
}

// Polls a buffer for the given events, taking care to do the proper
// translation.
Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer,
                       short events) {
  auto poll_status = PollEvents(buffer->event_fd(), events);
  if (!poll_status)
    return poll_status;

  return buffer->GetEventMask(poll_status.get());
}

std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  return {static_cast<int32_t>(value >> 32),
          static_cast<int32_t>(value & ((1ull << 32) - 1))};
}

uint64_t Stuff(int32_t a, int32_t b) {
  const uint32_t ua = static_cast<uint32_t>(a);
  const uint32_t ub = static_cast<uint32_t>(b);
  return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
}

}  // anonymous namespace

BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
    : Client{pdx::default_transport::ClientChannel::Create(
          std::move(channel_handle))},
      meta_size_(0),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0),
      id_(-1) {
          std::move(channel_handle))} {
  Initialize();
}

BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
    : Client{pdx::default_transport::ClientChannelFactory::Create(
          endpoint_path)},
      meta_size_(0),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0),
      id_(-1) {
    : Client{
          pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
  Initialize();
}

@@ -62,9 +89,9 @@ void BufferHubQueue::Initialize() {
    return;
  }

  epoll_event event = {.events = EPOLLIN | EPOLLET,
                       .data = {.u64 = static_cast<uint64_t>(
                                    BufferHubQueue::kEpollQueueEventIndex)}};
  epoll_event event = {
      .events = EPOLLIN | EPOLLET,
      .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
  ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
  if (ret < 0) {
    ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
@@ -87,7 +114,6 @@ Status<void> BufferHubQueue::ImportQueue() {
void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
  meta_size_ = meta_size_bytes;
  id_ = id;
  meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
}

std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
@@ -152,19 +178,24 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
    // one for each buffer, in the queue and one extra event for the queue
    // client itself.
    for (int i = 0; i < num_events; i++) {
      int64_t index = static_cast<int64_t>(events[i].data.u64);
      int32_t event_fd;
      int32_t index;
      std::tie(event_fd, index) = Unstuff(events[i].data.u64);

      ALOGD_IF(TRACE,
               "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
               index);
               "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
               i, event_fd, index);

      if (is_buffer_event_index(index)) {
        HandleBufferEvent(static_cast<size_t>(index), events[i].events);
        HandleBufferEvent(static_cast<size_t>(index), event_fd,
                          events[i].events);
      } else if (is_queue_event_index(index)) {
        HandleQueueEvent(events[i].events);
      } else {
        ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
              index);
        ALOGW(
            "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
            "index=%d",
            event_fd, index);
      }
    }
  } while (count() == 0 && capacity() > 0 && !hung_up());
@@ -172,52 +203,72 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
  return count() != 0;
}

void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
  auto buffer = buffers_[slot];
  if (!buffer) {
Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
                                               int poll_events) {
  if (!buffers_[slot]) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    return;
    return ErrorStatus(ENOENT);
  }

  auto status = buffer->GetEventMask(poll_events);
  auto status = buffers_[slot]->GetEventMask(poll_events);
  if (!status) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
    return status.error_status();
  }

  const int events = status.get();
  if (events & EPOLLIN) {
    const int ret = OnBufferReady(buffer, &fences_[slot]);
    if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
    auto entry_status = OnBufferReady(buffers_[slot], slot);
    if (entry_status.ok() || entry_status.error() == EALREADY) {
      // Only enqueue the buffer if it moves to or is already in the state
      // requested in OnBufferReady(). If the buffer is busy this means that the
      // buffer moved from released to posted when a new consumer was created
      // before the ProducerQueue had a chance to regain it. This is a valid
      // transition that we have to handle because edge triggered poll events
      // latch the ready state even if it is later de-asserted -- don't enqueue
      // or print an error log in this case.
      if (ret != -EBUSY)
        Enqueue(buffer, slot);
      // requested in OnBufferReady().
      return Enqueue(entry_status.take());
    } else if (entry_status.error() == EBUSY) {
      // If the buffer is busy this means that the buffer moved from released to
      // posted when a new consumer was created before the ProducerQueue had a
      // chance to regain it. This is a valid transition that we have to handle
      // because edge triggered poll events latch the ready state even if it is
      // later de-asserted -- don't enqueue or print an error log in this case.
    } else {
      ALOGE(
          "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
          "queue_id=%d buffer_id=%d: %s",
          id(), buffer->id(), strerror(-ret));
          id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str());
    }
  } else if (events & EPOLLHUP) {
    // This might be caused by producer replacing an existing buffer slot, or
    // when BufferHubQueue is shutting down. For the first case, currently the
    // epoll FD is cleaned up when the replacement consumer client is imported,
    // we shouldn't detach again if |epollhub_pending_[slot]| is set.
    // Check to see if the current buffer in the slot hung up. This is a bit of
    // paranoia to deal with the epoll set getting out of sync with the buffer
    // slots.
    auto poll_status = PollEvents(buffers_[slot], POLLIN);
    if (!poll_status && poll_status.error() != ETIMEDOUT) {
      ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s",
            poll_status.GetErrorMessage().c_str());
      return poll_status.error_status();
    }

    const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP);

    ALOGW(
        "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
        "buffer event fd: %d, EPOLLHUP pending: %d",
        slot, buffer->event_fd(), int{epollhup_pending_[slot]});
    if (epollhup_pending_[slot]) {
      epollhup_pending_[slot] = false;
        "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
        "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x",
        slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending,
        poll_status.get());

    if (hangup_pending) {
      return DetachBuffer(slot);
    } else {
      DetachBuffer(slot);
      // Clean up the bookkeeping for the event fd. This is a bit of paranoia to
      // deal with the epoll set getting out of sync with the buffer slots.
      // Hitting this path should be very unusual.
      const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr);
      if (ret < 0) {
        ALOGE(
            "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from "
            "epoll set: %s",
            event_fd, strerror(-ret));
        return ErrorStatus(-ret);
      }
    }
  } else {
    ALOGW(
@@ -225,14 +276,16 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
        "events=%d",
        slot, events);
  }

  return {};
}

void BufferHubQueue::HandleQueueEvent(int poll_event) {
Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
  auto status = GetEventMask(poll_event);
  if (!status) {
    ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
    return status.error_status();
  }

  const int events = status.get();
@@ -250,111 +303,97 @@ void BufferHubQueue::HandleQueueEvent(int poll_event) {
  } else {
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
  }

  return {};
}

int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
                              size_t slot) {
Status<void> BufferHubQueue::AddBuffer(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
           buffer->id(), slot);

  if (is_full()) {
    // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
    // import buffer.
    ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
          capacity_);
    return -E2BIG;
    return ErrorStatus(E2BIG);
  }

  if (buffers_[slot] != nullptr) {
    // Replace the buffer if the slot is preoccupied. This could happen when the
  if (buffers_[slot]) {
    // Replace the buffer if the slot is occupied. This could happen when the
    // producer side replaced the slot with a newly allocated buffer. Detach the
    // buffer before setting up with the new one.
    DetachBuffer(slot);
    epollhup_pending_[slot] = true;
    auto detach_status = DetachBuffer(slot);
    if (!detach_status)
      return detach_status.error_status();
  }

  epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
  const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
  epoll_event event = {.events = EPOLLIN | EPOLLET,
                       .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
  const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event);
  if (ret < 0) {
    ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
          strerror(-ret));
    return ret;
    return ErrorStatus(-ret);
  }

  buffers_[slot] = buf;
  buffers_[slot] = buffer;
  capacity_++;
  return 0;
  return {};
}

int BufferHubQueue::DetachBuffer(size_t slot) {
  auto& buf = buffers_[slot];
  if (buf == nullptr) {
    ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
    return -EINVAL;
  }
Status<void> BufferHubQueue::DetachBuffer(size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::DetachBuffer: slot=%zu", slot);

  const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
  if (buffers_[slot]) {
    const int ret =
        epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr);
    if (ret < 0) {
      ALOGE(
        "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
          "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll "
          "set: "
          "%s",
          strerror(-ret));
    return ret;
      return ErrorStatus(-ret);
    }

    buffers_[slot] = nullptr;
    capacity_--;
  return 0;
  }

void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
                             size_t slot) {
  if (count() == capacity_) {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    return;
  return {};
}

  // Set slot buffer back to vector.
  // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
  // the limitation of the RingBuffer we are using. Would be better to refactor
  // that.
  BufferInfo buffer_info(slot, meta_size_);
  buffer_info.buffer = buf;
  // Swap metadata loaded during onBufferReady into vector.
  std::swap(buffer_info.metadata, meta_buffer_tmp_);

  available_buffers_.Append(std::move(buffer_info));
Status<void> BufferHubQueue::Enqueue(Entry entry) {
  if (!is_full()) {
    available_buffers_.Append(std::move(entry));
    return {};
  } else {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    return ErrorStatus(E2BIG);
  }
}

Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
    int timeout, size_t* slot, void* meta, LocalHandle* fence) {
  ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
  ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
           timeout);

  if (!WaitForBuffers(timeout))
    return ErrorStatus(ETIMEDOUT);

  std::shared_ptr<BufferHubBuffer> buf;
  BufferInfo& buffer_info = available_buffers_.Front();

  *fence = std::move(fences_[buffer_info.slot]);

  // Report current pos as the output slot.
  std::swap(buffer_info.slot, *slot);
  // Swap buffer from vector to be returned later.
  std::swap(buffer_info.buffer, buf);
  // Swap metadata from vector into tmp so that we can write out to |meta|.
  std::swap(buffer_info.metadata, meta_buffer_tmp_);

  available_buffers_.PopFront();

  if (!buf) {
    ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
    return ErrorStatus(ENOBUFS);
  }
  auto& entry = available_buffers_.Front();

  if (meta) {
    std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
  std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
  *slot = entry.slot;
  *fence = std::move(entry.fence);
  if (meta && entry.metadata) {
    std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_,
              reinterpret_cast<uint8_t*>(meta));
  }

  return {std::move(buf)};
  available_buffers_.PopFront();

  return {std::move(buffer)};
}

ProducerQueue::ProducerQueue(size_t meta_size)
@@ -388,28 +427,29 @@ ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
  SetupQueue(status.get().meta_size_bytes, status.get().id);
}

int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                                  uint32_t layer_count, uint32_t format,
                                  uint64_t usage, size_t* out_slot) {
Status<void> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                                           uint32_t layer_count,
                                           uint32_t format, uint64_t usage,
                                           size_t* out_slot) {
  if (out_slot == nullptr) {
    ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
    return -EINVAL;
    return ErrorStatus(EINVAL);
  }

  if (is_full()) {
    ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
          capacity());
    return -E2BIG;
    return ErrorStatus(E2BIG);
  }

  const size_t kBufferCount = 1U;
  const size_t kBufferCount = 1u;
  Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
          width, height, layer_count, format, usage, kBufferCount);
  if (!status) {
    ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
    return status.error_status();
  }

  auto buffer_handle_slots = status.take();
@@ -429,27 +469,26 @@ int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                   buffer_slot);
}

int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
                             size_t slot) {
Status<void> ProducerQueue::AddBuffer(
    const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
           id(), buffer->id(), slot);
  // For producer buffer, we need to enqueue the newly added buffer
  // immediately. Producer queue starts with all buffers in available state.
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
  if (ret < 0)
    return ret;
  auto status = BufferHubQueue::AddBuffer(buffer, slot);
  if (!status)
    return status;

  Enqueue(buf, slot);
  return 0;
  return Enqueue(buffer, slot);
}

int ProducerQueue::DetachBuffer(size_t slot) {
Status<void> ProducerQueue::DetachBuffer(size_t slot) {
  auto status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
  if (!status) {
    ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
    return status.error_status();
  }

  return BufferHubQueue::DetachBuffer(slot);
@@ -471,12 +510,22 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
  return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
}

int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* release_fence) {
  ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  return buffer->Gain(release_fence);
Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE,
           "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
           id(), buffer->id(), slot);

  // Avoid taking a transient reference, buffer is valid for the duration of
  // this method call.
  auto* producer_buffer = static_cast<BufferProducer*>(buffer.get());
  LocalHandle release_fence;

  const int ret = producer_buffer->Gain(&release_fence);
  if (ret < 0)
    return ErrorStatus(-ret);
  else
    return {{buffer, nullptr, std::move(release_fence), slot}};
}

ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
@@ -503,12 +552,12 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
  if (!status) {
    ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
    return status.error_status();
  }

  int ret;
  int last_error = 0;
  int imported_buffers = 0;
  Status<void> last_error;
  size_t imported_buffers_count = 0;

  auto buffer_handle_slots = status.take();
  for (auto& buffer_handle_slot : buffer_handle_slots) {
@@ -530,53 +579,52 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
            "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
            "imported buffer buffer_id=%d: %s",
            buffer_consumer->id(), strerror(-ret));
        last_error = ret;
        last_error = ErrorStatus(-ret);
      }
    }

    ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (ret < 0) {
    auto add_status =
        AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (!add_status) {
      ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
            strerror(-ret));
      last_error = ret;
      continue;
            add_status.GetErrorMessage().c_str());
      last_error = add_status;
    } else {
      imported_buffers++;
      imported_buffers_count++;
    }
  }

  if (imported_buffers > 0)
    return {imported_buffers};
  if (imported_buffers_count > 0)
    return {imported_buffers_count};
  else
    return ErrorStatus(-last_error);
    return last_error.error_status();
}

int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
                             size_t slot) {
Status<void> ConsumerQueue::AddBuffer(
    const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
  if (ret < 0)
    return ret;
           id(), buffer->id(), slot);
  auto status = BufferHubQueue::AddBuffer(buffer, slot);
  if (!status)
    return status;

  // Check to see if the buffer is already signaled. This is necessary to catch
  // cases where buffers are already available; epoll edge triggered mode does
  // not fire until and edge transition when adding new buffers to the epoll
  // set.
  const int kTimeoutMs = 0;
  pollfd pfd{buf->event_fd(), POLLIN, 0};
  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
  if (count < 0) {
    const int error = errno;
  // set. Note that we only poll the fd events because HandleBufferEvent() takes
  // care of checking the translated buffer events.
  auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
  if (!poll_status && poll_status.error() != ETIMEDOUT) {
    ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
          strerror(errno));
    return -error;
          poll_status.GetErrorMessage().c_str());
    return poll_status.error_status();
  }

  if (count == 1)
    HandleBufferEvent(slot, pfd.revents);

  return 0;
  // Update accounting if the buffer is available.
  if (poll_status)
    return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
  else
    return {};
}

Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
@@ -606,15 +654,30 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
  return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
}

int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* acquire_fence) {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE,
           "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
           id(), buffer->id(), slot);

  // Avoid taking a transient reference, buffer is valid for the duration of
  // this method call.
  auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get());
  std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_]
                                                 : nullptr);
  LocalHandle acquire_fence;

  const int ret =
      consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_);
  if (ret < 0)
    return ErrorStatus(-ret);
  else
    return {{buffer, std::move(metadata), std::move(acquire_fence), slot}};
}

Status<void> ConsumerQueue::OnBufferAllocated() {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());

  auto status = ImportBuffers();
  if (!status) {
    ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
+13 −9
Original line number Diff line number Diff line
@@ -159,6 +159,8 @@ status_t BufferHubQueueProducer::dequeueBuffer(
  for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
    LocalHandle fence;
    auto buffer_status = queue_->Dequeue(dequeue_timeout_ms_, &slot, &fence);
    if (!buffer_status)
      return NO_MEMORY;

    buffer_producer = buffer_status.take();
    if (!buffer_producer)
@@ -608,10 +610,12 @@ status_t BufferHubQueueProducer::AllocateBuffer(uint32_t width, uint32_t height,
                                                PixelFormat format,
                                                uint64_t usage) {
  size_t slot;

  if (queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot) <
      0) {
    ALOGE("Failed to allocate new buffer in BufferHub.");
  auto status =
      queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot);
  if (!status) {
    ALOGE(
        "BufferHubQueueProducer::AllocateBuffer: Failed to allocate buffer: %s",
        status.GetErrorMessage().c_str());
    return NO_MEMORY;
  }

@@ -626,11 +630,11 @@ status_t BufferHubQueueProducer::AllocateBuffer(uint32_t width, uint32_t height,
}

status_t BufferHubQueueProducer::RemoveBuffer(size_t slot) {
  int ret = queue_->DetachBuffer(slot);
  if (ret < 0) {
    ALOGE("BufferHubQueueProducer::RemoveBuffer failed through RPC, ret=%s",
          strerror(-ret));
    return ret;
  auto status = queue_->DetachBuffer(slot);
  if (!status) {
    ALOGE("BufferHubQueueProducer::RemoveBuffer: Failed to detach buffer: %s",
          status.GetErrorMessage().c_str());
    return INVALID_OPERATION;
  }

  // Reset in memory objects related the the buffer.
+115 −169

File changed.

Preview size limit exceeded, changes collapsed.

+164 −32

File changed.

Preview size limit exceeded, changes collapsed.

+1 −1
Original line number Diff line number Diff line
@@ -192,7 +192,7 @@ TEST_F(BufferHubQueueProducerTest, Query_Succeeds) {
  EXPECT_EQ(NO_ERROR,
            mProducer->query(NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS, &value));
  EXPECT_LE(0, value);
  EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, static_cast<size_t>(value));
  EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, value);

  EXPECT_EQ(NO_ERROR,
            mProducer->query(NATIVE_WINDOW_CONSUMER_RUNNING_BEHIND, &value));
Loading