Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d45e49b4 authored by Stan Rokita's avatar Stan Rokita Committed by Android (Google) Code Review
Browse files

Merge "MH2 | Implement pending writes thread"

parents 09eeddff 59714269
Loading
Loading
Loading
Loading
+67 −14
Original line number Diff line number Diff line
@@ -49,8 +49,15 @@ HalProxy::HalProxy(std::vector<ISensorsSubHal*>& subHalList) : mSubHalList(subHa
}

HalProxy::~HalProxy() {
    // TODO: Join any running threads and clean up FMQs and any other allocated
    // state.
    {
        std::lock_guard<std::mutex> lockGuard(mEventQueueWriteMutex);
        mPendingWritesRun = false;
        mEventQueueWriteCV.notify_one();
    }
    if (mPendingWritesThread.joinable()) {
        mPendingWritesThread.join();
    }
    // TODO: Cleanup wakeup thread once it is implemented
}

Return<void> HalProxy::getSensorsList(getSensorsList_cb _hidl_cb) {
@@ -120,7 +127,8 @@ Return<Result> HalProxy::initialize(
        result = Result::BAD_VALUE;
    }

    // TODO: start threads to read wake locks and process events from sub HALs.
    mPendingWritesThread = std::thread(startPendingWritesThread, this);
    // TODO: start threads to read wake locks.

    for (size_t i = 0; i < mSubHalList.size(); i++) {
        auto subHal = mSubHalList[i];
@@ -277,21 +285,66 @@ void HalProxy::initializeSubHalCallbacksAndSensorList() {
    initializeSensorList();
}

void HalProxy::startPendingWritesThread(HalProxy* halProxy) {
    halProxy->handlePendingWrites();
}

void HalProxy::handlePendingWrites() {
    // TODO: Find a way to optimize locking strategy maybe using two mutexes instead of one.
    std::unique_lock<std::mutex> lock(mEventQueueWriteMutex);
    while (mPendingWritesRun) {
        mEventQueueWriteCV.wait(
                lock, [&] { return !mPendingWriteEventsQueue.empty() || !mPendingWritesRun; });
        if (!mPendingWriteEventsQueue.empty() && mPendingWritesRun) {
            std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front();
            size_t eventQueueSize = mEventQueue->getQuantumCount();
            size_t numToWrite = std::min(pendingWriteEvents.size(), eventQueueSize);
            lock.unlock();
            // TODO: Find a way to interrup writeBlocking if the thread should exit
            // so we don't have to wait for timeout on framework restarts.
            if (!mEventQueue->writeBlocking(
                        pendingWriteEvents.data(), numToWrite,
                        static_cast<uint32_t>(EventQueueFlagBits::EVENTS_READ),
                        static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS),
                        kWakelockTimeoutNs, mEventQueueFlag)) {
                ALOGE("Dropping %zu events after blockingWrite failed.", numToWrite);
            } else {
                mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
            }
            lock.lock();
            if (pendingWriteEvents.size() > eventQueueSize) {
                // TODO: Check if this erase operation is too inefficient. It will copy all the
                // events ahead of it down to fill gap off array at front after the erase.
                pendingWriteEvents.erase(pendingWriteEvents.begin(),
                                         pendingWriteEvents.begin() + eventQueueSize);
            } else {
                mPendingWriteEventsQueue.pop();
            }
        }
    }
}

void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events) {
    std::lock_guard<std::mutex> lock(mEventQueueMutex);
    size_t numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
    size_t numToWrite = 0;
    std::lock_guard<std::mutex> lock(mEventQueueWriteMutex);
    if (mPendingWriteEventsQueue.empty()) {
        numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
        if (numToWrite > 0) {
            if (mEventQueue->write(events.data(), numToWrite)) {
            // TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more writes
            // immediately
                // TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more
                // writes immediately
                mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
            } else {
                numToWrite = 0;
            }
        }
    }
    if (numToWrite < events.size()) {
        // TODO: Post from events[numToWrite -> end] to background events queue
        // Signal background thread
        // TODO: Bound the mPendingWriteEventsQueue so that we do not trigger OOMs if framework
        // stalls
        mPendingWriteEventsQueue.push(
                std::vector<Event>(events.begin() + numToWrite, events.end()));
        mEventQueueWriteCV.notify_one();
    }
}

+34 −0
Original line number Diff line number Diff line
@@ -24,7 +24,12 @@
#include <hidl/MQDescriptor.h>
#include <hidl/Status.h>

#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include <queue>
#include <thread>

namespace android {
namespace hardware {
@@ -159,6 +164,7 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
     */
    std::vector<ISensorsSubHal*> mSubHalList;

    //! The list of subhal callbacks for each subhal where the indices correlate with mSubHalList
    std::vector<const sp<IHalProxyCallback>> mSubHalCallbacks;

    /**
@@ -179,6 +185,9 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
    //! The mutex for the event queue.
    std::mutex mEventQueueMutex;

    //! The timeout for each pending write on background thread for events.
    static const int64_t kWakelockTimeoutNs = 5 * INT64_C(1000000000) /* 5 seconds */;

    //! The scoped wakelock ref count.
    size_t mWakelockRefCount = 0;

@@ -188,6 +197,21 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
    //! The bit mask used to get the subhal index from a sensor handle.
    static constexpr uint32_t kSensorHandleSubHalIndexMask = 0xFF000000;

    //! The events that were not able to be written to fmq right away
    std::queue<std::vector<Event>> mPendingWriteEventsQueue;

    //! The mutex protecting writing to the fmq and the pending events queue
    std::mutex mEventQueueWriteMutex;

    //! The condition variable waiting on pending write events to stack up
    std::condition_variable mEventQueueWriteCV;

    //! The thread object ptr that handles pending writes
    std::thread mPendingWritesThread;

    //! The bool indicating whether to end the pending writes background thread or not
    bool mPendingWritesRun = true;

    /**
     * Initialize the list of SubHal objects in mSubHalList by reading from dynamic libraries
     * listed in a config file.
@@ -210,6 +234,16 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
     */
    void initializeSubHalCallbacksAndSensorList();

    /**
     * Starts the thread that handles pending writes to event fmq.
     *
     * @param halProxy The HalProxy object pointer.
     */
    static void startPendingWritesThread(HalProxy* halProxy);

    //! Handles the pending writes on events to eventqueue.
    void handlePendingWrites();

    /**
     * Clear direct channel flags if the HalProxy has already chosen a subhal as its direct channel
     * subhal. Set the directChannelSubHal pointer to the subHal passed in if this is the first
+144 −13
Original line number Diff line number Diff line
@@ -22,11 +22,10 @@
#include "ScopedWakelock.h"
#include "SensorsSubHal.h"

#include <chrono>
#include <thread>
#include <vector>

#undef LOG_TAG
#define LOG_TAG "HalProxy_test"

namespace {

using ::android::hardware::hidl_vec;
@@ -110,6 +109,26 @@ Event makeProximityEvent();
 */
Event makeAccelerometerEvent();

/**
 * Make a certain number of proximity type events with the sensorHandle field set to
 * the proper number for AllSensorsSubHal subhal type.
 *
 * @param numEvents The number of events to make.
 *
 * @return The created list of events.
 */
std::vector<Event> makeMultipleProximityEvents(size_t numEvents);

/**
 * Make a certain number of accelerometer type events with the sensorHandle field set to
 * the proper number for AllSensorsSubHal subhal type.
 *
 * @param numEvents The number of events to make.
 *
 * @return The created list of events.
 */
std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents);

// Tests follow
TEST(HalProxyTest, GetSensorsListOneSubHalTest) {
    AllSensorsSubHal subHal;
@@ -232,10 +251,7 @@ TEST(HalProxyTest, PostMultipleNonWakeupEvent) {
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events;
    for (size_t i = 0; i < kNumEvents; i++) {
        events.push_back(makeAccelerometerEvent());
    }
    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
    subHal.postEvents(events, false /* wakeup */);

    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
@@ -272,15 +288,114 @@ TEST(HalProxyTest, PostMultipleWakeupEvents) {
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events;
    for (size_t i = 0; i < kNumEvents; i++) {
        events.push_back(makeProximityEvent());
    }
    std::vector<Event> events = makeMultipleProximityEvents(kNumEvents);
    subHal.postEvents(events, true /* wakeup */);

    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
}

TEST(HalProxyTest, PostEventsMultipleSubhals) {
    constexpr size_t kQueueSize = 5;
    constexpr size_t kNumEvents = 2;
    AllSensorsSubHal subHal1, subHal2;
    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
    HalProxy proxy(subHals);
    std::unique_ptr<EventMessageQueue> eventQueue =
            std::make_unique<EventMessageQueue>(kQueueSize, true);
    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
    subHal1.postEvents(events, false /* wakeup */);

    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);

    subHal2.postEvents(events, false /* wakeup */);

    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
}

TEST(HalProxyTest, PostEventsDelayedWrite) {
    constexpr size_t kQueueSize = 5;
    constexpr size_t kNumEvents = 6;
    AllSensorsSubHal subHal1, subHal2;
    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
    HalProxy proxy(subHals);
    std::unique_ptr<EventMessageQueue> eventQueue =
            std::make_unique<EventMessageQueue>(kQueueSize, true);
    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
    subHal1.postEvents(events, false /* wakeup */);

    EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);

    Event eventOut;
    // writeblock 1 event out of queue, timeout for half a second
    EXPECT_TRUE(eventQueue->readBlocking(&eventOut, 1, 500000000));

    // Sleep for a half second so that blocking write has time complete in background thread
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    // proxy background thread should have wrote last event when it saw space
    EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);
}

TEST(HalProxyTest, PostEventsMultipleSubhalsThreaded) {
    constexpr size_t kQueueSize = 5;
    constexpr size_t kNumEvents = 2;
    AllSensorsSubHal subHal1, subHal2;
    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
    HalProxy proxy(subHals);
    std::unique_ptr<EventMessageQueue> eventQueue =
            std::make_unique<EventMessageQueue>(kQueueSize, true);
    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);

    std::thread t1(&AllSensorsSubHal::postEvents, &subHal1, events, false);
    std::thread t2(&AllSensorsSubHal::postEvents, &subHal2, events, false);

    t1.join();
    t2.join();

    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
}

TEST(HalProxyTest, DestructingWithEventsPendingOnBackgroundThreadTest) {
    constexpr size_t kQueueSize = 5;
    constexpr size_t kNumEvents = 6;
    AllSensorsSubHal subHal;
    std::vector<ISensorsSubHal*> subHals{&subHal};

    std::unique_ptr<EventMessageQueue> eventQueue =
            std::make_unique<EventMessageQueue>(kQueueSize, true);
    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
    HalProxy proxy(subHals);
    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);

    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
    subHal.postEvents(events, false /* wakeup */);

    // Sleep for a half second so that background thread has time to attempt it's blocking write
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    // Should see a 5 second wait for blocking write timeout here

    // Should be one events left on pending writes queue here and proxy will destruct
    // If this TEST completes then it was a success, if it hangs we will see a crash
}

// Helper implementations follow
void testSensorsListFromProxyAndSubHal(const std::vector<SensorInfo>& proxySensorsList,
                                       const std::vector<SensorInfo>& subHalSensorsList) {
@@ -332,4 +447,20 @@ Event makeAccelerometerEvent() {
    return event;
}

std::vector<Event> makeMultipleProximityEvents(size_t numEvents) {
    std::vector<Event> events;
    for (size_t i = 0; i < numEvents; i++) {
        events.push_back(makeProximityEvent());
    }
    return events;
}

std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents) {
    std::vector<Event> events;
    for (size_t i = 0; i < numEvents; i++) {
        events.push_back(makeAccelerometerEvent());
    }
    return events;
}

}  // namespace