Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6514f24f authored by Sungtak Lee's avatar Sungtak Lee Committed by Android (Google) Code Review
Browse files

Merge changes from topic "bufferpool-aidl"

* changes:
  Support mainline s/w codec
  AIDL BufferPool implementation (HIDL -> AIDL)
  AIDL bufferpool implementation (just copy from HIDL impl)
  bufferpool2: Support mainline s/w codec
parents 44a6de89 bfeecf3e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ aidl_interface {
            apex_available: [
                "//apex_available:platform",
                "com.android.btservices",
                "com.android.media.swcodec",
            ],
            min_sdk_version: "29",
        },
+5 −0
Original line number Diff line number Diff line
@@ -40,6 +40,11 @@ aidl_interface {
        },
        ndk: {
            enabled: true,
            apex_available: [
                "//apex_available:platform",
                "com.android.media.swcodec",
            ],
            min_sdk_version: "29",
        },
    },
}
+509 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define LOG_TAG "AidlBufferPoolAcc"
//#define LOG_NDEBUG 0

#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
#include <thread>

#include "Accessor.h"
#include "Connection.h"
#include "DataHelper.h"

namespace aidl::android::hardware::media::bufferpool2::implementation {

namespace {
    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}

#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

static constexpr uint32_t kSeqIdMax = 0x7fffffff;
uint32_t Accessor::sSeqId = time(nullptr) & kSeqIdMax;

namespace {
// anonymous namespace
static std::shared_ptr<ConnectionDeathRecipient> sConnectionDeathRecipient =
    std::make_shared<ConnectionDeathRecipient>();

void serviceDied(void *cookie) {
    if (sConnectionDeathRecipient) {
        sConnectionDeathRecipient->onDead(cookie);
    }
}
}

std::shared_ptr<ConnectionDeathRecipient> Accessor::getConnectionDeathRecipient() {
    return sConnectionDeathRecipient;
}

ConnectionDeathRecipient::ConnectionDeathRecipient() {
    mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient(
            AIBinder_DeathRecipient_new(serviceDied));
}

void ConnectionDeathRecipient::add(
        int64_t connectionId,
        const std::shared_ptr<Accessor> &accessor) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        mAccessors.insert(std::make_pair(connectionId, accessor));
    }
}

void ConnectionDeathRecipient::remove(int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    mAccessors.erase(connectionId);
    auto it = mConnectionToCookie.find(connectionId);
    if (it != mConnectionToCookie.end()) {
        void * cookie = it->second;
        mConnectionToCookie.erase(it);
        auto cit = mCookieToConnections.find(cookie);
        if (cit != mCookieToConnections.end()) {
            cit->second.erase(connectionId);
            if (cit->second.size() == 0) {
                mCookieToConnections.erase(cit);
            }
        }
    }
}

void ConnectionDeathRecipient::addCookieToConnection(
        void *cookie,
        int64_t connectionId) {
    std::lock_guard<std::mutex> lock(mLock);
    if (mAccessors.find(connectionId) == mAccessors.end()) {
        return;
    }
    mConnectionToCookie.insert(std::make_pair(connectionId, cookie));
    auto it = mCookieToConnections.find(cookie);
    if (it != mCookieToConnections.end()) {
        it->second.insert(connectionId);
    } else {
        mCookieToConnections.insert(std::make_pair(
                cookie, std::set<int64_t>{connectionId}));
    }
}

void ConnectionDeathRecipient::onDead(void *cookie) {
    std::map<int64_t, const std::weak_ptr<Accessor>> connectionsToClose;
    {
        std::lock_guard<std::mutex> lock(mLock);

        auto it = mCookieToConnections.find(cookie);
        if (it != mCookieToConnections.end()) {
            for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) {
                auto accessorIt = mAccessors.find(*conIt);
                if (accessorIt != mAccessors.end()) {
                    connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second));
                    mAccessors.erase(accessorIt);
                }
                mConnectionToCookie.erase(*conIt);
            }
            mCookieToConnections.erase(it);
        }
    }

    if (connectionsToClose.size() > 0) {
        std::shared_ptr<Accessor> accessor;
        for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) {
            accessor = it->second.lock();

            if (accessor) {
                accessor->close(it->first);
                ALOGD("connection %lld closed on death", (long long)it->first);
            }
        }
    }
}

AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() {
    return mDeathRecipient.get();
}

::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) {
    std::shared_ptr<Connection> connection;
    ConnectionId connectionId;
    uint32_t msgId;
    StatusDescriptor statusDesc;
    InvalidationDescriptor invDesc;
    BufferPoolStatus status = connect(
            in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc);
    if (status == ResultStatus::OK) {
        _aidl_return->connection = connection;
        _aidl_return->connectionId = connectionId;
        _aidl_return->msgId = msgId;
        _aidl_return->toFmqDesc = std::move(statusDesc);
        _aidl_return->fromFmqDesc = std::move(invDesc);
        return ::ndk::ScopedAStatus::ok();
    }
    return ::ndk::ScopedAStatus::fromServiceSpecificError(status);
}

Accessor::Accessor(const std::shared_ptr<BufferPoolAllocator> &allocator)
    : mAllocator(allocator), mScheduleEvictTs(0) {}

Accessor::~Accessor() {
}

bool Accessor::isValid() {
    return mBufferPool.isValid();
}

BufferPoolStatus Accessor::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(ref<Accessor>());
    return ResultStatus::OK;
}

BufferPoolStatus Accessor::allocate(
        ConnectionId connectionId,
        const std::vector<uint8_t> &params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    BufferPoolStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}

BufferPoolStatus Accessor::fetch(
        ConnectionId connectionId, TransactionId transactionId,
        BufferId bufferId, const native_handle_t** handle) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    auto found = mBufferPool.mTransactions.find(transactionId);
    if (found != mBufferPool.mTransactions.end() &&
            contains(&mBufferPool.mPendingTransactions,
                     connectionId, transactionId)) {
        if (found->second->mSenderValidated &&
                found->second->mStatus == BufferStatus::TRANSFER_FROM &&
                found->second->mBufferId == bufferId) {
            found->second->mStatus = BufferStatus::TRANSFER_FETCH;
            auto bufferIt = mBufferPool.mBuffers.find(bufferId);
            if (bufferIt != mBufferPool.mBuffers.end()) {
                mBufferPool.mStats.onBufferFetched();
                *handle = bufferIt->second->handle();
                return ResultStatus::OK;
            }
        }
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return ResultStatus::CRITICAL_ERROR;
}

BufferPoolStatus Accessor::connect(
        const std::shared_ptr<IObserver> &observer, bool local,
        std::shared_ptr<Connection> *connection, ConnectionId *pConnectionId,
        uint32_t *pMsgId,
        StatusDescriptor* statusDescPtr,
        InvalidationDescriptor* invDescPtr) {
    std::shared_ptr<Connection> newConnection = ::ndk::SharedRefBase::make<Connection>();
    BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        if (newConnection) {
            int32_t pid = getpid();
            ConnectionId id = (int64_t)pid << 32 | sSeqId | kSeqIdVndkBit;
            status = mBufferPool.mObserver.open(id, statusDescPtr);
            if (status == ResultStatus::OK) {
                newConnection->initialize(ref<Accessor>(), id);
                *connection = newConnection;
                *pConnectionId = id;
                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
                mBufferPool.mConnectionIds.insert(id);
                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
                mBufferPool.mInvalidation.onConnect(id, observer);
                if (sSeqId == kSeqIdMax) {
                   sSeqId = 0;
                } else {
                    ++sSeqId;
                }
            }

        }
        mBufferPool.processStatusMessages();
        mBufferPool.cleanUp();
        scheduleEvictIfNeeded();
    }
    if (!local && status == ResultStatus::OK) {
        std::shared_ptr<Accessor> accessor(ref<Accessor>());
        sConnectionDeathRecipient->add(*pConnectionId, accessor);
    }
    return status;
}

BufferPoolStatus Accessor::close(ConnectionId connectionId) {
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
        mBufferPool.processStatusMessages();
        mBufferPool.handleClose(connectionId);
        mBufferPool.mObserver.close(connectionId);
        mBufferPool.mInvalidation.onClose(connectionId);
        // Since close# will be called after all works are finished, it is OK to
        // evict unused buffers.
        mBufferPool.cleanUp(true);
        scheduleEvictIfNeeded();
    }
    sConnectionDeathRecipient->remove(connectionId);
    return ResultStatus::OK;
}

void Accessor::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}

void Accessor::handleInvalidateAck() {
    std::map<ConnectionId, const std::shared_ptr<IObserver>> observers;
    uint32_t invalidationId;
    {
        std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
        mBufferPool.processStatusMessages();
        mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
    }
    // Do not hold lock for send invalidations
    size_t deadClients = 0;
    for (auto it = observers.begin(); it != observers.end(); ++it) {
        const std::shared_ptr<IObserver> observer = it->second;
        if (observer) {
            ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId);
            if (!status.isOk()) {
                ++deadClients;
            }
        }
    }
    if (deadClients > 0) {
        ALOGD("During invalidation found %zu dead clients", deadClients);
    }
}

void Accessor::invalidatorThread(
            std::map<uint32_t, const std::weak_ptr<Accessor>> &accessors,
            std::mutex &mutex,
            std::condition_variable &cv,
            bool &ready) {
    constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
    constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
    constexpr useconds_t MAX_SLEEP_US = 10000;
    uint32_t numSpin = 0;
    useconds_t sleepUs = 1;

    while(true) {
        std::map<uint32_t, const std::weak_ptr<Accessor>> copied;
        {
            std::unique_lock<std::mutex> lock(mutex);
            while (!ready) {
                numSpin = 0;
                sleepUs = 1;
                cv.wait(lock);
            }
            copied.insert(accessors.begin(), accessors.end());
        }
        std::list<ConnectionId> erased;
        for (auto it = copied.begin(); it != copied.end(); ++it) {
            const std::shared_ptr<Accessor> acc = it->second.lock();
            if (!acc) {
                erased.push_back(it->first);
            } else {
                acc->handleInvalidateAck();
            }
        }
        {
            std::unique_lock<std::mutex> lock(mutex);
            for (auto it = erased.begin(); it != erased.end(); ++it) {
                accessors.erase(*it);
            }
            if (accessors.size() == 0) {
                ready = false;
            } else {
                // N.B. Since there is not a efficient way to wait over FMQ,
                // polling over the FMQ is the current way to prevent draining
                // CPU.
                lock.unlock();
                ++numSpin;
                if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
                    sleepUs < MAX_SLEEP_US) {
                    sleepUs *= 10;
                }
                if (numSpin % NUM_SPIN_TO_LOG == 0) {
                    ALOGW("invalidator thread spinning");
                }
                ::usleep(sleepUs);
            }
        }
    }
}

Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
    std::thread invalidator(
            invalidatorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv),
            std::ref(mReady));
    invalidator.detach();
}

void Accessor::AccessorInvalidator::addAccessor(
        uint32_t accessorId, const std::weak_ptr<Accessor> &accessor) {
    bool notify = false;
    std::unique_lock<std::mutex> lock(mMutex);
    if (mAccessors.find(accessorId) == mAccessors.end()) {
        if (!mReady) {
            mReady = true;
            notify = true;
        }
        mAccessors.emplace(accessorId, accessor);
        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
    }
    lock.unlock();
    if (notify) {
        mCv.notify_one();
    }
}

void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) {
    std::lock_guard<std::mutex> lock(mMutex);
    mAccessors.erase(accessorId);
    ALOGV("buffer invalidation deleted bp:%u", accessorId);
    if (mAccessors.size() == 0) {
        mReady = false;
    }
}

std::unique_ptr<Accessor::AccessorInvalidator> Accessor::sInvalidator;

void Accessor::createInvalidator() {
    if (!sInvalidator) {
        sInvalidator = std::make_unique<Accessor::AccessorInvalidator>();
    }
}

void Accessor::evictorThread(
        std::map<const std::weak_ptr<Accessor>, nsecs_t, std::owner_less<>> &accessors,
        std::mutex &mutex,
        std::condition_variable &cv) {
    std::list<const std::weak_ptr<Accessor>> evictList;
    while (true) {
        int expired = 0;
        int evicted = 0;
        {
            nsecs_t now = systemTime();
            std::unique_lock<std::mutex> lock(mutex);
            while (accessors.size() == 0) {
                cv.wait(lock);
            }
            auto it = accessors.begin();
            while (it != accessors.end()) {
                if (now > (it->second + kEvictDurationNs)) {
                    ++expired;
                    evictList.push_back(it->first);
                    it = accessors.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // evict idle accessors;
        for (auto it = evictList.begin(); it != evictList.end(); ++it) {
            const std::shared_ptr<Accessor> accessor = it->lock();
            if (accessor) {
                accessor->cleanUp(true);
                ++evicted;
            }
        }
        if (expired > 0) {
            ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
        }
        evictList.clear();
        ::usleep(kEvictGranularityNs / 1000);
    }
}

Accessor::AccessorEvictor::AccessorEvictor() {
    std::thread evictor(
            evictorThread,
            std::ref(mAccessors),
            std::ref(mMutex),
            std::ref(mCv));
    evictor.detach();
}

void Accessor::AccessorEvictor::addAccessor(
        const std::weak_ptr<Accessor> &accessor, nsecs_t ts) {
    std::lock_guard<std::mutex> lock(mMutex);
    bool notify = mAccessors.empty();
    auto it = mAccessors.find(accessor);
    if (it == mAccessors.end()) {
        mAccessors.emplace(accessor, ts);
    } else {
        it->second = ts;
    }
    if (notify) {
        mCv.notify_one();
    }
}

std::unique_ptr<Accessor::AccessorEvictor> Accessor::sEvictor;

void Accessor::createEvictor() {
    if (!sEvictor) {
        sEvictor = std::make_unique<Accessor::AccessorEvictor>();
    }
}

void Accessor::scheduleEvictIfNeeded() {
    nsecs_t now = systemTime();

    if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
        mScheduleEvictTs = now;
        sEvictor->addAccessor(ref<Accessor>(), now);
    }
}

}  // namespace aidl::android::hardware::media::bufferpool2::implemntation {
+238 −0

File added.

Preview size limit exceeded, changes collapsed.

+50 −0
Original line number Diff line number Diff line
package {
    // See: http://go/android-license-faq
    // A large-scale-change added 'default_applicable_licenses' to import
    // all of the 'license_kinds' from "frameworks_av_license"
    // to get the below license kinds:
    //   SPDX-license-identifier-Apache-2.0
    default_applicable_licenses: ["hardware_interfaces_license"],
}

cc_library {
    name: "libstagefright_aidl_bufferpool2",
    vendor_available: true,
    min_sdk_version: "29",
    apex_available: [
        "//apex_available:platform",
        "com.android.media.swcodec",
        "test_com.android.media.swcodec",
    ],
    srcs: [
        "Accessor.cpp",
        "BufferPool.cpp",
        "BufferPoolClient.cpp",
        "BufferStatus.cpp",
        "ClientManager.cpp",
        "Connection.cpp",
        "Observer.cpp",
    ],
    export_include_dirs: [
        "include",
    ],
    shared_libs: [
        "libbinder_ndk",
        "libcutils",
        "libfmq",
        "liblog",
        "libutils",
        "android.hardware.media.bufferpool2-V1-ndk",
    ],
    static_libs: [
        "libaidlcommonsupport",
    ],
    export_shared_lib_headers: [
        "libfmq",
        "android.hardware.media.bufferpool2-V1-ndk",
    ],
    double_loadable: true,
    cflags: [
        "-DBUFFERPOOL_CLONE_HANDLES",
    ],
}
Loading