Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d5678111 authored by Prabir Pradhan's avatar Prabir Pradhan
Browse files

Improve BlockingQueue and add SyncQueue

Changes to BlockingQueue:
- Change BlockingQueue to be list-backed instead of vector-backed so
  that removals are O(1) instead of O(N).
- Rename erase to erase_if.
- Make providing a fixed capacity at construction optional.
- Add emplace function to push and construct element in-place.
- Add popWithTimeout function.

Bug: 275726706
Test: atest inputflinger_tests
Change-Id: I1be02b0887df2c21b28f4f1cb43a8e208d996a87
parent aff1303d
Loading
Loading
Loading
Loading
+52 −14
Original line number Diff line number Diff line
@@ -16,15 +16,17 @@

#pragma once

#include "android-base/thread_annotations.h"
#include <condition_variable>
#include <list>
#include <mutex>
#include <vector>
#include <optional>
#include "android-base/thread_annotations.h"

namespace android {

/**
 * A FIFO queue that stores up to <i>capacity</i> objects.
 * A thread-safe FIFO queue. This list-backed queue stores up to <i>capacity</i> objects if
 * a capacity is provided at construction, and is otherwise unbounded.
 * Objects can always be added. Objects are added immediately.
 * If the queue is full, new objects cannot be added.
 *
@@ -33,13 +35,13 @@ namespace android {
template <class T>
class BlockingQueue {
public:
    BlockingQueue(size_t capacity) : mCapacity(capacity) {
        mQueue.reserve(mCapacity);
    };
    explicit BlockingQueue() = default;

    explicit BlockingQueue(size_t capacity) : mCapacity(capacity){};

    /**
     * Retrieve and remove the oldest object.
     * Blocks execution while queue is empty.
     * Blocks execution indefinitely while queue is empty.
     */
    T pop() {
        std::unique_lock lock(mLock);
@@ -50,6 +52,23 @@ public:
        return t;
    };

    /**
     * Retrieve and remove the oldest object.
     * Blocks execution for the given duration while queue is empty, and returns std::nullopt
     * if the queue was empty for the entire duration.
     */
    std::optional<T> popWithTimeout(std::chrono::nanoseconds duration) {
        std::unique_lock lock(mLock);
        android::base::ScopedLockAssertion assumeLock(mLock);
        if (!mHasElements.wait_for(lock, duration,
                                   [this]() REQUIRES(mLock) { return !this->mQueue.empty(); })) {
            return {};
        }
        T t = std::move(mQueue.front());
        mQueue.erase(mQueue.begin());
        return t;
    };

    /**
     * Add a new object to the queue.
     * Does not block.
@@ -57,20 +76,39 @@ public:
     * Return false if the queue is full.
     */
    bool push(T&& t) {
        {
        { // acquire lock
            std::scoped_lock lock(mLock);
            if (mQueue.size() == mCapacity) {
            if (mCapacity && mQueue.size() == mCapacity) {
                return false;
            }
            mQueue.push_back(std::move(t));
        } // release lock
        mHasElements.notify_one();
        return true;
    };

    /**
     * Construct a new object into the queue.
     * Does not block.
     * Return true if an element was successfully added.
     * Return false if the queue is full.
     */
    template <class... Args>
    bool emplace(Args&&... args) {
        { // acquire lock
            std::scoped_lock lock(mLock);
            if (mCapacity && mQueue.size() == mCapacity) {
                return false;
            }
            mQueue.emplace_back(args...);
        } // release lock
        mHasElements.notify_one();
        return true;
    };

    void erase(const std::function<bool(const T&)>& lambda) {
    void erase_if(const std::function<bool(const T&)>& pred) {
        std::scoped_lock lock(mLock);
        std::erase_if(mQueue, [&lambda](const auto& t) { return lambda(t); });
        std::erase_if(mQueue, pred);
    }

    /**
@@ -93,7 +131,7 @@ public:
    }

private:
    const size_t mCapacity;
    const std::optional<size_t> mCapacity;
    /**
     * Used to signal that mQueue is non-empty.
     */
@@ -102,7 +140,7 @@ private:
     * Lock for accessing and waiting on elements.
     */
    std::mutex mLock;
    std::vector<T> mQueue GUARDED_BY(mLock);
    std::list<T> mQueue GUARDED_BY(mLock);
};

} // namespace android
+1 −1
Original line number Diff line number Diff line
@@ -322,7 +322,7 @@ void MotionClassifier::reset() {
void MotionClassifier::reset(const NotifyDeviceResetArgs& args) {
    int32_t deviceId = args.deviceId;
    // Clear the pending events right away, to avoid unnecessary work done by the HAL.
    mEvents.erase([deviceId](const ClassifierEvent& event) {
    mEvents.erase_if([deviceId](const ClassifierEvent& event) {
        std::optional<int32_t> eventDeviceId = event.getDeviceId();
        return eventDeviceId && (*eventDeviceId == deviceId);
    });
+53 −0
Original line number Diff line number Diff line
/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <utils/threads.h>
#include <list>
#include <mutex>
#include <optional>

namespace android {

/** A thread-safe FIFO queue. */
template <class T>
class SyncQueue {
public:
    /** Retrieve and remove the oldest object. Returns std::nullopt if the queue is empty. */
    std::optional<T> pop() {
        std::scoped_lock lock(mLock);
        if (mQueue.empty()) {
            return {};
        }
        T t = std::move(mQueue.front());
        mQueue.erase(mQueue.begin());
        return t;
    };

    /** Add a new object to the queue. */
    template <class... Args>
    void push(Args&&... args) {
        std::scoped_lock lock(mLock);
        mQueue.emplace_back(args...);
    };

private:
    std::mutex mLock;
    std::list<T> mQueue GUARDED_BY(mLock);
};

} // namespace android
+1 −0
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ cc_test {
        "NotifyArgs_test.cpp",
        "PreferStylusOverTouch_test.cpp",
        "PropertyProvider_test.cpp",
        "SyncQueue_test.cpp",
        "TestInputListener.cpp",
        "UinputDevice.cpp",
        "UnwantedInteractionBlocker_test.cpp",
+14 −1
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@

namespace android {

using std::chrono_literals::operator""ns;

// --- BlockingQueueTest ---

@@ -34,6 +35,14 @@ TEST(BlockingQueueTest, Queue_AddAndRemove) {

    ASSERT_TRUE(queue.push(1));
    ASSERT_EQ(queue.pop(), 1);

    ASSERT_TRUE(queue.emplace(2));
    ASSERT_EQ(queue.popWithTimeout(0ns), 2);

    ASSERT_TRUE(queue.push(3));
    ASSERT_EQ(queue.popWithTimeout(100ns), 3);

    ASSERT_EQ(std::nullopt, queue.popWithTimeout(0ns));
}

/**
@@ -87,7 +96,7 @@ TEST(BlockingQueueTest, Queue_Erases) {
    queue.push(3);
    queue.push(4);
    // Erase elements 2 and 4
    queue.erase([](int element) { return element == 2 || element == 4; });
    queue.erase_if([](int element) { return element == 2 || element == 4; });
    // Should no longer receive elements 2 and 4
    ASSERT_EQ(1, queue.pop());
    ASSERT_EQ(3, queue.pop());
@@ -138,5 +147,9 @@ TEST(BlockingQueueTest, Queue_BlocksWhileWaitingForElements) {
    ASSERT_TRUE(hasReceivedElement);
}

TEST(BlockingQueueTest, Queue_TimesOut) {
    BlockingQueue<int> queue;
    ASSERT_EQ(std::nullopt, queue.popWithTimeout(1ns));
}

} // namespace android
Loading