Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3b606a46 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Clean up BufferHubQueue API and internal bookkeeping."

parents e9594fcd b7ca5dee
Loading
Loading
Loading
Loading
+246 −183
Original line number Diff line number Diff line
@@ -23,34 +23,61 @@

using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
using android::pdx::LocalHandle;
using android::pdx::Status;

namespace android {
namespace dvr {

namespace {

// Polls an fd for the given events.
Status<int> PollEvents(int fd, short events) {
  const int kTimeoutMs = 0;
  pollfd pfd{fd, events, 0};
  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
  if (count < 0) {
    return ErrorStatus(errno);
  } else if (count == 0) {
    return ErrorStatus(ETIMEDOUT);
  } else {
    return {pfd.revents};
  }
}

// Polls a buffer for the given events, taking care to do the proper
// translation.
Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer,
                       short events) {
  auto poll_status = PollEvents(buffer->event_fd(), events);
  if (!poll_status)
    return poll_status;

  return buffer->GetEventMask(poll_status.get());
}

std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  return {static_cast<int32_t>(value >> 32),
          static_cast<int32_t>(value & ((1ull << 32) - 1))};
}

uint64_t Stuff(int32_t a, int32_t b) {
  const uint32_t ua = static_cast<uint32_t>(a);
  const uint32_t ub = static_cast<uint32_t>(b);
  return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
}

}  // anonymous namespace

BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
    : Client{pdx::default_transport::ClientChannel::Create(
          std::move(channel_handle))},
      meta_size_(0),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0),
      id_(-1) {
          std::move(channel_handle))} {
  Initialize();
}

BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
    : Client{pdx::default_transport::ClientChannelFactory::Create(
          endpoint_path)},
      meta_size_(0),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0),
      id_(-1) {
    : Client{
          pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
  Initialize();
}

@@ -62,9 +89,9 @@ void BufferHubQueue::Initialize() {
    return;
  }

  epoll_event event = {.events = EPOLLIN | EPOLLET,
                       .data = {.u64 = static_cast<uint64_t>(
                                    BufferHubQueue::kEpollQueueEventIndex)}};
  epoll_event event = {
      .events = EPOLLIN | EPOLLET,
      .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
  ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
  if (ret < 0) {
    ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
@@ -87,7 +114,6 @@ Status<void> BufferHubQueue::ImportQueue() {
void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
  meta_size_ = meta_size_bytes;
  id_ = id;
  meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
}

std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
@@ -152,19 +178,24 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
    // one for each buffer, in the queue and one extra event for the queue
    // client itself.
    for (int i = 0; i < num_events; i++) {
      int64_t index = static_cast<int64_t>(events[i].data.u64);
      int32_t event_fd;
      int32_t index;
      std::tie(event_fd, index) = Unstuff(events[i].data.u64);

      ALOGD_IF(TRACE,
               "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
               index);
               "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
               i, event_fd, index);

      if (is_buffer_event_index(index)) {
        HandleBufferEvent(static_cast<size_t>(index), events[i].events);
        HandleBufferEvent(static_cast<size_t>(index), event_fd,
                          events[i].events);
      } else if (is_queue_event_index(index)) {
        HandleQueueEvent(events[i].events);
      } else {
        ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
              index);
        ALOGW(
            "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
            "index=%d",
            event_fd, index);
      }
    }
  } while (count() == 0 && capacity() > 0 && !hung_up());
@@ -172,52 +203,72 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
  return count() != 0;
}

void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
  auto buffer = buffers_[slot];
  if (!buffer) {
Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
                                               int poll_events) {
  if (!buffers_[slot]) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    return;
    return ErrorStatus(ENOENT);
  }

  auto status = buffer->GetEventMask(poll_events);
  auto status = buffers_[slot]->GetEventMask(poll_events);
  if (!status) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
    return status.error_status();
  }

  const int events = status.get();
  if (events & EPOLLIN) {
    const int ret = OnBufferReady(buffer, &fences_[slot]);
    if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
    auto entry_status = OnBufferReady(buffers_[slot], slot);
    if (entry_status.ok() || entry_status.error() == EALREADY) {
      // Only enqueue the buffer if it moves to or is already in the state
      // requested in OnBufferReady(). If the buffer is busy this means that the
      // buffer moved from released to posted when a new consumer was created
      // before the ProducerQueue had a chance to regain it. This is a valid
      // transition that we have to handle because edge triggered poll events
      // latch the ready state even if it is later de-asserted -- don't enqueue
      // or print an error log in this case.
      if (ret != -EBUSY)
        Enqueue(buffer, slot);
      // requested in OnBufferReady().
      return Enqueue(entry_status.take());
    } else if (entry_status.error() == EBUSY) {
      // If the buffer is busy this means that the buffer moved from released to
      // posted when a new consumer was created before the ProducerQueue had a
      // chance to regain it. This is a valid transition that we have to handle
      // because edge triggered poll events latch the ready state even if it is
      // later de-asserted -- don't enqueue or print an error log in this case.
    } else {
      ALOGE(
          "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
          "queue_id=%d buffer_id=%d: %s",
          id(), buffer->id(), strerror(-ret));
          id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str());
    }
  } else if (events & EPOLLHUP) {
    // This might be caused by producer replacing an existing buffer slot, or
    // when BufferHubQueue is shutting down. For the first case, currently the
    // epoll FD is cleaned up when the replacement consumer client is imported,
    // we shouldn't detach again if |epollhub_pending_[slot]| is set.
    // Check to see if the current buffer in the slot hung up. This is a bit of
    // paranoia to deal with the epoll set getting out of sync with the buffer
    // slots.
    auto poll_status = PollEvents(buffers_[slot], POLLIN);
    if (!poll_status && poll_status.error() != ETIMEDOUT) {
      ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s",
            poll_status.GetErrorMessage().c_str());
      return poll_status.error_status();
    }

    const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP);

    ALOGW(
        "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
        "buffer event fd: %d, EPOLLHUP pending: %d",
        slot, buffer->event_fd(), int{epollhup_pending_[slot]});
    if (epollhup_pending_[slot]) {
      epollhup_pending_[slot] = false;
        "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
        "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x",
        slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending,
        poll_status.get());

    if (hangup_pending) {
      return DetachBuffer(slot);
    } else {
      DetachBuffer(slot);
      // Clean up the bookkeeping for the event fd. This is a bit of paranoia to
      // deal with the epoll set getting out of sync with the buffer slots.
      // Hitting this path should be very unusual.
      const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr);
      if (ret < 0) {
        ALOGE(
            "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from "
            "epoll set: %s",
            event_fd, strerror(-ret));
        return ErrorStatus(-ret);
      }
    }
  } else {
    ALOGW(
@@ -225,14 +276,16 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
        "events=%d",
        slot, events);
  }

  return {};
}

void BufferHubQueue::HandleQueueEvent(int poll_event) {
Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
  auto status = GetEventMask(poll_event);
  if (!status) {
    ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
    return status.error_status();
  }

  const int events = status.get();
@@ -250,111 +303,97 @@ void BufferHubQueue::HandleQueueEvent(int poll_event) {
  } else {
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
  }

  return {};
}

int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
                              size_t slot) {
Status<void> BufferHubQueue::AddBuffer(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
           buffer->id(), slot);

  if (is_full()) {
    // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
    // import buffer.
    ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
          capacity_);
    return -E2BIG;
    return ErrorStatus(E2BIG);
  }

  if (buffers_[slot] != nullptr) {
    // Replace the buffer if the slot is preoccupied. This could happen when the
  if (buffers_[slot]) {
    // Replace the buffer if the slot is occupied. This could happen when the
    // producer side replaced the slot with a newly allocated buffer. Detach the
    // buffer before setting up with the new one.
    DetachBuffer(slot);
    epollhup_pending_[slot] = true;
    auto detach_status = DetachBuffer(slot);
    if (!detach_status)
      return detach_status.error_status();
  }

  epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
  const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
  epoll_event event = {.events = EPOLLIN | EPOLLET,
                       .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
  const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event);
  if (ret < 0) {
    ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
          strerror(-ret));
    return ret;
    return ErrorStatus(-ret);
  }

  buffers_[slot] = buf;
  buffers_[slot] = buffer;
  capacity_++;
  return 0;
  return {};
}

int BufferHubQueue::DetachBuffer(size_t slot) {
  auto& buf = buffers_[slot];
  if (buf == nullptr) {
    ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
    return -EINVAL;
  }
Status<void> BufferHubQueue::DetachBuffer(size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::DetachBuffer: slot=%zu", slot);

  const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
  if (buffers_[slot]) {
    const int ret =
        epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr);
    if (ret < 0) {
      ALOGE(
        "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
          "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll "
          "set: "
          "%s",
          strerror(-ret));
    return ret;
      return ErrorStatus(-ret);
    }

    buffers_[slot] = nullptr;
    capacity_--;
  return 0;
  }

void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
                             size_t slot) {
  if (count() == capacity_) {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    return;
  return {};
}

  // Set slot buffer back to vector.
  // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
  // the limitation of the RingBuffer we are using. Would be better to refactor
  // that.
  BufferInfo buffer_info(slot, meta_size_);
  buffer_info.buffer = buf;
  // Swap metadata loaded during onBufferReady into vector.
  std::swap(buffer_info.metadata, meta_buffer_tmp_);

  available_buffers_.Append(std::move(buffer_info));
Status<void> BufferHubQueue::Enqueue(Entry entry) {
  if (!is_full()) {
    available_buffers_.Append(std::move(entry));
    return {};
  } else {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    return ErrorStatus(E2BIG);
  }
}

Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
    int timeout, size_t* slot, void* meta, LocalHandle* fence) {
  ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
  ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
           timeout);

  if (!WaitForBuffers(timeout))
    return ErrorStatus(ETIMEDOUT);

  std::shared_ptr<BufferHubBuffer> buf;
  BufferInfo& buffer_info = available_buffers_.Front();

  *fence = std::move(fences_[buffer_info.slot]);

  // Report current pos as the output slot.
  std::swap(buffer_info.slot, *slot);
  // Swap buffer from vector to be returned later.
  std::swap(buffer_info.buffer, buf);
  // Swap metadata from vector into tmp so that we can write out to |meta|.
  std::swap(buffer_info.metadata, meta_buffer_tmp_);

  available_buffers_.PopFront();

  if (!buf) {
    ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
    return ErrorStatus(ENOBUFS);
  }
  auto& entry = available_buffers_.Front();

  if (meta) {
    std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
  std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
  *slot = entry.slot;
  *fence = std::move(entry.fence);
  if (meta && entry.metadata) {
    std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_,
              reinterpret_cast<uint8_t*>(meta));
  }

  return {std::move(buf)};
  available_buffers_.PopFront();

  return {std::move(buffer)};
}

ProducerQueue::ProducerQueue(size_t meta_size)
@@ -388,28 +427,29 @@ ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
  SetupQueue(status.get().meta_size_bytes, status.get().id);
}

int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                                  uint32_t layer_count, uint32_t format,
                                  uint64_t usage, size_t* out_slot) {
Status<void> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                                           uint32_t layer_count,
                                           uint32_t format, uint64_t usage,
                                           size_t* out_slot) {
  if (out_slot == nullptr) {
    ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
    return -EINVAL;
    return ErrorStatus(EINVAL);
  }

  if (is_full()) {
    ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
          capacity());
    return -E2BIG;
    return ErrorStatus(E2BIG);
  }

  const size_t kBufferCount = 1U;
  const size_t kBufferCount = 1u;
  Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
          width, height, layer_count, format, usage, kBufferCount);
  if (!status) {
    ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
    return status.error_status();
  }

  auto buffer_handle_slots = status.take();
@@ -429,27 +469,26 @@ int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
                   buffer_slot);
}

int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
                             size_t slot) {
Status<void> ProducerQueue::AddBuffer(
    const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
           id(), buffer->id(), slot);
  // For producer buffer, we need to enqueue the newly added buffer
  // immediately. Producer queue starts with all buffers in available state.
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
  if (ret < 0)
    return ret;
  auto status = BufferHubQueue::AddBuffer(buffer, slot);
  if (!status)
    return status;

  Enqueue(buf, slot);
  return 0;
  return Enqueue(buffer, slot);
}

int ProducerQueue::DetachBuffer(size_t slot) {
Status<void> ProducerQueue::DetachBuffer(size_t slot) {
  auto status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
  if (!status) {
    ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
    return status.error_status();
  }

  return BufferHubQueue::DetachBuffer(slot);
@@ -471,12 +510,22 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
  return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
}

int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* release_fence) {
  ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  return buffer->Gain(release_fence);
Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE,
           "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
           id(), buffer->id(), slot);

  // Avoid taking a transient reference, buffer is valid for the duration of
  // this method call.
  auto* producer_buffer = static_cast<BufferProducer*>(buffer.get());
  LocalHandle release_fence;

  const int ret = producer_buffer->Gain(&release_fence);
  if (ret < 0)
    return ErrorStatus(-ret);
  else
    return {{buffer, nullptr, std::move(release_fence), slot}};
}

ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
@@ -503,12 +552,12 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
  if (!status) {
    ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
    return status.error_status();
  }

  int ret;
  int last_error = 0;
  int imported_buffers = 0;
  Status<void> last_error;
  size_t imported_buffers_count = 0;

  auto buffer_handle_slots = status.take();
  for (auto& buffer_handle_slot : buffer_handle_slots) {
@@ -530,53 +579,52 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
            "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
            "imported buffer buffer_id=%d: %s",
            buffer_consumer->id(), strerror(-ret));
        last_error = ret;
        last_error = ErrorStatus(-ret);
      }
    }

    ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (ret < 0) {
    auto add_status =
        AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (!add_status) {
      ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
            strerror(-ret));
      last_error = ret;
      continue;
            add_status.GetErrorMessage().c_str());
      last_error = add_status;
    } else {
      imported_buffers++;
      imported_buffers_count++;
    }
  }

  if (imported_buffers > 0)
    return {imported_buffers};
  if (imported_buffers_count > 0)
    return {imported_buffers_count};
  else
    return ErrorStatus(-last_error);
    return last_error.error_status();
}

int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
                             size_t slot) {
Status<void> ConsumerQueue::AddBuffer(
    const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
  if (ret < 0)
    return ret;
           id(), buffer->id(), slot);
  auto status = BufferHubQueue::AddBuffer(buffer, slot);
  if (!status)
    return status;

  // Check to see if the buffer is already signaled. This is necessary to catch
  // cases where buffers are already available; epoll edge triggered mode does
  // not fire until and edge transition when adding new buffers to the epoll
  // set.
  const int kTimeoutMs = 0;
  pollfd pfd{buf->event_fd(), POLLIN, 0};
  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
  if (count < 0) {
    const int error = errno;
  // set. Note that we only poll the fd events because HandleBufferEvent() takes
  // care of checking the translated buffer events.
  auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
  if (!poll_status && poll_status.error() != ETIMEDOUT) {
    ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
          strerror(errno));
    return -error;
          poll_status.GetErrorMessage().c_str());
    return poll_status.error_status();
  }

  if (count == 1)
    HandleBufferEvent(slot, pfd.revents);

  return 0;
  // Update accounting if the buffer is available.
  if (poll_status)
    return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
  else
    return {};
}

Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
@@ -606,15 +654,30 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
  return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
}

int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* acquire_fence) {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady(
    const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
  ALOGD_IF(TRACE,
           "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
           id(), buffer->id(), slot);

  // Avoid taking a transient reference, buffer is valid for the duration of
  // this method call.
  auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get());
  std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_]
                                                 : nullptr);
  LocalHandle acquire_fence;

  const int ret =
      consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_);
  if (ret < 0)
    return ErrorStatus(-ret);
  else
    return {{buffer, std::move(metadata), std::move(acquire_fence), slot}};
}

Status<void> ConsumerQueue::OnBufferAllocated() {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());

  auto status = ImportBuffers();
  if (!status) {
    ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
+13 −9
Original line number Diff line number Diff line
@@ -159,6 +159,8 @@ status_t BufferHubQueueProducer::dequeueBuffer(
  for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
    LocalHandle fence;
    auto buffer_status = queue_->Dequeue(dequeue_timeout_ms_, &slot, &fence);
    if (!buffer_status)
      return NO_MEMORY;

    buffer_producer = buffer_status.take();
    if (!buffer_producer)
@@ -608,10 +610,12 @@ status_t BufferHubQueueProducer::AllocateBuffer(uint32_t width, uint32_t height,
                                                PixelFormat format,
                                                uint64_t usage) {
  size_t slot;

  if (queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot) <
      0) {
    ALOGE("Failed to allocate new buffer in BufferHub.");
  auto status =
      queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot);
  if (!status) {
    ALOGE(
        "BufferHubQueueProducer::AllocateBuffer: Failed to allocate buffer: %s",
        status.GetErrorMessage().c_str());
    return NO_MEMORY;
  }

@@ -626,11 +630,11 @@ status_t BufferHubQueueProducer::AllocateBuffer(uint32_t width, uint32_t height,
}

status_t BufferHubQueueProducer::RemoveBuffer(size_t slot) {
  int ret = queue_->DetachBuffer(slot);
  if (ret < 0) {
    ALOGE("BufferHubQueueProducer::RemoveBuffer failed through RPC, ret=%s",
          strerror(-ret));
    return ret;
  auto status = queue_->DetachBuffer(slot);
  if (!status) {
    ALOGE("BufferHubQueueProducer::RemoveBuffer: Failed to detach buffer: %s",
          status.GetErrorMessage().c_str());
    return INVALID_OPERATION;
  }

  // Reset in memory objects related the the buffer.
+115 −169

File changed.

Preview size limit exceeded, changes collapsed.

+164 −32

File changed.

Preview size limit exceeded, changes collapsed.

+1 −1
Original line number Diff line number Diff line
@@ -192,7 +192,7 @@ TEST_F(BufferHubQueueProducerTest, Query_Succeeds) {
  EXPECT_EQ(NO_ERROR,
            mProducer->query(NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS, &value));
  EXPECT_LE(0, value);
  EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, static_cast<size_t>(value));
  EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, value);

  EXPECT_EQ(NO_ERROR,
            mProducer->query(NATIVE_WINDOW_CONSUMER_RUNNING_BEHIND, &value));
Loading