Loading services/core/java/com/android/server/pm/PackageInstallerService.java +21 −21 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import static org.xmlpull.v1.XmlPullParser.END_DOCUMENT; import static org.xmlpull.v1.XmlPullParser.START_TAG; import android.Manifest; import android.annotation.NonNull; import android.app.ActivityManager; import android.app.AppGlobals; import android.app.AppOpsManager; Loading Loading @@ -88,6 +89,7 @@ import com.android.server.SystemConfig; import com.android.server.SystemService; import com.android.server.SystemServiceManager; import com.android.server.pm.parsing.PackageParser2; import com.android.server.pm.utils.RequestThrottle; import libcore.io.IoUtils; Loading Loading @@ -220,6 +222,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } } @NonNull private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(), () -> { synchronized (mSessions) { return writeSessionsLocked(); } }); public PackageInstallerService(Context context, PackageManagerService pm, Supplier<PackageParser2> apexParserSupplier) { mContext = context; Loading Loading @@ -275,7 +285,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements // Invalid sessions might have been marked while parsing. Re-write the database with // the updated information. writeSessionsLocked(); mSettingsWriteRequest.runNow(); } } Loading Loading @@ -464,7 +474,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } @GuardedBy("mSessions") private void writeSessionsLocked() { private boolean writeSessionsLocked() { if (LOGD) Slog.v(TAG, "writeSessionsLocked()"); FileOutputStream fos = null; Loading @@ -483,28 +493,20 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements out.endDocument(); mSessionsFile.finishWrite(fos); return true; } catch (IOException e) { if (fos != null) { mSessionsFile.failWrite(fos); } } return false; } private File buildAppIconFile(int sessionId) { return new File(mSessionsDir, "app_icon." + sessionId + ".png"); } private void writeSessionsAsync() { IoThread.getHandler().post(new Runnable() { @Override public void run() { synchronized (mSessions) { writeSessionsLocked(); } } }); } @Override public int createSession(SessionParams params, String installerPackageName, String callingAttributionTag, int userId) { Loading Loading @@ -764,7 +766,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements mCallbacks.notifySessionCreated(session.sessionId, session.userId); writeSessionsAsync(); mSettingsWriteRequest.schedule(); return sessionId; } Loading Loading @@ -1374,7 +1376,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements class InternalCallback { public void onSessionBadgingChanged(PackageInstallerSession session) { mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId); writeSessionsAsync(); mSettingsWriteRequest.schedule(); } public void onSessionActiveChanged(PackageInstallerSession session, boolean active) { Loading @@ -1389,7 +1391,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onStagedSessionChanged(PackageInstallerSession session) { session.markUpdated(); writeSessionsAsync(); mSettingsWriteRequest.schedule(); if (mOkToSendBroadcasts && !session.isDestroyed()) { // we don't scrub the data here as this is sent only to the installer several // privileged system packages Loading Loading @@ -1419,7 +1421,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements appIconFile.delete(); } writeSessionsLocked(); mSettingsWriteRequest.runNow(); } } }); Loading @@ -1428,16 +1430,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onSessionPrepared(PackageInstallerSession session) { // We prepared the destination to write into; we want to persist // this, but it's not critical enough to block for. writeSessionsAsync(); mSettingsWriteRequest.schedule(); } public void onSessionSealedBlocking(PackageInstallerSession session) { // It's very important that we block until we've recorded the // session as being sealed, since we never want to allow mutation // after sealing. synchronized (mSessions) { writeSessionsLocked(); } mSettingsWriteRequest.runNow(); } } } services/core/java/com/android/server/pm/utils/RequestThrottle.java 0 → 100644 +154 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.server.pm.utils; import android.annotation.NonNull; import android.os.Handler; import com.android.server.IoThread; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; /** * Loose throttle latest behavior for success/fail requests, with options to schedule or force a * request through. Throttling is implicit and not configurable. This means requests are dispatched * to the {@link Handler} immediately when received, and only batched while waiting on the next * message execution or running request. * * This also means there is no explicit debouncing. Implicit debouncing is available through the * same runtime delays in the {@link Handler} instance and the request execution, where multiple * requests prior to the execution point are collapsed. * * Callers provide a {@link Handler} with which to schedule tasks on. This may be a highly * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees * that the request will be handled before the system server dies. Ideally callers should handle * re-initialization from stale state with no consequences to the user. * * This class will retry requests if they don't succeed, as provided by a true/false response from * the block provided to run the request. This uses an exponential backoff mechanism, assuming that * state write should be attempted immediately, but not retried so heavily as to potentially block * other system server callers. Exceptions are not considered and will not result in a retry if * thrown from inside the block. Caller should wrap with try-catch and rollback and transaction * state before returning false to signal a retry. * * The caller is strictly responsible for data synchronization, as this class will not synchronize * the request block, potentially running it multiple times or on multiple threads simultaneously * if requests come in asynchronously. */ public class RequestThrottle { private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5; private static final int DEFAULT_DELAY_MS = 1000; private static final int DEFAULT_BACKOFF_BASE = 2; private final AtomicInteger mLastRequest = new AtomicInteger(0); private final AtomicInteger mLastCommitted = new AtomicInteger(-1); private final int mMaxAttempts; private final int mFirstDelay; private final int mBackoffBase; private final AtomicInteger mCurrentRetry = new AtomicInteger(0); @NonNull private final Handler mHandler; @NonNull private final Supplier<Boolean> mBlock; @NonNull private final Runnable mRunnable; /** * @see #RequestThrottle(Handler, int, int, int, Supplier) */ public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) { this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE, block); } /** * Backoff timing is calculated as firstDelay * (backoffBase ^ retryAttempt). * * @param handler Representing the thread to run the provided block. * @param block The action to run when scheduled, returning whether or not the request was * successful. Note that any thrown exceptions will be ignored and not * retried, since it's not easy to tell how destructive or retry-able an * exception is. * @param maxAttempts Number of times to re-attempt any single request. * @param firstDelay The first delay used after the initial attempt. * @param backoffBase The base of the backoff calculation, where retry attempt count is the * exponent. */ public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay, int backoffBase, @NonNull Supplier<Boolean> block) { mHandler = handler; mBlock = block; mMaxAttempts = maxAttempts; mFirstDelay = firstDelay; mBackoffBase = backoffBase; mRunnable = this::runInternal; } /** * Schedule the intended action on the provided {@link Handler}. */ public void schedule() { // To avoid locking the Handler twice by pre-checking hasCallbacks, instead just queue // the Runnable again. It will no-op if the request has already been written to disk. mLastRequest.incrementAndGet(); mHandler.post(mRunnable); } /** * Run the intended action immediately on the calling thread. Note that synchronization and * deadlock between threads is not handled. This will immediately call the request block, and * also potentially schedule a retry. The caller must not block itself. * * @return true if the write succeeded or the last request was already written */ public boolean runNow() { mLastRequest.incrementAndGet(); return runInternal(); } private boolean runInternal() { int lastRequest = mLastRequest.get(); int lastCommitted = mLastCommitted.get(); if (lastRequest == lastCommitted) { return true; } if (mBlock.get()) { mCurrentRetry.set(0); mLastCommitted.set(lastRequest); return true; } else { int currentRetry = mCurrentRetry.getAndIncrement(); if (currentRetry < mMaxAttempts) { long nextDelay = (long) (mFirstDelay * Math.pow(mBackoffBase, currentRetry)); mHandler.postDelayed(mRunnable, nextDelay); } else { mCurrentRetry.set(0); } return false; } } } services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt 0 → 100644 +219 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.server.pm.test.install import com.android.server.pm.utils.RequestThrottle import com.android.server.testutils.TestHandler import com.google.common.collect.Range import com.google.common.truth.LongSubject import com.google.common.truth.Truth.assertThat import org.junit.Before import org.junit.Test import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong class RequestThrottleTest { private val counter = AtomicInteger(0) private val handler = TestHandler(null) @Before fun resetValues() { handler.flush() counter.set(0) assertThat(counter.get()).isEqualTo(0) } @Test fun simpleThrottle() { val request = RequestThrottle(handler) { counter.incrementAndGet() true } fun sendRequests() { request.schedule() val thread = startThread { request.schedule() } request.schedule() thread.joinForTest() } sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(1) sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(2) } @Test fun exceptionInRequest() { val shouldThrow = AtomicBoolean(true) val request = RequestThrottle(handler) { if (shouldThrow.get()) { throw RuntimeException() } counter.incrementAndGet() true } fun sendRequests() { request.schedule() val thread = startThread { request.schedule() } request.schedule() thread.joinForTest() } sendRequests() try { handler.flush() } catch (ignored: Exception) { } assertThat(counter.get()).isEqualTo(0) shouldThrow.set(false) sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(1) } @Test fun scheduleWhileRunning() { val latchForStartRequest = CountDownLatch(1) val latchForEndRequest = CountDownLatch(1) val request = RequestThrottle(handler) { latchForStartRequest.countDown() counter.incrementAndGet() latchForEndRequest.awaitForTest() true } // Schedule and block a request request.schedule() val handlerThread = startThread { handler.timeAdvance() } latchForStartRequest.awaitForTest() // Hit it with other requests request.schedule() (0..5).map { startThread { request.schedule() } } .forEach { it.joinForTest() } // Release everything latchForEndRequest.countDown() handlerThread.join() handler.flush() // Ensure another request was run after initial blocking request ends assertThat(counter.get()).isEqualTo(2) } @Test fun backoffRetry() { val time = AtomicLong(0) val handler = TestHandler(null) { time.get() } val returnValue = AtomicBoolean(false) val request = RequestThrottle(handler, 3, 1000, 2) { counter.incrementAndGet() returnValue.get() } request.schedule() handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(1000) } time.set(1000) handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(3000) } time.set(3000) handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(7000) } returnValue.set(true) time.set(7000) handler.timeAdvance() assertThat(handler.pendingMessages).isEmpty() // Ensure another request was run after initial blocking request ends assertThat(counter.get()).isEqualTo(4) } @Test fun forceWriteMultiple() { val request = RequestThrottle(handler) { counter.incrementAndGet() true } request.runNow() request.runNow() request.runNow() assertThat(counter.get()).isEqualTo(3) } @Test fun forceWriteNowWithoutSync() { // When forcing a write without synchronizing the request block, 2 instances will be run. // There is no test for "with sync" because any logic to avoid multiple runs is left // entirely up to the caller. val barrierForEndRequest = CyclicBarrier(2) val request = RequestThrottle(handler) { counter.incrementAndGet() barrierForEndRequest.awaitForTest() true } // Schedule and block a request request.schedule() val thread = startThread { handler.timeAdvance() } request.runNow() thread.joinForTest() assertThat(counter.get()).isEqualTo(2) } private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue() private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS) private fun Thread.joinForTest() = join(5000) private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() } // Float math means time calculations are not exact, so use a loose range private fun LongSubject.isAround(value: Long, threshold: Long = 10) = isIn(Range.closed(value - threshold, value + threshold)) } Loading
services/core/java/com/android/server/pm/PackageInstallerService.java +21 −21 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import static org.xmlpull.v1.XmlPullParser.END_DOCUMENT; import static org.xmlpull.v1.XmlPullParser.START_TAG; import android.Manifest; import android.annotation.NonNull; import android.app.ActivityManager; import android.app.AppGlobals; import android.app.AppOpsManager; Loading Loading @@ -88,6 +89,7 @@ import com.android.server.SystemConfig; import com.android.server.SystemService; import com.android.server.SystemServiceManager; import com.android.server.pm.parsing.PackageParser2; import com.android.server.pm.utils.RequestThrottle; import libcore.io.IoUtils; Loading Loading @@ -220,6 +222,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } } @NonNull private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(), () -> { synchronized (mSessions) { return writeSessionsLocked(); } }); public PackageInstallerService(Context context, PackageManagerService pm, Supplier<PackageParser2> apexParserSupplier) { mContext = context; Loading Loading @@ -275,7 +285,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements // Invalid sessions might have been marked while parsing. Re-write the database with // the updated information. writeSessionsLocked(); mSettingsWriteRequest.runNow(); } } Loading Loading @@ -464,7 +474,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } @GuardedBy("mSessions") private void writeSessionsLocked() { private boolean writeSessionsLocked() { if (LOGD) Slog.v(TAG, "writeSessionsLocked()"); FileOutputStream fos = null; Loading @@ -483,28 +493,20 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements out.endDocument(); mSessionsFile.finishWrite(fos); return true; } catch (IOException e) { if (fos != null) { mSessionsFile.failWrite(fos); } } return false; } private File buildAppIconFile(int sessionId) { return new File(mSessionsDir, "app_icon." + sessionId + ".png"); } private void writeSessionsAsync() { IoThread.getHandler().post(new Runnable() { @Override public void run() { synchronized (mSessions) { writeSessionsLocked(); } } }); } @Override public int createSession(SessionParams params, String installerPackageName, String callingAttributionTag, int userId) { Loading Loading @@ -764,7 +766,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements mCallbacks.notifySessionCreated(session.sessionId, session.userId); writeSessionsAsync(); mSettingsWriteRequest.schedule(); return sessionId; } Loading Loading @@ -1374,7 +1376,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements class InternalCallback { public void onSessionBadgingChanged(PackageInstallerSession session) { mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId); writeSessionsAsync(); mSettingsWriteRequest.schedule(); } public void onSessionActiveChanged(PackageInstallerSession session, boolean active) { Loading @@ -1389,7 +1391,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onStagedSessionChanged(PackageInstallerSession session) { session.markUpdated(); writeSessionsAsync(); mSettingsWriteRequest.schedule(); if (mOkToSendBroadcasts && !session.isDestroyed()) { // we don't scrub the data here as this is sent only to the installer several // privileged system packages Loading Loading @@ -1419,7 +1421,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements appIconFile.delete(); } writeSessionsLocked(); mSettingsWriteRequest.runNow(); } } }); Loading @@ -1428,16 +1430,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onSessionPrepared(PackageInstallerSession session) { // We prepared the destination to write into; we want to persist // this, but it's not critical enough to block for. writeSessionsAsync(); mSettingsWriteRequest.schedule(); } public void onSessionSealedBlocking(PackageInstallerSession session) { // It's very important that we block until we've recorded the // session as being sealed, since we never want to allow mutation // after sealing. synchronized (mSessions) { writeSessionsLocked(); } mSettingsWriteRequest.runNow(); } } }
services/core/java/com/android/server/pm/utils/RequestThrottle.java 0 → 100644 +154 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.server.pm.utils; import android.annotation.NonNull; import android.os.Handler; import com.android.server.IoThread; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; /** * Loose throttle latest behavior for success/fail requests, with options to schedule or force a * request through. Throttling is implicit and not configurable. This means requests are dispatched * to the {@link Handler} immediately when received, and only batched while waiting on the next * message execution or running request. * * This also means there is no explicit debouncing. Implicit debouncing is available through the * same runtime delays in the {@link Handler} instance and the request execution, where multiple * requests prior to the execution point are collapsed. * * Callers provide a {@link Handler} with which to schedule tasks on. This may be a highly * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees * that the request will be handled before the system server dies. Ideally callers should handle * re-initialization from stale state with no consequences to the user. * * This class will retry requests if they don't succeed, as provided by a true/false response from * the block provided to run the request. This uses an exponential backoff mechanism, assuming that * state write should be attempted immediately, but not retried so heavily as to potentially block * other system server callers. Exceptions are not considered and will not result in a retry if * thrown from inside the block. Caller should wrap with try-catch and rollback and transaction * state before returning false to signal a retry. * * The caller is strictly responsible for data synchronization, as this class will not synchronize * the request block, potentially running it multiple times or on multiple threads simultaneously * if requests come in asynchronously. */ public class RequestThrottle { private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5; private static final int DEFAULT_DELAY_MS = 1000; private static final int DEFAULT_BACKOFF_BASE = 2; private final AtomicInteger mLastRequest = new AtomicInteger(0); private final AtomicInteger mLastCommitted = new AtomicInteger(-1); private final int mMaxAttempts; private final int mFirstDelay; private final int mBackoffBase; private final AtomicInteger mCurrentRetry = new AtomicInteger(0); @NonNull private final Handler mHandler; @NonNull private final Supplier<Boolean> mBlock; @NonNull private final Runnable mRunnable; /** * @see #RequestThrottle(Handler, int, int, int, Supplier) */ public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) { this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE, block); } /** * Backoff timing is calculated as firstDelay * (backoffBase ^ retryAttempt). * * @param handler Representing the thread to run the provided block. * @param block The action to run when scheduled, returning whether or not the request was * successful. Note that any thrown exceptions will be ignored and not * retried, since it's not easy to tell how destructive or retry-able an * exception is. * @param maxAttempts Number of times to re-attempt any single request. * @param firstDelay The first delay used after the initial attempt. * @param backoffBase The base of the backoff calculation, where retry attempt count is the * exponent. */ public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay, int backoffBase, @NonNull Supplier<Boolean> block) { mHandler = handler; mBlock = block; mMaxAttempts = maxAttempts; mFirstDelay = firstDelay; mBackoffBase = backoffBase; mRunnable = this::runInternal; } /** * Schedule the intended action on the provided {@link Handler}. */ public void schedule() { // To avoid locking the Handler twice by pre-checking hasCallbacks, instead just queue // the Runnable again. It will no-op if the request has already been written to disk. mLastRequest.incrementAndGet(); mHandler.post(mRunnable); } /** * Run the intended action immediately on the calling thread. Note that synchronization and * deadlock between threads is not handled. This will immediately call the request block, and * also potentially schedule a retry. The caller must not block itself. * * @return true if the write succeeded or the last request was already written */ public boolean runNow() { mLastRequest.incrementAndGet(); return runInternal(); } private boolean runInternal() { int lastRequest = mLastRequest.get(); int lastCommitted = mLastCommitted.get(); if (lastRequest == lastCommitted) { return true; } if (mBlock.get()) { mCurrentRetry.set(0); mLastCommitted.set(lastRequest); return true; } else { int currentRetry = mCurrentRetry.getAndIncrement(); if (currentRetry < mMaxAttempts) { long nextDelay = (long) (mFirstDelay * Math.pow(mBackoffBase, currentRetry)); mHandler.postDelayed(mRunnable, nextDelay); } else { mCurrentRetry.set(0); } return false; } } }
services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt 0 → 100644 +219 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.server.pm.test.install import com.android.server.pm.utils.RequestThrottle import com.android.server.testutils.TestHandler import com.google.common.collect.Range import com.google.common.truth.LongSubject import com.google.common.truth.Truth.assertThat import org.junit.Before import org.junit.Test import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong class RequestThrottleTest { private val counter = AtomicInteger(0) private val handler = TestHandler(null) @Before fun resetValues() { handler.flush() counter.set(0) assertThat(counter.get()).isEqualTo(0) } @Test fun simpleThrottle() { val request = RequestThrottle(handler) { counter.incrementAndGet() true } fun sendRequests() { request.schedule() val thread = startThread { request.schedule() } request.schedule() thread.joinForTest() } sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(1) sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(2) } @Test fun exceptionInRequest() { val shouldThrow = AtomicBoolean(true) val request = RequestThrottle(handler) { if (shouldThrow.get()) { throw RuntimeException() } counter.incrementAndGet() true } fun sendRequests() { request.schedule() val thread = startThread { request.schedule() } request.schedule() thread.joinForTest() } sendRequests() try { handler.flush() } catch (ignored: Exception) { } assertThat(counter.get()).isEqualTo(0) shouldThrow.set(false) sendRequests() handler.flush() assertThat(counter.get()).isEqualTo(1) } @Test fun scheduleWhileRunning() { val latchForStartRequest = CountDownLatch(1) val latchForEndRequest = CountDownLatch(1) val request = RequestThrottle(handler) { latchForStartRequest.countDown() counter.incrementAndGet() latchForEndRequest.awaitForTest() true } // Schedule and block a request request.schedule() val handlerThread = startThread { handler.timeAdvance() } latchForStartRequest.awaitForTest() // Hit it with other requests request.schedule() (0..5).map { startThread { request.schedule() } } .forEach { it.joinForTest() } // Release everything latchForEndRequest.countDown() handlerThread.join() handler.flush() // Ensure another request was run after initial blocking request ends assertThat(counter.get()).isEqualTo(2) } @Test fun backoffRetry() { val time = AtomicLong(0) val handler = TestHandler(null) { time.get() } val returnValue = AtomicBoolean(false) val request = RequestThrottle(handler, 3, 1000, 2) { counter.incrementAndGet() returnValue.get() } request.schedule() handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(1000) } time.set(1000) handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(3000) } time.set(3000) handler.timeAdvance() handler.pendingMessages.apply { assertThat(size).isEqualTo(1) assertThat(single().sendTime).isAround(7000) } returnValue.set(true) time.set(7000) handler.timeAdvance() assertThat(handler.pendingMessages).isEmpty() // Ensure another request was run after initial blocking request ends assertThat(counter.get()).isEqualTo(4) } @Test fun forceWriteMultiple() { val request = RequestThrottle(handler) { counter.incrementAndGet() true } request.runNow() request.runNow() request.runNow() assertThat(counter.get()).isEqualTo(3) } @Test fun forceWriteNowWithoutSync() { // When forcing a write without synchronizing the request block, 2 instances will be run. // There is no test for "with sync" because any logic to avoid multiple runs is left // entirely up to the caller. val barrierForEndRequest = CyclicBarrier(2) val request = RequestThrottle(handler) { counter.incrementAndGet() barrierForEndRequest.awaitForTest() true } // Schedule and block a request request.schedule() val thread = startThread { handler.timeAdvance() } request.runNow() thread.joinForTest() assertThat(counter.get()).isEqualTo(2) } private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue() private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS) private fun Thread.joinForTest() = join(5000) private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() } // Float math means time calculations are not exact, so use a loose range private fun LongSubject.isAround(value: Long, threshold: Long = 10) = isIn(Range.closed(value - threshold, value + threshold)) }