Loading automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h 0 → 100644 +99 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2016 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ #define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ #include <android-base/thread_annotations.h> #include <atomic> #include <condition_variable> #include <iostream> #include <queue> #include <thread> namespace android { namespace hardware { namespace automotive { namespace vehicle { template <typename T> class ConcurrentQueue { public: void waitForItems() { std::unique_lock<std::mutex> lockGuard(mLock); ::android::base::ScopedLockAssertion lockAssertion(mLock); while (mQueue.empty() && mIsActive) { mCond.wait(lockGuard); } } std::vector<T> flush() { std::vector<T> items; std::lock_guard<std::mutex> lockGuard(mLock); if (mQueue.empty()) { return items; } while (!mQueue.empty()) { // Even if the queue is deactivated, we should still flush all the remaining values // in the queue. items.push_back(std::move(mQueue.front())); mQueue.pop(); } return items; } void push(T&& item) { { std::lock_guard<std::mutex> lockGuard(mLock); if (!mIsActive) { return; } mQueue.push(std::move(item)); } mCond.notify_one(); } // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread. // The items already in the queue could still be flushed even after the queue is deactivated. void deactivate() { { std::lock_guard<std::mutex> lockGuard(mLock); mIsActive = false; } // To unblock all waiting consumers. mCond.notify_all(); } ConcurrentQueue() = default; ConcurrentQueue(const ConcurrentQueue&) = delete; ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; private: mutable std::mutex mLock; bool mIsActive GUARDED_BY(mLock) = true; std::condition_variable mCond; std::queue<T> mQueue GUARDED_BY(mLock); }; } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp +89 −1 Original line number Original line Diff line number Diff line Loading @@ -14,10 +14,14 @@ * limitations under the License. * limitations under the License. */ */ #include <ConcurrentQueue.h> #include <PropertyUtils.h> #include <PropertyUtils.h> #include <VehicleUtils.h> #include <VehicleUtils.h> #include <gtest/gtest.h> #include <gtest/gtest.h> #include <atomic> #include <thread> #include <vector> #include <vector> namespace android { namespace android { Loading Loading @@ -231,7 +235,7 @@ TEST(VehicleUtilsTest, testCreateVehiclePropValueVecFloat) { << "vector size should always be 1 for single value type"; << "vector size should always be 1 for single value type"; } } TEST(VehicleUtilsTest, testCreateVehiclePropValueFloVecatVec) { TEST(VehicleUtilsTest, testCreateVehiclePropValueFloatVecMultiValues) { std::unique_ptr<VehiclePropValue> value = std::unique_ptr<VehiclePropValue> value = createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2); createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2); Loading @@ -247,6 +251,90 @@ TEST(VehicleUtilsTest, testCreateVehiclePropValueVecBytes) { ASSERT_EQ(2u, value->value.byteValues.size()); ASSERT_EQ(2u, value->value.byteValues.size()); } } TEST(VehicleUtilsTest, testConcurrentQueueOneThread) { ConcurrentQueue<int> queue; queue.push(1); queue.push(2); auto result = queue.flush(); ASSERT_EQ(result, std::vector<int>({1, 2})); } TEST(VehicleUtilsTest, testConcurrentQueueMultipleThreads) { ConcurrentQueue<int> queue; std::vector<int> results; std::atomic<bool> stop = false; std::thread t1([&queue]() { for (int i = 0; i < 100; i++) { queue.push(0); } }); std::thread t2([&queue]() { for (int i = 0; i < 100; i++) { queue.push(1); } }); std::thread t3([&queue, &results, &stop]() { while (!stop) { queue.waitForItems(); for (int i : queue.flush()) { results.push_back(i); } } // After we stop, get all the remaining values in the queue. for (int i : queue.flush()) { results.push_back(i); } }); t1.join(); t2.join(); stop = true; queue.deactivate(); t3.join(); size_t zeroCount = 0; size_t oneCount = 0; for (int i : results) { if (i == 0) { zeroCount++; } if (i == 1) { oneCount++; } } EXPECT_EQ(results.size(), static_cast<size_t>(200)); EXPECT_EQ(zeroCount, static_cast<size_t>(100)); EXPECT_EQ(oneCount, static_cast<size_t>(100)); } TEST(VehicleUtilsTest, testConcurrentQueuePushAfterDeactivate) { ConcurrentQueue<int> queue; queue.deactivate(); queue.push(1); ASSERT_TRUE(queue.flush().empty()); } TEST(VehicleUtilsTest, testConcurrentQueueDeactivateNotifyWaitingThread) { ConcurrentQueue<int> queue; std::thread t([&queue]() { // This would block until queue is deactivated. queue.waitForItems(); }); queue.deactivate(); t.join(); } } // namespace vehicle } // namespace vehicle } // namespace automotive } // namespace automotive } // namespace hardware } // namespace hardware Loading Loading
automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h 0 → 100644 +99 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2016 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ #define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ #include <android-base/thread_annotations.h> #include <atomic> #include <condition_variable> #include <iostream> #include <queue> #include <thread> namespace android { namespace hardware { namespace automotive { namespace vehicle { template <typename T> class ConcurrentQueue { public: void waitForItems() { std::unique_lock<std::mutex> lockGuard(mLock); ::android::base::ScopedLockAssertion lockAssertion(mLock); while (mQueue.empty() && mIsActive) { mCond.wait(lockGuard); } } std::vector<T> flush() { std::vector<T> items; std::lock_guard<std::mutex> lockGuard(mLock); if (mQueue.empty()) { return items; } while (!mQueue.empty()) { // Even if the queue is deactivated, we should still flush all the remaining values // in the queue. items.push_back(std::move(mQueue.front())); mQueue.pop(); } return items; } void push(T&& item) { { std::lock_guard<std::mutex> lockGuard(mLock); if (!mIsActive) { return; } mQueue.push(std::move(item)); } mCond.notify_one(); } // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread. // The items already in the queue could still be flushed even after the queue is deactivated. void deactivate() { { std::lock_guard<std::mutex> lockGuard(mLock); mIsActive = false; } // To unblock all waiting consumers. mCond.notify_all(); } ConcurrentQueue() = default; ConcurrentQueue(const ConcurrentQueue&) = delete; ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; private: mutable std::mutex mLock; bool mIsActive GUARDED_BY(mLock) = true; std::condition_variable mCond; std::queue<T> mQueue GUARDED_BY(mLock); }; } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android #endif // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp +89 −1 Original line number Original line Diff line number Diff line Loading @@ -14,10 +14,14 @@ * limitations under the License. * limitations under the License. */ */ #include <ConcurrentQueue.h> #include <PropertyUtils.h> #include <PropertyUtils.h> #include <VehicleUtils.h> #include <VehicleUtils.h> #include <gtest/gtest.h> #include <gtest/gtest.h> #include <atomic> #include <thread> #include <vector> #include <vector> namespace android { namespace android { Loading Loading @@ -231,7 +235,7 @@ TEST(VehicleUtilsTest, testCreateVehiclePropValueVecFloat) { << "vector size should always be 1 for single value type"; << "vector size should always be 1 for single value type"; } } TEST(VehicleUtilsTest, testCreateVehiclePropValueFloVecatVec) { TEST(VehicleUtilsTest, testCreateVehiclePropValueFloatVecMultiValues) { std::unique_ptr<VehiclePropValue> value = std::unique_ptr<VehiclePropValue> value = createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2); createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2); Loading @@ -247,6 +251,90 @@ TEST(VehicleUtilsTest, testCreateVehiclePropValueVecBytes) { ASSERT_EQ(2u, value->value.byteValues.size()); ASSERT_EQ(2u, value->value.byteValues.size()); } } TEST(VehicleUtilsTest, testConcurrentQueueOneThread) { ConcurrentQueue<int> queue; queue.push(1); queue.push(2); auto result = queue.flush(); ASSERT_EQ(result, std::vector<int>({1, 2})); } TEST(VehicleUtilsTest, testConcurrentQueueMultipleThreads) { ConcurrentQueue<int> queue; std::vector<int> results; std::atomic<bool> stop = false; std::thread t1([&queue]() { for (int i = 0; i < 100; i++) { queue.push(0); } }); std::thread t2([&queue]() { for (int i = 0; i < 100; i++) { queue.push(1); } }); std::thread t3([&queue, &results, &stop]() { while (!stop) { queue.waitForItems(); for (int i : queue.flush()) { results.push_back(i); } } // After we stop, get all the remaining values in the queue. for (int i : queue.flush()) { results.push_back(i); } }); t1.join(); t2.join(); stop = true; queue.deactivate(); t3.join(); size_t zeroCount = 0; size_t oneCount = 0; for (int i : results) { if (i == 0) { zeroCount++; } if (i == 1) { oneCount++; } } EXPECT_EQ(results.size(), static_cast<size_t>(200)); EXPECT_EQ(zeroCount, static_cast<size_t>(100)); EXPECT_EQ(oneCount, static_cast<size_t>(100)); } TEST(VehicleUtilsTest, testConcurrentQueuePushAfterDeactivate) { ConcurrentQueue<int> queue; queue.deactivate(); queue.push(1); ASSERT_TRUE(queue.flush().empty()); } TEST(VehicleUtilsTest, testConcurrentQueueDeactivateNotifyWaitingThread) { ConcurrentQueue<int> queue; std::thread t([&queue]() { // This would block until queue is deactivated. queue.waitForItems(); }); queue.deactivate(); t.join(); } } // namespace vehicle } // namespace vehicle } // namespace automotive } // namespace automotive } // namespace hardware } // namespace hardware Loading