Loading media/bufferpool/2.0/Accessor.cpp +22 −10 Original line number Diff line number Diff line Loading @@ -117,19 +117,18 @@ sp<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() { Return<void> Accessor::connect( const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer, connect_cb _hidl_cb) { (void)observer; sp<Connection> connection; ConnectionId connectionId; uint32_t msgId; const StatusDescriptor* fmqDesc; const InvalidationDescriptor* invDesc; ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false); ResultStatus status = connect( observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc); if (status == ResultStatus::OK) { _hidl_cb(status, connection, connectionId, *fmqDesc, android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>( std::vector<android::hardware::GrantorDescriptor>(), nullptr /* nhandle */, 0 /* size */)); _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc); } else { _hidl_cb(status, nullptr, -1LL, _hidl_cb(status, nullptr, -1LL, 0, android::hardware::MQDescriptorSync<BufferStatusMessage>( std::vector<android::hardware::GrantorDescriptor>(), nullptr /* nhandle */, 0 /* size */), Loading @@ -147,7 +146,15 @@ Accessor::~Accessor() { } bool Accessor::isValid() { return (bool)mImpl; return (bool)mImpl && mImpl->isValid(); } ResultStatus Accessor::flush() { if (mImpl) { mImpl->flush(); return ResultStatus::OK; } return ResultStatus::CRITICAL_ERROR; } ResultStatus Accessor::allocate( Loading @@ -170,10 +177,15 @@ ResultStatus Accessor::fetch( } ResultStatus Accessor::connect( const sp<IObserver> &observer, bool local, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr, bool local) { uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr) { if (mImpl) { ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr); ResultStatus status = mImpl->connect( this, observer, connection, pConnectionId, pMsgId, statusDescPtr, invDescPtr); if (!local && status == ResultStatus::OK) { sp<Accessor> accessor(this); sConnectionDeathRecipient->add(*pConnectionId, accessor); Loading media/bufferpool/2.0/Accessor.h +16 −5 Original line number Diff line number Diff line Loading @@ -95,6 +95,9 @@ struct Accessor : public IAccessor { /** Returns whether the accessor is valid. */ bool isValid(); /** Invalidates all buffers which are owned by bufferpool */ ResultStatus flush(); /** Allocates a buffer from a buffer pool. * * @param connectionId the connection id of the client. Loading Loading @@ -135,20 +138,28 @@ struct Accessor : public IAccessor { * created connection in order to communicate with the buffer pool. An * FMQ for buffer status message is also created for the client. * * @param observer client observer for buffer invalidation * @param local true when a connection request comes from local process, * false otherwise. * @param connection created connection * @param pConnectionId the id of the created connection * @param fmqDescPtr FMQ descriptor for shared buffer status message * @param pMsgId the id of the recent buffer pool message * @param statusDescPtr FMQ descriptor for shared buffer status message * queue between a buffer pool and the client. * @param local true when a connection request comes from local process, * false otherwise. * @param invDescPtr FMQ descriptor for buffer invalidation message * queue from a buffer pool to the client. * * @return OK when a connection is successfully made. * NO_MEMORY when there is no memory. * CRITICAL_ERROR otherwise. */ ResultStatus connect( const sp<IObserver>& observer, bool local, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr, bool local); uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr); /** * Closes the specified connection to the client. Loading Loading @@ -176,7 +187,7 @@ struct Accessor : public IAccessor { private: class Impl; std::unique_ptr<Impl> mImpl; std::shared_ptr<Impl> mImpl; }; } // namespace implementation Loading media/bufferpool/2.0/AccessorImpl.cpp +284 −14 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <time.h> #include <unistd.h> #include <utils/Log.h> #include <thread> #include "AccessorImpl.h" #include "Connection.h" Loading @@ -47,6 +48,7 @@ struct InternalBuffer { const std::shared_ptr<BufferPoolAllocation> mAllocation; const size_t mAllocSize; const std::vector<uint8_t> mConfig; bool mInvalidated; InternalBuffer( BufferId id, Loading @@ -54,11 +56,16 @@ struct InternalBuffer { const size_t allocSize, const std::vector<uint8_t> &allocConfig) : mId(id), mOwnerCount(0), mTransactionCount(0), mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {} mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig), mInvalidated(false) {} const native_handle_t *handle() { return mAllocation->handle(); } void invalidate() { mInvalidated = true; } }; struct TransactionStatus { Loading Loading @@ -138,21 +145,29 @@ Accessor::Impl::~Impl() { } ResultStatus Accessor::Impl::connect( const sp<Accessor> &accessor, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) { const sp<Accessor> &accessor, const sp<IObserver> &observer, sp<Connection> *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr) { sp<Connection> newConnection = new Connection(); ResultStatus status = ResultStatus::CRITICAL_ERROR; { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); if (newConnection) { ConnectionId id = (int64_t)sPid << 32 | sSeqId; status = mBufferPool.mObserver.open(id, fmqDescPtr); status = mBufferPool.mObserver.open(id, statusDescPtr); if (status == ResultStatus::OK) { newConnection->initialize(accessor, id); *connection = newConnection; *pConnectionId = id; *pMsgId = mBufferPool.mInvalidation.mInvalidationId; mBufferPool.mInvalidationChannel.getDesc(invDescPtr); mBufferPool.mInvalidation.onConnect(id, observer); ++sSeqId; } } mBufferPool.processStatusMessages(); mBufferPool.cleanUp(); Loading @@ -165,6 +180,7 @@ ResultStatus Accessor::Impl::close(ConnectionId connectionId) { mBufferPool.processStatusMessages(); mBufferPool.handleClose(connectionId); mBufferPool.mObserver.close(connectionId); mBufferPool.mInvalidation.onClose(connectionId); // Since close# will be called after all works are finished, it is OK to // evict unused buffers. mBufferPool.cleanUp(true); Loading Loading @@ -229,11 +245,30 @@ void Accessor::Impl::cleanUp(bool clearCache) { mBufferPool.cleanUp(clearCache); } void Accessor::Impl::flush() { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.flush(shared_from_this()); } void Accessor::Impl::handleInvalidateAck() { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.mInvalidation.onHandleAck(); } bool Accessor::Impl::isValid() { return mBufferPool.isValid(); } Accessor::Impl::Impl::BufferPool::BufferPool() : mTimestampUs(getTimestampNow()), mLastCleanUpUs(mTimestampUs), mLastLogUs(mTimestampUs), mSeq(0) {} mSeq(0), mStartSeq(0) { mValid = mInvalidationChannel.isValid(); } // Statistics helper Loading @@ -242,6 +277,8 @@ int percentage(T base, S total) { return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0); } std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0); Accessor::Impl::Impl::BufferPool::~BufferPool() { std::lock_guard<std::mutex> lock(mMutex); ALOGD("Destruction - bufferpool %p " Loading @@ -255,6 +292,96 @@ Accessor::Impl::Impl::BufferPool::~BufferPool() { percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); } void Accessor::Impl::BufferPool::Invalidation::onConnect( ConnectionId conId, const sp<IObserver>& observer) { mAcks[conId] = mInvalidationId; // starts from current invalidationId mObservers.insert(std::make_pair(conId, observer)); } void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) { mAcks.erase(conId); mObservers.erase(conId); } void Accessor::Impl::BufferPool::Invalidation::onAck( ConnectionId conId, uint32_t msgId) { auto it = mAcks.find(conId); if (it == mAcks.end() || isMessageLater(msgId, it->second)) { mAcks[conId] = msgId; } } void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated( BufferId bufferId, BufferInvalidationChannel &channel) { for (auto it = mPendings.begin(); it != mPendings.end();) { if (it->invalidate(bufferId)) { it = mPendings.erase(it); uint32_t msgId = 0; if (it->mNeedsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } channel.postInvalidation(msgId, it->mFrom, it->mTo); sInvalidator.addAccessor(mId, it->mImpl); continue; } ++it; } } void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest( bool needsAck, uint32_t from, uint32_t to, size_t left, BufferInvalidationChannel &channel, const std::shared_ptr<Accessor::Impl> &impl) { if (left == 0) { uint32_t msgId = 0; if (needsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } channel.postInvalidation(msgId, from, to); sInvalidator.addAccessor(mId, impl); } else { // TODO: sending hint message? Pending pending(needsAck, from, to, left, impl); mPendings.push_back(pending); } } void Accessor::Impl::BufferPool::Invalidation::onHandleAck() { if (mInvalidationId != 0) { std::set<int> deads; for (auto it = mAcks.begin(); it != mAcks.end(); ++it) { if (it->second != mInvalidationId) { const sp<IObserver> observer = mObservers[it->first].promote(); if (observer) { observer->onMessage(it->first, mInvalidationId); } else { deads.insert(it->first); } } } if (deads.size() > 0) { for (auto it = deads.begin(); it != deads.end(); ++it) { onClose(*it); } } } // All invalidation Ids are synced. sInvalidator.delAccessor(mId); } bool Accessor::Impl::BufferPool::handleOwnBuffer( ConnectionId connectionId, BufferId bufferId) { Loading @@ -275,8 +402,15 @@ bool Accessor::Impl::BufferPool::handleReleaseBuffer( iter->second->mOwnerCount--; if (iter->second->mOwnerCount == 0 && iter->second->mTransactionCount == 0) { if (!iter->second->mInvalidated) { mStats.onBufferUnused(iter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(iter->second->mAllocSize); mStats.onBufferEvicted(iter->second->mAllocSize); mBuffers.erase(iter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } erase(&mUsingConnections, bufferId, connectionId); Loading Loading @@ -352,8 +486,15 @@ bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage bufferIter->second->mTransactionCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(message.bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel); } } mTransactions.erase(found); } Loading Loading @@ -400,7 +541,7 @@ void Accessor::Impl::BufferPool::processStatusMessages() { ret = handleTransferResult(message); break; case BufferStatus::INVALIDATION_ACK: // TODO mInvalidation.onAck(message.connectionId, message.bufferId); break; } if (ret == false) { Loading @@ -423,8 +564,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } } Loading @@ -446,8 +594,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } mTransactions.erase(iter); } Loading Loading @@ -538,6 +693,121 @@ void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { } } void Accessor::Impl::BufferPool::invalidate( bool needsAck, BufferId from, BufferId to, const std::shared_ptr<Accessor::Impl> &impl) { for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { if (isBufferInRange(from, to, *freeIt)) { auto it = mBuffers.find(*freeIt); if (it != mBuffers.end() && it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { mStats.onBufferEvicted(it->second->mAllocSize); mBuffers.erase(it); freeIt = mFreeBuffers.erase(freeIt); continue; } else { ALOGW("bufferpool inconsistent!"); } } ++freeIt; } size_t left = 0; for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) { if (isBufferInRange(from, to, it->first)) { it->second->invalidate(); ++left; } } mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl); } void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) { BufferId from = mStartSeq; BufferId to = mSeq; mStartSeq = mSeq; // TODO: needsAck params if (from != to) { invalidate(true, from, to, impl); } } void Accessor::Impl::invalidatorThread( std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready) { while(true) { std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied; { std::unique_lock<std::mutex> lock(mutex); if (!ready) { cv.wait(lock); } copied.insert(accessors.begin(), accessors.end()); } std::list<ConnectionId> erased; for (auto it = copied.begin(); it != copied.end(); ++it) { const std::shared_ptr<Accessor::Impl> impl = it->second.lock(); if (!impl) { erased.push_back(it->first); } else { impl->handleInvalidateAck(); } } { std::unique_lock<std::mutex> lock(mutex); for (auto it = erased.begin(); it != erased.end(); ++it) { accessors.erase(*it); } if (accessors.size() == 0) { ready = false; } else { // prevent draining cpu. lock.unlock(); std::this_thread::yield(); } } } } Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) { std::thread invalidator( invalidatorThread, std::ref(mAccessors), std::ref(mMutex), std::ref(mCv), std::ref(mReady)); invalidator.detach(); } void Accessor::Impl::AccessorInvalidator::addAccessor( uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) { bool notify = false; std::unique_lock<std::mutex> lock(mMutex); if (mAccessors.find(accessorId) == mAccessors.end()) { if (!mReady) { mReady = true; notify = true; } mAccessors.insert(std::make_pair(accessorId, impl)); } lock.unlock(); if (notify) { mCv.notify_one(); } } void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) { std::lock_guard<std::mutex> lock(mMutex); mAccessors.erase(accessorId); if (mAccessors.size() == 0) { mReady = false; } } Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator; } // namespace implementation } // namespace V2_0 } // namespace bufferpool Loading media/bufferpool/2.0/AccessorImpl.h +98 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <map> #include <set> #include <condition_variable> #include "Accessor.h" namespace android { Loading @@ -33,15 +34,20 @@ struct TransactionStatus; /** * An implementation of a buffer pool accessor(or a buffer pool implementation.) */ class Accessor::Impl { class Accessor::Impl : public std::enable_shared_from_this<Accessor::Impl> { public: Impl(const std::shared_ptr<BufferPoolAllocator> &allocator); ~Impl(); ResultStatus connect( const sp<Accessor> &accessor, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr); const sp<Accessor> &accessor, const sp<IObserver> &observer, sp<Connection> *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr); ResultStatus close(ConnectionId connectionId); Loading @@ -55,8 +61,14 @@ public: BufferId bufferId, const native_handle_t** handle); void flush(); void cleanUp(bool clearCache); bool isValid(); void handleInvalidateAck(); private: // ConnectionId = pid : (timestamp_created + seqId) // in order to guarantee uniqueness for each connection Loading @@ -78,7 +90,10 @@ private: int64_t mLastCleanUpUs; int64_t mLastLogUs; BufferId mSeq; BufferId mStartSeq; bool mValid; BufferStatusObserver mObserver; BufferInvalidationChannel mInvalidationChannel; std::map<ConnectionId, std::set<BufferId>> mUsingBuffers; std::map<BufferId, std::set<ConnectionId>> mUsingConnections; Loading @@ -95,6 +110,54 @@ private: std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers; std::set<BufferId> mFreeBuffers; struct Invalidation { static std::atomic<std::uint32_t> sSeqId; struct Pending { bool mNeedsAck; uint32_t mFrom; uint32_t mTo; size_t mLeft; const std::weak_ptr<Accessor::Impl> mImpl; Pending(bool needsAck, uint32_t from, uint32_t to, size_t left, const std::shared_ptr<Accessor::Impl> &impl) : mNeedsAck(needsAck), mFrom(from), mTo(to), mLeft(left), mImpl(impl) {} bool invalidate(uint32_t bufferId) { return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0; } }; std::list<Pending> mPendings; std::map<ConnectionId, uint32_t> mAcks; std::map<ConnectionId, const wp<IObserver>> mObservers; uint32_t mInvalidationId; uint32_t mId; Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {} void onConnect(ConnectionId conId, const sp<IObserver> &observer); void onClose(ConnectionId conId); void onAck(ConnectionId conId, uint32_t msgId); void onBufferInvalidated( BufferId bufferId, BufferInvalidationChannel &channel); void onInvalidationRequest( bool needsAck, uint32_t from, uint32_t to, size_t left, BufferInvalidationChannel &channel, const std::shared_ptr<Accessor::Impl> &impl); void onHandleAck(); } mInvalidation; /// Buffer pool statistics which tracks allocation and transfer statistics. struct Stats { /// Total size of allocations which are used or available to use. Loading Loading @@ -164,6 +227,13 @@ private: } } mStats; bool isValid() { return mValid; } void invalidate(bool needsAck, BufferId from, BufferId to, const std::shared_ptr<Accessor::Impl> &impl); public: /** Creates a buffer pool. */ BufferPool(); Loading Loading @@ -286,8 +356,33 @@ private: */ void cleanUp(bool clearCache = false); /** * Processes pending buffer status messages and invalidate all current * free buffers. Active buffers are invalidated after being inactive. */ void flush(const std::shared_ptr<Accessor::Impl> &impl); friend class Accessor::Impl; } mBufferPool; struct AccessorInvalidator { std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> mAccessors; std::mutex mMutex; std::condition_variable mCv; bool mReady; AccessorInvalidator(); void addAccessor(uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl); void delAccessor(uint32_t accessorId); }; static AccessorInvalidator sInvalidator; static void invalidatorThread( std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready); }; } // namespace implementation Loading media/bufferpool/2.0/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,7 @@ cc_library { "BufferStatus.cpp", "ClientManager.cpp", "Connection.cpp", "Observer.cpp", ], export_include_dirs: [ "include", Loading Loading
media/bufferpool/2.0/Accessor.cpp +22 −10 Original line number Diff line number Diff line Loading @@ -117,19 +117,18 @@ sp<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() { Return<void> Accessor::connect( const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer, connect_cb _hidl_cb) { (void)observer; sp<Connection> connection; ConnectionId connectionId; uint32_t msgId; const StatusDescriptor* fmqDesc; const InvalidationDescriptor* invDesc; ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false); ResultStatus status = connect( observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc); if (status == ResultStatus::OK) { _hidl_cb(status, connection, connectionId, *fmqDesc, android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>( std::vector<android::hardware::GrantorDescriptor>(), nullptr /* nhandle */, 0 /* size */)); _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc); } else { _hidl_cb(status, nullptr, -1LL, _hidl_cb(status, nullptr, -1LL, 0, android::hardware::MQDescriptorSync<BufferStatusMessage>( std::vector<android::hardware::GrantorDescriptor>(), nullptr /* nhandle */, 0 /* size */), Loading @@ -147,7 +146,15 @@ Accessor::~Accessor() { } bool Accessor::isValid() { return (bool)mImpl; return (bool)mImpl && mImpl->isValid(); } ResultStatus Accessor::flush() { if (mImpl) { mImpl->flush(); return ResultStatus::OK; } return ResultStatus::CRITICAL_ERROR; } ResultStatus Accessor::allocate( Loading @@ -170,10 +177,15 @@ ResultStatus Accessor::fetch( } ResultStatus Accessor::connect( const sp<IObserver> &observer, bool local, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr, bool local) { uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr) { if (mImpl) { ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr); ResultStatus status = mImpl->connect( this, observer, connection, pConnectionId, pMsgId, statusDescPtr, invDescPtr); if (!local && status == ResultStatus::OK) { sp<Accessor> accessor(this); sConnectionDeathRecipient->add(*pConnectionId, accessor); Loading
media/bufferpool/2.0/Accessor.h +16 −5 Original line number Diff line number Diff line Loading @@ -95,6 +95,9 @@ struct Accessor : public IAccessor { /** Returns whether the accessor is valid. */ bool isValid(); /** Invalidates all buffers which are owned by bufferpool */ ResultStatus flush(); /** Allocates a buffer from a buffer pool. * * @param connectionId the connection id of the client. Loading Loading @@ -135,20 +138,28 @@ struct Accessor : public IAccessor { * created connection in order to communicate with the buffer pool. An * FMQ for buffer status message is also created for the client. * * @param observer client observer for buffer invalidation * @param local true when a connection request comes from local process, * false otherwise. * @param connection created connection * @param pConnectionId the id of the created connection * @param fmqDescPtr FMQ descriptor for shared buffer status message * @param pMsgId the id of the recent buffer pool message * @param statusDescPtr FMQ descriptor for shared buffer status message * queue between a buffer pool and the client. * @param local true when a connection request comes from local process, * false otherwise. * @param invDescPtr FMQ descriptor for buffer invalidation message * queue from a buffer pool to the client. * * @return OK when a connection is successfully made. * NO_MEMORY when there is no memory. * CRITICAL_ERROR otherwise. */ ResultStatus connect( const sp<IObserver>& observer, bool local, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr, bool local); uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr); /** * Closes the specified connection to the client. Loading Loading @@ -176,7 +187,7 @@ struct Accessor : public IAccessor { private: class Impl; std::unique_ptr<Impl> mImpl; std::shared_ptr<Impl> mImpl; }; } // namespace implementation Loading
media/bufferpool/2.0/AccessorImpl.cpp +284 −14 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <time.h> #include <unistd.h> #include <utils/Log.h> #include <thread> #include "AccessorImpl.h" #include "Connection.h" Loading @@ -47,6 +48,7 @@ struct InternalBuffer { const std::shared_ptr<BufferPoolAllocation> mAllocation; const size_t mAllocSize; const std::vector<uint8_t> mConfig; bool mInvalidated; InternalBuffer( BufferId id, Loading @@ -54,11 +56,16 @@ struct InternalBuffer { const size_t allocSize, const std::vector<uint8_t> &allocConfig) : mId(id), mOwnerCount(0), mTransactionCount(0), mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {} mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig), mInvalidated(false) {} const native_handle_t *handle() { return mAllocation->handle(); } void invalidate() { mInvalidated = true; } }; struct TransactionStatus { Loading Loading @@ -138,21 +145,29 @@ Accessor::Impl::~Impl() { } ResultStatus Accessor::Impl::connect( const sp<Accessor> &accessor, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) { const sp<Accessor> &accessor, const sp<IObserver> &observer, sp<Connection> *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr) { sp<Connection> newConnection = new Connection(); ResultStatus status = ResultStatus::CRITICAL_ERROR; { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); if (newConnection) { ConnectionId id = (int64_t)sPid << 32 | sSeqId; status = mBufferPool.mObserver.open(id, fmqDescPtr); status = mBufferPool.mObserver.open(id, statusDescPtr); if (status == ResultStatus::OK) { newConnection->initialize(accessor, id); *connection = newConnection; *pConnectionId = id; *pMsgId = mBufferPool.mInvalidation.mInvalidationId; mBufferPool.mInvalidationChannel.getDesc(invDescPtr); mBufferPool.mInvalidation.onConnect(id, observer); ++sSeqId; } } mBufferPool.processStatusMessages(); mBufferPool.cleanUp(); Loading @@ -165,6 +180,7 @@ ResultStatus Accessor::Impl::close(ConnectionId connectionId) { mBufferPool.processStatusMessages(); mBufferPool.handleClose(connectionId); mBufferPool.mObserver.close(connectionId); mBufferPool.mInvalidation.onClose(connectionId); // Since close# will be called after all works are finished, it is OK to // evict unused buffers. mBufferPool.cleanUp(true); Loading Loading @@ -229,11 +245,30 @@ void Accessor::Impl::cleanUp(bool clearCache) { mBufferPool.cleanUp(clearCache); } void Accessor::Impl::flush() { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.flush(shared_from_this()); } void Accessor::Impl::handleInvalidateAck() { std::lock_guard<std::mutex> lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.mInvalidation.onHandleAck(); } bool Accessor::Impl::isValid() { return mBufferPool.isValid(); } Accessor::Impl::Impl::BufferPool::BufferPool() : mTimestampUs(getTimestampNow()), mLastCleanUpUs(mTimestampUs), mLastLogUs(mTimestampUs), mSeq(0) {} mSeq(0), mStartSeq(0) { mValid = mInvalidationChannel.isValid(); } // Statistics helper Loading @@ -242,6 +277,8 @@ int percentage(T base, S total) { return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0); } std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0); Accessor::Impl::Impl::BufferPool::~BufferPool() { std::lock_guard<std::mutex> lock(mMutex); ALOGD("Destruction - bufferpool %p " Loading @@ -255,6 +292,96 @@ Accessor::Impl::Impl::BufferPool::~BufferPool() { percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); } void Accessor::Impl::BufferPool::Invalidation::onConnect( ConnectionId conId, const sp<IObserver>& observer) { mAcks[conId] = mInvalidationId; // starts from current invalidationId mObservers.insert(std::make_pair(conId, observer)); } void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) { mAcks.erase(conId); mObservers.erase(conId); } void Accessor::Impl::BufferPool::Invalidation::onAck( ConnectionId conId, uint32_t msgId) { auto it = mAcks.find(conId); if (it == mAcks.end() || isMessageLater(msgId, it->second)) { mAcks[conId] = msgId; } } void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated( BufferId bufferId, BufferInvalidationChannel &channel) { for (auto it = mPendings.begin(); it != mPendings.end();) { if (it->invalidate(bufferId)) { it = mPendings.erase(it); uint32_t msgId = 0; if (it->mNeedsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } channel.postInvalidation(msgId, it->mFrom, it->mTo); sInvalidator.addAccessor(mId, it->mImpl); continue; } ++it; } } void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest( bool needsAck, uint32_t from, uint32_t to, size_t left, BufferInvalidationChannel &channel, const std::shared_ptr<Accessor::Impl> &impl) { if (left == 0) { uint32_t msgId = 0; if (needsAck) { msgId = ++mInvalidationId; if (msgId == 0) { // wrap happens msgId = ++mInvalidationId; } } channel.postInvalidation(msgId, from, to); sInvalidator.addAccessor(mId, impl); } else { // TODO: sending hint message? Pending pending(needsAck, from, to, left, impl); mPendings.push_back(pending); } } void Accessor::Impl::BufferPool::Invalidation::onHandleAck() { if (mInvalidationId != 0) { std::set<int> deads; for (auto it = mAcks.begin(); it != mAcks.end(); ++it) { if (it->second != mInvalidationId) { const sp<IObserver> observer = mObservers[it->first].promote(); if (observer) { observer->onMessage(it->first, mInvalidationId); } else { deads.insert(it->first); } } } if (deads.size() > 0) { for (auto it = deads.begin(); it != deads.end(); ++it) { onClose(*it); } } } // All invalidation Ids are synced. sInvalidator.delAccessor(mId); } bool Accessor::Impl::BufferPool::handleOwnBuffer( ConnectionId connectionId, BufferId bufferId) { Loading @@ -275,8 +402,15 @@ bool Accessor::Impl::BufferPool::handleReleaseBuffer( iter->second->mOwnerCount--; if (iter->second->mOwnerCount == 0 && iter->second->mTransactionCount == 0) { if (!iter->second->mInvalidated) { mStats.onBufferUnused(iter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(iter->second->mAllocSize); mStats.onBufferEvicted(iter->second->mAllocSize); mBuffers.erase(iter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } erase(&mUsingConnections, bufferId, connectionId); Loading Loading @@ -352,8 +486,15 @@ bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage bufferIter->second->mTransactionCount--; if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(message.bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel); } } mTransactions.erase(found); } Loading Loading @@ -400,7 +541,7 @@ void Accessor::Impl::BufferPool::processStatusMessages() { ret = handleTransferResult(message); break; case BufferStatus::INVALIDATION_ACK: // TODO mInvalidation.onAck(message.connectionId, message.bufferId); break; } if (ret == false) { Loading @@ -423,8 +564,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } } } Loading @@ -446,8 +594,15 @@ bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { if (bufferIter->second->mOwnerCount == 0 && bufferIter->second->mTransactionCount == 0) { // TODO: handle freebuffer insert fail if (!bufferIter->second->mInvalidated) { mStats.onBufferUnused(bufferIter->second->mAllocSize); mFreeBuffers.insert(bufferId); } else { mStats.onBufferUnused(bufferIter->second->mAllocSize); mStats.onBufferEvicted(bufferIter->second->mAllocSize); mBuffers.erase(bufferIter); mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel); } } mTransactions.erase(iter); } Loading Loading @@ -538,6 +693,121 @@ void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { } } void Accessor::Impl::BufferPool::invalidate( bool needsAck, BufferId from, BufferId to, const std::shared_ptr<Accessor::Impl> &impl) { for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { if (isBufferInRange(from, to, *freeIt)) { auto it = mBuffers.find(*freeIt); if (it != mBuffers.end() && it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { mStats.onBufferEvicted(it->second->mAllocSize); mBuffers.erase(it); freeIt = mFreeBuffers.erase(freeIt); continue; } else { ALOGW("bufferpool inconsistent!"); } } ++freeIt; } size_t left = 0; for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) { if (isBufferInRange(from, to, it->first)) { it->second->invalidate(); ++left; } } mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl); } void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) { BufferId from = mStartSeq; BufferId to = mSeq; mStartSeq = mSeq; // TODO: needsAck params if (from != to) { invalidate(true, from, to, impl); } } void Accessor::Impl::invalidatorThread( std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready) { while(true) { std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied; { std::unique_lock<std::mutex> lock(mutex); if (!ready) { cv.wait(lock); } copied.insert(accessors.begin(), accessors.end()); } std::list<ConnectionId> erased; for (auto it = copied.begin(); it != copied.end(); ++it) { const std::shared_ptr<Accessor::Impl> impl = it->second.lock(); if (!impl) { erased.push_back(it->first); } else { impl->handleInvalidateAck(); } } { std::unique_lock<std::mutex> lock(mutex); for (auto it = erased.begin(); it != erased.end(); ++it) { accessors.erase(*it); } if (accessors.size() == 0) { ready = false; } else { // prevent draining cpu. lock.unlock(); std::this_thread::yield(); } } } } Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) { std::thread invalidator( invalidatorThread, std::ref(mAccessors), std::ref(mMutex), std::ref(mCv), std::ref(mReady)); invalidator.detach(); } void Accessor::Impl::AccessorInvalidator::addAccessor( uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) { bool notify = false; std::unique_lock<std::mutex> lock(mMutex); if (mAccessors.find(accessorId) == mAccessors.end()) { if (!mReady) { mReady = true; notify = true; } mAccessors.insert(std::make_pair(accessorId, impl)); } lock.unlock(); if (notify) { mCv.notify_one(); } } void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) { std::lock_guard<std::mutex> lock(mMutex); mAccessors.erase(accessorId); if (mAccessors.size() == 0) { mReady = false; } } Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator; } // namespace implementation } // namespace V2_0 } // namespace bufferpool Loading
media/bufferpool/2.0/AccessorImpl.h +98 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <map> #include <set> #include <condition_variable> #include "Accessor.h" namespace android { Loading @@ -33,15 +34,20 @@ struct TransactionStatus; /** * An implementation of a buffer pool accessor(or a buffer pool implementation.) */ class Accessor::Impl { class Accessor::Impl : public std::enable_shared_from_this<Accessor::Impl> { public: Impl(const std::shared_ptr<BufferPoolAllocator> &allocator); ~Impl(); ResultStatus connect( const sp<Accessor> &accessor, sp<Connection> *connection, ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr); const sp<Accessor> &accessor, const sp<IObserver> &observer, sp<Connection> *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, const StatusDescriptor** statusDescPtr, const InvalidationDescriptor** invDescPtr); ResultStatus close(ConnectionId connectionId); Loading @@ -55,8 +61,14 @@ public: BufferId bufferId, const native_handle_t** handle); void flush(); void cleanUp(bool clearCache); bool isValid(); void handleInvalidateAck(); private: // ConnectionId = pid : (timestamp_created + seqId) // in order to guarantee uniqueness for each connection Loading @@ -78,7 +90,10 @@ private: int64_t mLastCleanUpUs; int64_t mLastLogUs; BufferId mSeq; BufferId mStartSeq; bool mValid; BufferStatusObserver mObserver; BufferInvalidationChannel mInvalidationChannel; std::map<ConnectionId, std::set<BufferId>> mUsingBuffers; std::map<BufferId, std::set<ConnectionId>> mUsingConnections; Loading @@ -95,6 +110,54 @@ private: std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers; std::set<BufferId> mFreeBuffers; struct Invalidation { static std::atomic<std::uint32_t> sSeqId; struct Pending { bool mNeedsAck; uint32_t mFrom; uint32_t mTo; size_t mLeft; const std::weak_ptr<Accessor::Impl> mImpl; Pending(bool needsAck, uint32_t from, uint32_t to, size_t left, const std::shared_ptr<Accessor::Impl> &impl) : mNeedsAck(needsAck), mFrom(from), mTo(to), mLeft(left), mImpl(impl) {} bool invalidate(uint32_t bufferId) { return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0; } }; std::list<Pending> mPendings; std::map<ConnectionId, uint32_t> mAcks; std::map<ConnectionId, const wp<IObserver>> mObservers; uint32_t mInvalidationId; uint32_t mId; Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {} void onConnect(ConnectionId conId, const sp<IObserver> &observer); void onClose(ConnectionId conId); void onAck(ConnectionId conId, uint32_t msgId); void onBufferInvalidated( BufferId bufferId, BufferInvalidationChannel &channel); void onInvalidationRequest( bool needsAck, uint32_t from, uint32_t to, size_t left, BufferInvalidationChannel &channel, const std::shared_ptr<Accessor::Impl> &impl); void onHandleAck(); } mInvalidation; /// Buffer pool statistics which tracks allocation and transfer statistics. struct Stats { /// Total size of allocations which are used or available to use. Loading Loading @@ -164,6 +227,13 @@ private: } } mStats; bool isValid() { return mValid; } void invalidate(bool needsAck, BufferId from, BufferId to, const std::shared_ptr<Accessor::Impl> &impl); public: /** Creates a buffer pool. */ BufferPool(); Loading Loading @@ -286,8 +356,33 @@ private: */ void cleanUp(bool clearCache = false); /** * Processes pending buffer status messages and invalidate all current * free buffers. Active buffers are invalidated after being inactive. */ void flush(const std::shared_ptr<Accessor::Impl> &impl); friend class Accessor::Impl; } mBufferPool; struct AccessorInvalidator { std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> mAccessors; std::mutex mMutex; std::condition_variable mCv; bool mReady; AccessorInvalidator(); void addAccessor(uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl); void delAccessor(uint32_t accessorId); }; static AccessorInvalidator sInvalidator; static void invalidatorThread( std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready); }; } // namespace implementation Loading
media/bufferpool/2.0/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,7 @@ cc_library { "BufferStatus.cpp", "ClientManager.cpp", "Connection.cpp", "Observer.cpp", ], export_include_dirs: [ "include", Loading