Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 288dc8fe authored by Winson's avatar Winson
Browse files

Throttle package session async write requests

Previously, async writes were posted to a Handler but not throttled,
so it was possible to do redundant writes when the latest state had
already been saved.

To avoid that, a counter is stored that serves as a request ID that
can be checked against to see if the latest state has already been
saved, skipping the write request if it has.

This is preferred to calling Handler#hasCallbacks or
Handler#removeCallbacks because those will lock the Handler. And taking
the lock twice is perhaps worse than locking once to schedule a no-op
Runnable, although this was not benchmarked.

This also introduces a retry mechanism in case the write fails.

Bug: 168086110

Test: atest PackageSessionTests
Test: atest RequestThrottleTest

Change-Id: I604dc433c77cf1d9d743c8437674576ad087d62c
parent 8d3b1656
Loading
Loading
Loading
Loading
+21 −21
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import static org.xmlpull.v1.XmlPullParser.END_DOCUMENT;
import static org.xmlpull.v1.XmlPullParser.START_TAG;

import android.Manifest;
import android.annotation.NonNull;
import android.app.ActivityManager;
import android.app.AppGlobals;
import android.app.AppOpsManager;
@@ -88,6 +89,7 @@ import com.android.server.SystemConfig;
import com.android.server.SystemService;
import com.android.server.SystemServiceManager;
import com.android.server.pm.parsing.PackageParser2;
import com.android.server.pm.utils.RequestThrottle;

import libcore.io.IoUtils;

@@ -220,6 +222,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
        }
    }

    @NonNull
    private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(),
            () -> {
                synchronized (mSessions) {
                    return writeSessionsLocked();
                }
            });

    public PackageInstallerService(Context context, PackageManagerService pm,
            Supplier<PackageParser2> apexParserSupplier) {
        mContext = context;
@@ -275,7 +285,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements

            // Invalid sessions might have been marked while parsing. Re-write the database with
            // the updated information.
            writeSessionsLocked();
            mSettingsWriteRequest.runNow();

        }
    }
@@ -464,7 +474,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
    }

    @GuardedBy("mSessions")
    private void writeSessionsLocked() {
    private boolean writeSessionsLocked() {
        if (LOGD) Slog.v(TAG, "writeSessionsLocked()");

        FileOutputStream fos = null;
@@ -483,28 +493,20 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
            out.endDocument();

            mSessionsFile.finishWrite(fos);
            return true;
        } catch (IOException e) {
            if (fos != null) {
                mSessionsFile.failWrite(fos);
            }
        }

        return false;
    }

    private File buildAppIconFile(int sessionId) {
        return new File(mSessionsDir, "app_icon." + sessionId + ".png");
    }

    private void writeSessionsAsync() {
        IoThread.getHandler().post(new Runnable() {
            @Override
            public void run() {
                synchronized (mSessions) {
                    writeSessionsLocked();
                }
            }
        });
    }

    @Override
    public int createSession(SessionParams params, String installerPackageName,
            String callingAttributionTag, int userId) {
@@ -764,7 +766,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements

        mCallbacks.notifySessionCreated(session.sessionId, session.userId);

        writeSessionsAsync();
        mSettingsWriteRequest.schedule();
        return sessionId;
    }

@@ -1374,7 +1376,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
    class InternalCallback {
        public void onSessionBadgingChanged(PackageInstallerSession session) {
            mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId);
            writeSessionsAsync();
            mSettingsWriteRequest.schedule();
        }

        public void onSessionActiveChanged(PackageInstallerSession session, boolean active) {
@@ -1389,7 +1391,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements

        public void onStagedSessionChanged(PackageInstallerSession session) {
            session.markUpdated();
            writeSessionsAsync();
            mSettingsWriteRequest.schedule();
            if (mOkToSendBroadcasts && !session.isDestroyed()) {
                // we don't scrub the data here as this is sent only to the installer several
                // privileged system packages
@@ -1419,7 +1421,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
                            appIconFile.delete();
                        }

                        writeSessionsLocked();
                        mSettingsWriteRequest.runNow();
                    }
                }
            });
@@ -1428,16 +1430,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements
        public void onSessionPrepared(PackageInstallerSession session) {
            // We prepared the destination to write into; we want to persist
            // this, but it's not critical enough to block for.
            writeSessionsAsync();
            mSettingsWriteRequest.schedule();
        }

        public void onSessionSealedBlocking(PackageInstallerSession session) {
            // It's very important that we block until we've recorded the
            // session as being sealed, since we never want to allow mutation
            // after sealing.
            synchronized (mSessions) {
                writeSessionsLocked();
            }
            mSettingsWriteRequest.runNow();
        }
    }
}
+154 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.server.pm.utils;

import android.annotation.NonNull;
import android.os.Handler;

import com.android.server.IoThread;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Loose throttle latest behavior for success/fail requests, with options to schedule or force a
 * request through. Throttling is implicit and not configurable. This means requests are dispatched
 * to the {@link Handler} immediately when received, and only batched while waiting on the next
 * message execution or running request.
 *
 * This also means there is no explicit debouncing. Implicit debouncing is available through the
 * same runtime delays in the {@link Handler} instance and the request execution, where multiple
 * requests prior to the execution point are collapsed.
 *
 * Callers provide a {@link Handler} with which to schedule tasks on. This may be a highly
 * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees
 * that the request will be handled before the system server dies. Ideally callers should handle
 * re-initialization from stale state with no consequences to the user.
 *
 * This class will retry requests if they don't succeed, as provided by a true/false response from
 * the block provided to run the request. This uses an exponential backoff mechanism, assuming that
 * state write should be attempted immediately, but not retried so heavily as to potentially block
 * other system server callers. Exceptions are not considered and will not result in a retry if
 * thrown from inside the block. Caller should wrap with try-catch and rollback and transaction
 * state before returning false to signal a retry.
 *
 * The caller is strictly responsible for data synchronization, as this class will not synchronize
 * the request block, potentially running it multiple times or on multiple threads simultaneously
 * if requests come in asynchronously.
 */
public class RequestThrottle {

    private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5;
    private static final int DEFAULT_DELAY_MS = 1000;
    private static final int DEFAULT_BACKOFF_BASE = 2;

    private final AtomicInteger mLastRequest = new AtomicInteger(0);
    private final AtomicInteger mLastCommitted = new AtomicInteger(-1);

    private final int mMaxAttempts;
    private final int mFirstDelay;
    private final int mBackoffBase;

    private final AtomicInteger mCurrentRetry = new AtomicInteger(0);

    @NonNull
    private final Handler mHandler;

    @NonNull
    private final Supplier<Boolean> mBlock;

    @NonNull
    private final Runnable mRunnable;

    /**
     * @see #RequestThrottle(Handler, int, int, int, Supplier)
     */
    public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) {
        this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE,
                block);
    }

    /**
     * Backoff timing is calculated as firstDelay * (backoffBase ^ retryAttempt).
     *
     * @param handler     Representing the thread to run the provided block.
     * @param block       The action to run when scheduled, returning whether or not the request was
     *                    successful. Note that any thrown exceptions will be ignored and not
     *                    retried, since it's not easy to tell how destructive or retry-able an
     *                    exception is.
     * @param maxAttempts Number of times to re-attempt any single request.
     * @param firstDelay  The first delay used after the initial attempt.
     * @param backoffBase The base of the backoff calculation, where retry attempt count is the
     *                    exponent.
     */
    public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay,
            int backoffBase, @NonNull Supplier<Boolean> block) {
        mHandler = handler;
        mBlock = block;
        mMaxAttempts = maxAttempts;
        mFirstDelay = firstDelay;
        mBackoffBase = backoffBase;
        mRunnable = this::runInternal;
    }

    /**
     * Schedule the intended action on the provided {@link Handler}.
     */
    public void schedule() {
        // To avoid locking the Handler twice by pre-checking hasCallbacks, instead just queue
        // the Runnable again. It will no-op if the request has already been written to disk.
        mLastRequest.incrementAndGet();
        mHandler.post(mRunnable);
    }

    /**
     * Run the intended action immediately on the calling thread. Note that synchronization and
     * deadlock between threads is not handled. This will immediately call the request block, and
     * also potentially schedule a retry. The caller must not block itself.
     *
     * @return true if the write succeeded or the last request was already written
     */
    public boolean runNow() {
        mLastRequest.incrementAndGet();
        return runInternal();
    }

    private boolean runInternal() {
        int lastRequest = mLastRequest.get();
        int lastCommitted = mLastCommitted.get();
        if (lastRequest == lastCommitted) {
            return true;
        }

        if (mBlock.get()) {
            mCurrentRetry.set(0);
            mLastCommitted.set(lastRequest);
            return true;
        } else {
            int currentRetry = mCurrentRetry.getAndIncrement();
            if (currentRetry < mMaxAttempts) {
                long nextDelay =
                        (long) (mFirstDelay * Math.pow(mBackoffBase, currentRetry));
                mHandler.postDelayed(mRunnable, nextDelay);
            } else {
                mCurrentRetry.set(0);
            }

            return false;
        }
    }
}
+219 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.server.pm.test.install

import com.android.server.pm.utils.RequestThrottle
import com.android.server.testutils.TestHandler
import com.google.common.collect.Range
import com.google.common.truth.LongSubject
import com.google.common.truth.Truth.assertThat
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

class RequestThrottleTest {

    private val counter = AtomicInteger(0)

    private val handler = TestHandler(null)

    @Before
    fun resetValues() {
        handler.flush()
        counter.set(0)
        assertThat(counter.get()).isEqualTo(0)
    }

    @Test
    fun simpleThrottle() {
        val request = RequestThrottle(handler) {
            counter.incrementAndGet()
            true
        }

        fun sendRequests() {
            request.schedule()
            val thread = startThread { request.schedule() }
            request.schedule()
            thread.joinForTest()
        }

        sendRequests()
        handler.flush()
        assertThat(counter.get()).isEqualTo(1)

        sendRequests()
        handler.flush()
        assertThat(counter.get()).isEqualTo(2)
    }

    @Test
    fun exceptionInRequest() {
        val shouldThrow = AtomicBoolean(true)
        val request = RequestThrottle(handler) {
            if (shouldThrow.get()) {
                throw RuntimeException()
            }
            counter.incrementAndGet()
            true
        }

        fun sendRequests() {
            request.schedule()
            val thread = startThread { request.schedule() }
            request.schedule()
            thread.joinForTest()
        }

        sendRequests()
        try {
            handler.flush()
        } catch (ignored: Exception) {
        }
        assertThat(counter.get()).isEqualTo(0)

        shouldThrow.set(false)

        sendRequests()
        handler.flush()
        assertThat(counter.get()).isEqualTo(1)
    }

    @Test
    fun scheduleWhileRunning() {
        val latchForStartRequest = CountDownLatch(1)
        val latchForEndRequest = CountDownLatch(1)
        val request = RequestThrottle(handler) {
            latchForStartRequest.countDown()
            counter.incrementAndGet()
            latchForEndRequest.awaitForTest()
            true
        }

        // Schedule and block a request
        request.schedule()
        val handlerThread = startThread { handler.timeAdvance() }
        latchForStartRequest.awaitForTest()

        // Hit it with other requests
        request.schedule()
        (0..5).map { startThread { request.schedule() } }
                .forEach { it.joinForTest() }

        // Release everything
        latchForEndRequest.countDown()
        handlerThread.join()
        handler.flush()

        // Ensure another request was run after initial blocking request ends
        assertThat(counter.get()).isEqualTo(2)
    }

    @Test
    fun backoffRetry() {
        val time = AtomicLong(0)
        val handler = TestHandler(null) { time.get() }
        val returnValue = AtomicBoolean(false)
        val request = RequestThrottle(handler, 3, 1000, 2) {
            counter.incrementAndGet()
            returnValue.get()
        }

        request.schedule()

        handler.timeAdvance()
        handler.pendingMessages.apply {
            assertThat(size).isEqualTo(1)
            assertThat(single().sendTime).isAround(1000)
        }

        time.set(1000)
        handler.timeAdvance()
        handler.pendingMessages.apply {
            assertThat(size).isEqualTo(1)
            assertThat(single().sendTime).isAround(3000)
        }

        time.set(3000)
        handler.timeAdvance()
        handler.pendingMessages.apply {
            assertThat(size).isEqualTo(1)
            assertThat(single().sendTime).isAround(7000)
        }

        returnValue.set(true)
        time.set(7000)
        handler.timeAdvance()
        assertThat(handler.pendingMessages).isEmpty()

        // Ensure another request was run after initial blocking request ends
        assertThat(counter.get()).isEqualTo(4)
    }

    @Test
    fun forceWriteMultiple() {
        val request = RequestThrottle(handler) {
            counter.incrementAndGet()
            true
        }

        request.runNow()
        request.runNow()
        request.runNow()

        assertThat(counter.get()).isEqualTo(3)
    }

    @Test
    fun forceWriteNowWithoutSync() {
        // When forcing a write without synchronizing the request block, 2 instances will be run.
        // There is no test for "with sync" because any logic to avoid multiple runs is left
        // entirely up to the caller.

        val barrierForEndRequest = CyclicBarrier(2)
        val request = RequestThrottle(handler) {
            counter.incrementAndGet()
            barrierForEndRequest.awaitForTest()
            true
        }

        // Schedule and block a request
        request.schedule()
        val thread = startThread { handler.timeAdvance() }

        request.runNow()

        thread.joinForTest()

        assertThat(counter.get()).isEqualTo(2)
    }

    private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue()
    private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS)
    private fun Thread.joinForTest() = join(5000)

    private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() }

    // Float math means time calculations are not exact, so use a loose range
    private fun LongSubject.isAround(value: Long, threshold: Long = 10) =
            isIn(Range.closed(value - threshold, value + threshold))
}