Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8a4e6a90 authored by Corey Tabaka's avatar Corey Tabaka
Browse files

Add support for consumer queue initial import and hangup.

- Add support for consumer queues to import buffers that are created
  before the consumer queue is created, making multi-consumer queue
  patterns possible. This is essential for VrFlinger operation.
- Add support for notifying consumer queues when the producer queue
  hangs up.
- Correct the epoll event loop to check for hangups even when buffers
  are available.
- Add method to retrieve the event fd from a queue.
- Add trace logging and minor cleanup.
- Improve bufferhubd dump state output.

Bug: 36401174
Test: build; bufferhub tests pass.
Change-Id: Idd6f38a3341c048192062734e288d11de48bc4d4
parent 6f413c5f
Loading
Loading
Loading
Loading
+59 −20
Original line number Diff line number Diff line
@@ -87,6 +87,14 @@ std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
    return nullptr;
}

std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
  if (auto status = CreateConsumerQueueHandle())
    return std::unique_ptr<ConsumerQueue>(
        new ConsumerQueue(status.take(), true));
  else
    return nullptr;
}

Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
  auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
  if (!status) {
@@ -103,12 +111,18 @@ Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
bool BufferHubQueue::WaitForBuffers(int timeout) {
  std::array<epoll_event, kMaxEvents> events;

  while (count() == 0) {
    int ret = epoll_fd_.Wait(events.data(), events.size(), timeout);
  // Loop at least once to check for hangups.
  do {
    ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
             count(), capacity());

    // If there is already a buffer then just check for hangup without waiting.
    const int ret = epoll_fd_.Wait(events.data(), events.size(),
                                   count() == 0 ? timeout : 0);

    if (ret == 0) {
      ALOGD_IF(TRACE, "Wait on epoll returns nothing before timeout.");
      return false;
      return count() != 0;
    }

    if (ret < 0 && ret != -EINTR) {
@@ -136,9 +150,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
              index);
      }
    }
  }
  } while (count() == 0 && capacity() > 0 && !hung_up());

  return true;
  return count() != 0;
}

void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
@@ -203,6 +217,9 @@ void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
      ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
            buffer_status.GetErrorMessage().c_str());
    }
  } else if (events & EPOLLHUP) {
    ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
    hung_up_ = true;
  } else {
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
  }
@@ -260,7 +277,7 @@ int BufferHubQueue::DetachBuffer(size_t slot) {
  return 0;
}

void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
                             size_t slot) {
  if (count() == capacity_) {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
@@ -272,8 +289,7 @@ void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
  // the limitation of the RingBuffer we are using. Would be better to refactor
  // that.
  BufferInfo buffer_info(slot, meta_size_);
  // Swap buffer into vector.
  std::swap(buffer_info.buffer, buf);
  buffer_info.buffer = buf;
  // Swap metadata loaded during onBufferReady into vector.
  std::swap(buffer_info.metadata, meta_buffer_tmp_);

@@ -286,7 +302,7 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
                                                         LocalHandle* fence) {
  ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);

  if (count() == 0 && !WaitForBuffers(timeout))
  if (!WaitForBuffers(timeout))
    return nullptr;

  std::shared_ptr<BufferHubBuffer> buf;
@@ -366,9 +382,8 @@ int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
          width, height, format, usage, slice_count, kBufferCount);
  if (!status) {
    ALOGE(
        "ProducerQueue::AllocateBuffer failed to create producer buffer "
        "through BufferHub.");
    ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
          status.GetErrorMessage().c_str());
    return -status.error();
  }

@@ -428,14 +443,14 @@ std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(
  return std::static_pointer_cast<BufferProducer>(buf);
}

int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* release_fence) {
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  return buffer->Gain(release_fence);
}

ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
    : BufferHubQueue(std::move(handle)) {
ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
    : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
  auto status = ImportQueue();
  if (!status) {
    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
@@ -443,8 +458,14 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
    Close(-status.error());
  }

  // TODO(b/34387835) Import buffers in case the ProducerQueue we are
  // based on was not empty.
  auto import_status = ImportBuffers();
  if (import_status) {
    ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
          import_status.get());
  } else {
    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
          import_status.GetErrorMessage().c_str());
  }
}

Status<size_t> ConsumerQueue::ImportBuffers() {
@@ -457,6 +478,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
    return ErrorStatus(status.error());
  }

  int ret;
  int last_error = 0;
  int imported_buffers = 0;

@@ -468,7 +490,24 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

    std::unique_ptr<BufferConsumer> buffer_consumer =
        BufferConsumer::Import(std::move(buffer_handle_slot.first));
    int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);

    // Setup ignore state before adding buffer to the queue.
    if (ignore_on_import_) {
      ALOGD_IF(TRACE,
               "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
               "buffer_id=%d",
               buffer_consumer->id());
      ret = buffer_consumer->SetIgnore(true);
      if (ret < 0) {
        ALOGE(
            "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
            "imported buffer buffer_id=%d: %s",
            buffer_consumer->id(), strerror(-ret));
        last_error = ret;
      }
    }

    ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (ret < 0) {
      ALOGE("ConsumerQueue::ImportBuffers failed to add buffer, ret: %s",
            strerror(-ret));
@@ -502,7 +541,7 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
    return nullptr;
  }

  if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) {
  if (slot == nullptr || acquire_fence == nullptr) {
    ALOGE(
        "ConsumerQueue::Dequeue: Invalid parameter, slot=%p, meta=%p, "
        "acquire_fence=%p",
@@ -514,7 +553,7 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
  return std::static_pointer_cast<BufferConsumer>(buf);
}

int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* acquire_fence) {
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
+49 −10
Original line number Diff line number Diff line
@@ -33,6 +33,11 @@ class BufferHubQueue : public pdx::Client {
  // a new consumer queue client or nullptr on failure.
  std::unique_ptr<ConsumerQueue> CreateConsumerQueue();

  // Create a new consumer queue that is attached to the producer. This queue
  // sets each of its imported consumer buffers to the ignored state to avoid
  // participation in lifecycle events.
  std::unique_ptr<ConsumerQueue> CreateSilentConsumerQueue();

  // Return the default buffer width of this buffer queue.
  size_t default_width() const { return default_width_; }

@@ -71,9 +76,19 @@ class BufferHubQueue : public pdx::Client {
    }
  }

  // Returns an fd that signals pending queue events using
  // EPOLLIN/POLLIN/readible. Either HandleQueueEvents or WaitForBuffers may be
  // called to handle pending queue events.
  int queue_fd() const { return epoll_fd_.Get(); }

  // Handles any pending events, returning available buffers to the queue and
  // reaping disconnected buffers. Returns true if successful, false if an error
  // occurred.
  bool HandleQueueEvents() { return WaitForBuffers(0); }

  // Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
  // and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
  void Enqueue(std::shared_ptr<BufferHubBuffer> buf, size_t slot);
  void Enqueue(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);

  // |BufferHubQueue| will keep track of at most this value of buffers.
  static constexpr size_t kMaxQueueCapacity =
@@ -88,6 +103,7 @@ class BufferHubQueue : public pdx::Client {
  static constexpr int kNoTimeOut = -1;

  int id() const { return id_; }
  bool hung_up() const { return hung_up_; }

 protected:
  BufferHubQueue(LocalChannelHandle channel);
@@ -121,7 +137,7 @@ class BufferHubQueue : public pdx::Client {
  void HandleBufferEvent(size_t slot, const epoll_event& event);
  void HandleQueueEvent(const epoll_event& event);

  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
  virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                            LocalHandle* fence) = 0;

  // Called when a buffer is allocated remotely.
@@ -248,6 +264,12 @@ class BufferHubQueue : public pdx::Client {
  // Epoll fd used to wait for BufferHub events.
  EpollFileDescriptor epoll_fd_;

  // Flag indicating that the other side hung up. For ProducerQueues this
  // triggers when BufferHub dies or explicitly closes the queue channel. For
  // ConsumerQueues this can either mean the same or that the ProducerQueue on
  // the other end hung up.
  bool hung_up_{false};

  // Global id for the queue that is consistent across processes.
  int id_;

@@ -261,6 +283,9 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  static std::unique_ptr<ProducerQueue> Create() {
    return BASE::Create(sizeof(Meta));
  }
  static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes) {
    return BASE::Create(meta_size_bytes);
  }

  // Usage bits in |usage_set_mask| will be automatically masked on. Usage bits
  // in |usage_clear_mask| will be automatically masked off. Note that
@@ -331,7 +356,7 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
                uint64_t usage_clear_mask, uint64_t usage_deny_set_mask,
                uint64_t usage_deny_clear_mask);

  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
  int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                    LocalHandle* release_fence) override;
};

@@ -339,16 +364,22 @@ class ConsumerQueue : public BufferHubQueue {
 public:
  // Get a buffer consumer. Note that the method doesn't check whether the
  // buffer slot has a valid buffer that has been imported already. When no
  // buffer has been imported before it returns |nullptr|; otherwise it returns
  // a shared pointer to a |BufferConsumer|.
  // buffer has been imported before it returns nullptr; otherwise returns a
  // shared pointer to a BufferConsumer.
  std::shared_ptr<BufferConsumer> GetBuffer(size_t slot) const {
    return std::static_pointer_cast<BufferConsumer>(
        BufferHubQueue::GetBuffer(slot));
  }

  // Import a |ConsumerQueue| from a channel handle.
  static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle) {
    return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
  // Import a ConsumerQueue from a channel handle. |ignore_on_import| controls
  // whether or not buffers are set to be ignored when imported. This may be
  // used to avoid participation in the buffer lifecycle by a consumer queue
  // that is only used to spawn other consumer queues, such as in an
  // intermediate service.
  static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle,
                                               bool ignore_on_import = false) {
    return std::unique_ptr<ConsumerQueue>(
        new ConsumerQueue(std::move(handle), ignore_on_import));
  }

  // Import newly created buffers from the service side.
@@ -366,6 +397,10 @@ class ConsumerQueue : public BufferHubQueue {
                                          LocalHandle* acquire_fence) {
    return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
  }
  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
                                          LocalHandle* acquire_fence) {
    return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
  }

  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
                                          size_t meta_size,
@@ -374,7 +409,7 @@ class ConsumerQueue : public BufferHubQueue {
 private:
  friend BufferHubQueue;

  ConsumerQueue(LocalChannelHandle handle);
  ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import = false);

  // Add a consumer buffer to populate the queue. Once added, a consumer buffer
  // is NOT available to use until the producer side |Post| it. |WaitForBuffers|
@@ -382,10 +417,14 @@ class ConsumerQueue : public BufferHubQueue {
  // consumer.
  int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);

  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
  int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                    LocalHandle* acquire_fence) override;

  Status<void> OnBufferAllocated() override;

  // Flag indicating that imported (consumer) buffers should be ignored when
  // imported to avoid participating in the buffer ownership flow.
  bool ignore_on_import_;
};

}  // namespace dvr
+2 −0
Original line number Diff line number Diff line
@@ -52,6 +52,8 @@ class EpollFileDescriptor {
      return ret;
  }

  int Get() const { return fd_.get(); }

 private:
  base::unique_fd fd_;
};
+1 −1
Original line number Diff line number Diff line
@@ -132,7 +132,7 @@ std::string BufferHubService::DumpState(size_t /*max_length*/) {
  stream << std::endl;
  stream << "Active Producer Queues:\n";
  stream << std::right << std::setw(6) << "Id";
  stream << std::right << std::setw(12) << " Allocated";
  stream << std::right << std::setw(12) << " Capacity";
  stream << std::right << std::setw(12) << " Consumers";
  stream << " UsageSetMask";
  stream << " UsageClearMask";
+17 −3
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;

namespace android {
namespace dvr {
@@ -33,8 +34,10 @@ ConsumerQueueChannel::~ConsumerQueueChannel() {
bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
  auto producer = GetProducer();
  if (!producer)
    REPLY_ERROR_RETURN(message, EPIPE, true);
  if (!producer) {
    RemoteMethodError(message, EPIPE);
    return true;
  }

  switch (message.GetOp()) {
    case BufferHubRPC::CreateConsumerQueue::Opcode:
@@ -79,6 +82,9 @@ BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {

void ConsumerQueueChannel::RegisterNewBuffer(
    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
  ALOGD_IF(TRACE,
           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
           producer_channel->buffer_id(), slot);
  pending_buffer_slots_.emplace(producer_channel, slot);

  // Signal the client that there is new buffer available throught POLLIN.
@@ -89,7 +95,8 @@ Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
  ALOGD(
  ALOGD_IF(
      TRACE,
      "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
      "import: %zu",
      pending_buffer_slots_.size());
@@ -134,5 +141,12 @@ ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  return {std::move(buffer_handles)};
}

void ConsumerQueueChannel::OnProducerClosed() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
           buffer_id());
  producer_.reset();
  Hangup();
}

}  // namespace dvr
}  // namespace android
Loading