Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5ad921ae authored by Yu Shan's avatar Yu Shan
Browse files

Add RecurrentTimer for VHAL.

Add a RecurrentTimer that allows registering recurrent actions.

Test: atest DefaultVehicleHalTest
Bug: 200737967
Change-Id: I0d62c554bb8de404d9e9634ff8118c64ebf3c864
parent c2308603
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -57,6 +57,7 @@ cc_library {
        "src/ConnectedClient.cpp",
        "src/DefaultVehicleHal.cpp",
        "src/PendingRequestPool.cpp",
        "src/RecurrentTimer.cpp",
    ],
    static_libs: [
        "VehicleHalUtils",
+95 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_RecurrentTimer_H_
#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_RecurrentTimer_H_

#include <android-base/thread_annotations.h>

#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <vector>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

// A thread-safe recurrent timer.
class RecurrentTimer final {
  public:
    // The class for the function that would be called recurrently.
    using Callback = std::function<void()>;

    RecurrentTimer();

    ~RecurrentTimer();

    // Registers a recurrent callback for a given interval.
    // Registering the same callback twice will override the interval provided before.
    void registerTimerCallback(int64_t intervalInNano, std::shared_ptr<Callback> callback);

    // Unregisters a previously registered recurrent callback.
    void unregisterTimerCallback(std::shared_ptr<Callback> callback);

  private:
    // friend class for unit testing.
    friend class RecurrentTimerTest;

    struct CallbackInfo {
        std::shared_ptr<Callback> callback;
        int64_t interval;
        int64_t nextTime;
        // A flag to indicate whether this CallbackInfo is already outdated and should be ignored.
        // The reason we need this flag is because we cannot easily remove an element from a heap.
        bool outdated = false;

        static bool cmp(const std::unique_ptr<CallbackInfo>& lhs,
                        const std::unique_ptr<CallbackInfo>& rhs);
    };

    std::mutex mLock;
    std::thread mThread;
    std::condition_variable mCond;
    bool mStopRequested GUARDED_BY(mLock) = false;
    // A map to map each callback to its current active CallbackInfo in the mCallbackQueue.
    std::unordered_map<std::shared_ptr<Callback>, CallbackInfo*> mCallbacks GUARDED_BY(mLock);
    // A min-heap sorted by nextTime. Note that because we cannot remove arbitrary element from the
    // heap, a single Callback can have multiple entries in this queue, all but one should be valid.
    // The rest should be mark as outdated. The valid one is one stored in mCallbacks.
    std::vector<std::unique_ptr<CallbackInfo>> mCallbackQueue GUARDED_BY(mLock);

    void loop();

    // Mark the callbackInfo as outdated and should be ignored when popped from the heap.
    void markOutdatedLocked(CallbackInfo* callback) REQUIRES(mLock);
    // Remove all outdated callbackInfos from the top of the heap. This function must be called
    // each time we might introduce outdated elements to the top. We must make sure the heap is
    // always valid from the top.
    void removeInvalidCallbackLocked() REQUIRES(mLock);
    // Pops the next closest callback (must be valid) from the heap.
    std::unique_ptr<CallbackInfo> popNextCallbackLocked() REQUIRES(mLock);
};

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android

#endif  // android_hardware_automotive_vehicle_aidl_impl_vhal_include_RecurrentTimer_H_
+177 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RecurrentTimer.h"

#include <utils/Log.h>
#include <utils/SystemClock.h>

#include <inttypes.h>
#include <math.h>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

using ::android::base::ScopedLockAssertion;

RecurrentTimer::RecurrentTimer() : mThread(&RecurrentTimer::loop, this) {}

RecurrentTimer::~RecurrentTimer() {
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);
        mStopRequested = true;
    }
    mCond.notify_one();
    if (mThread.joinable()) {
        mThread.join();
    }
}

void RecurrentTimer::registerTimerCallback(int64_t intervalInNano,
                                           std::shared_ptr<RecurrentTimer::Callback> callback) {
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);

        // Aligns the nextTime to multiply of interval.
        int64_t nextTime = ceil(elapsedRealtimeNano() / intervalInNano) * intervalInNano;

        std::unique_ptr<CallbackInfo> info = std::make_unique<CallbackInfo>();
        info->callback = callback;
        info->interval = intervalInNano;
        info->nextTime = nextTime;

        auto it = mCallbacks.find(callback);
        if (it != mCallbacks.end()) {
            ALOGI("Replacing an existing timer callback with a new interval, current: %" PRId64
                  " ns, new: %" PRId64 " ns",
                  it->second->interval, intervalInNano);
            markOutdatedLocked(it->second);
        }
        mCallbacks[callback] = info.get();
        mCallbackQueue.push_back(std::move(info));
        // Insert the last element into the heap.
        std::push_heap(mCallbackQueue.begin(), mCallbackQueue.end(), CallbackInfo::cmp);
    }
    mCond.notify_one();
}

void RecurrentTimer::unregisterTimerCallback(std::shared_ptr<RecurrentTimer::Callback> callback) {
    {
        std::scoped_lock<std::mutex> lockGuard(mLock);

        auto it = mCallbacks.find(callback);
        if (it == mCallbacks.end()) {
            ALOGE("No event found to unregister");
            return;
        }

        markOutdatedLocked(it->second);
        mCallbacks.erase(it);
    }

    mCond.notify_one();
}

void RecurrentTimer::markOutdatedLocked(RecurrentTimer::CallbackInfo* info) {
    info->outdated = true;
    info->callback = nullptr;
    // Make sure the first element is always valid.
    removeInvalidCallbackLocked();
}

void RecurrentTimer::removeInvalidCallbackLocked() {
    while (mCallbackQueue.size() != 0 && mCallbackQueue[0]->outdated) {
        std::pop_heap(mCallbackQueue.begin(), mCallbackQueue.end(), CallbackInfo::cmp);
        mCallbackQueue.pop_back();
    }
}

std::unique_ptr<RecurrentTimer::CallbackInfo> RecurrentTimer::popNextCallbackLocked() {
    std::pop_heap(mCallbackQueue.begin(), mCallbackQueue.end(), CallbackInfo::cmp);
    std::unique_ptr<CallbackInfo> info = std::move(mCallbackQueue[mCallbackQueue.size() - 1]);
    mCallbackQueue.pop_back();
    // Make sure the first element is always valid.
    removeInvalidCallbackLocked();
    return info;
}

void RecurrentTimer::loop() {
    std::unique_lock<std::mutex> uniqueLock(mLock);

    while (true) {
        // Wait until the timer exits or we have at least one recurrent callback.
        mCond.wait(uniqueLock, [this] {
            ScopedLockAssertion lockAssertion(mLock);
            return mStopRequested || mCallbackQueue.size() != 0;
        });

        int64_t interval;
        {
            ScopedLockAssertion lockAssertion(mLock);
            if (mStopRequested) {
                return;
            }
            // The first element is the nearest next event.
            int64_t nextTime = mCallbackQueue[0]->nextTime;
            int64_t now = elapsedRealtimeNano();
            if (nextTime > now) {
                interval = nextTime - now;
            } else {
                interval = 0;
            }
        }

        // Wait for the next event or the timer exits.
        if (mCond.wait_for(uniqueLock, std::chrono::nanoseconds(interval), [this] {
                ScopedLockAssertion lockAssertion(mLock);
                return mStopRequested;
            })) {
            return;
        }

        {
            ScopedLockAssertion lockAssertion(mLock);
            int64_t now = elapsedRealtimeNano();
            while (mCallbackQueue.size() > 0) {
                int64_t nextTime = mCallbackQueue[0]->nextTime;
                if (nextTime > now) {
                    break;
                }

                std::unique_ptr<CallbackInfo> info = popNextCallbackLocked();
                info->nextTime += info->interval;

                auto callback = info->callback;
                mCallbackQueue.push_back(std::move(info));
                std::push_heap(mCallbackQueue.begin(), mCallbackQueue.end(), CallbackInfo::cmp);

                (*callback)();
            }
        }
    }
}

bool RecurrentTimer::CallbackInfo::cmp(const std::unique_ptr<RecurrentTimer::CallbackInfo>& lhs,
                                       const std::unique_ptr<RecurrentTimer::CallbackInfo>& rhs) {
    return lhs->nextTime > rhs->nextTime;
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android
+192 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "RecurrentTimer.h"

#include <android-base/thread_annotations.h>
#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <mutex>

namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {

class RecurrentTimerTest : public ::testing::Test {
  public:
    std::shared_ptr<RecurrentTimer::Callback> getCallback(size_t token) {
        return std::make_shared<RecurrentTimer::Callback>([this, token] {
            std::scoped_lock<std::mutex> lockGuard(mLock);

            mCallbacks.push_back(token);
        });
    }

    std::vector<size_t> getCalledCallbacks() {
        std::scoped_lock<std::mutex> lockGuard(mLock);
        return mCallbacks;
    }

    void clearCalledCallbacks() {
        std::scoped_lock<std::mutex> lockGuard(mLock);
        mCallbacks.clear();
    }

    size_t countTimerCallbackQueue(RecurrentTimer* timer) {
        std::scoped_lock<std::mutex> lockGuard(timer->mLock);
        return timer->mCallbackQueue.size();
    }

  private:
    std::mutex mLock;
    std::vector<size_t> mCallbacks GUARDED_BY(mLock);
};

TEST_F(RecurrentTimerTest, testRegisterCallback) {
    RecurrentTimer timer;
    // 0.1s
    int64_t interval = 100000000;

    auto action = getCallback(0);
    timer.registerTimerCallback(interval, action);

    std::this_thread::sleep_for(std::chrono::seconds(1));

    timer.unregisterTimerCallback(action);

    // Theoretically trigger 10 times, but check for at least 9 times to be stable.
    ASSERT_GE(getCalledCallbacks().size(), static_cast<size_t>(9));
}

TEST_F(RecurrentTimerTest, testRegisterUnregisterRegister) {
    RecurrentTimer timer;
    // 0.1s
    int64_t interval = 100000000;

    auto action = getCallback(0);
    timer.registerTimerCallback(interval, action);

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    timer.unregisterTimerCallback(action);

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    clearCalledCallbacks();

    timer.registerTimerCallback(interval, action);

    std::this_thread::sleep_for(std::chrono::seconds(1));

    // Theoretically trigger 10 times, but check for at least 9 times to be stable.
    ASSERT_GE(getCalledCallbacks().size(), static_cast<size_t>(9));
}

TEST_F(RecurrentTimerTest, testDestroyTimerWithCallback) {
    std::unique_ptr<RecurrentTimer> timer = std::make_unique<RecurrentTimer>();
    // 0.1s
    int64_t interval = 100000000;

    auto action = getCallback(0);
    timer->registerTimerCallback(interval, action);

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    timer.reset();

    clearCalledCallbacks();

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    ASSERT_TRUE(getCalledCallbacks().empty());
}

TEST_F(RecurrentTimerTest, testRegisterMultipleCallbacks) {
    RecurrentTimer timer;
    // 0.1s
    int64_t interval1 = 100000000;
    auto action1 = getCallback(1);
    timer.registerTimerCallback(interval1, action1);
    // 0.05s
    int64_t interval2 = 50000000;
    auto action2 = getCallback(2);
    timer.registerTimerCallback(interval2, action2);
    // 0.03s
    int64_t interval3 = 30000000;
    auto action3 = getCallback(3);
    timer.registerTimerCallback(interval3, action3);

    std::this_thread::sleep_for(std::chrono::seconds(1));

    timer.unregisterTimerCallback(action1);
    timer.unregisterTimerCallback(action2);
    timer.unregisterTimerCallback(action3);

    size_t action1Count = 0;
    size_t action2Count = 0;
    size_t action3Count = 0;
    for (size_t token : getCalledCallbacks()) {
        if (token == 1) {
            action1Count++;
        }
        if (token == 2) {
            action2Count++;
        }
        if (token == 3) {
            action3Count++;
        }
    }
    // Theoretically trigger 10 times, but check for at least 9 times to be stable.
    ASSERT_GE(action1Count, static_cast<size_t>(9));
    // Theoretically trigger 20 times, but check for at least 15 times to be stable.
    ASSERT_GE(action2Count, static_cast<size_t>(15));
    // Theoretically trigger 33 times, but check for at least 25 times to be stable.
    ASSERT_GE(action3Count, static_cast<size_t>(25));
}

TEST_F(RecurrentTimerTest, testRegisterSameCallbackMultipleTimes) {
    RecurrentTimer timer;
    // 0.02s
    int64_t interval1 = 20000000;
    // 0.01s
    int64_t interval2 = 10000000;

    auto action = getCallback(0);
    for (int i = 0; i < 10; i++) {
        timer.registerTimerCallback(interval1, action);
        timer.registerTimerCallback(interval2, action);
    }

    clearCalledCallbacks();

    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // Theoretically trigger 10 times, but check for at least 9 times to be stable.
    ASSERT_GE(getCalledCallbacks().size(), static_cast<size_t>(9));

    timer.unregisterTimerCallback(action);

    // Make sure there is no item in the callback queue.
    ASSERT_EQ(countTimerCallbackQueue(&timer), static_cast<size_t>(0));
}

}  // namespace vehicle
}  // namespace automotive
}  // namespace hardware
}  // namespace android