Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit be4e3af8 authored by Eugene Susla's avatar Eugene Susla
Browse files

Migrate PermissionControllerManager to ServiceConnector

Test: - atest --test-mapping core/java/com/android/internal/infra
  - m -j CtsBackupHostTestCases && atest android.backup.cts.PermissionTest
Change-Id: I6a590194207d08569f41f3c5ac6d56e63737feaa
parent 15b60142
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
{
    "presubmit": [
        {
            "name": "CtsRoleTestCases",
            "options": [
                {
                    "include-filter": "android.app.role.cts.RoleManagerTest"
                }
            ]
        }
    ]
}
 No newline at end of file
+203 −787

File changed.

Preview size limit exceeded, changes collapsed.

+226 −36
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@

package com.android.internal.infra;

import static com.android.internal.util.ConcurrentUtils.DIRECT_EXECUTOR;

import android.annotation.CallSuper;
import android.annotation.NonNull;
import android.annotation.Nullable;
import android.os.Handler;
@@ -30,10 +33,13 @@ import com.android.internal.util.function.pooled.PooledLambda;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * A customized {@link CompletableFuture} with focus on reducing the number of allocations involved
@@ -42,9 +48,12 @@ import java.util.function.Function;
 * In particular this involves allocations optimizations in:
 * <ul>
 *     <li>{@link #thenCompose(Function)}</li>
 *     <li>{@link #thenApply(Function)}</li>
 *     <li>{@link #thenCombine(CompletionStage, BiFunction)}</li>
 *     <li>{@link #orTimeout(long, TimeUnit)}</li>
 *     <li>{@link #whenComplete(BiConsumer)}</li>
 * </ul>
 * As well as their *Async versions.
 *
 * @param <T> see {@link CompletableFuture}
 */
@@ -52,8 +61,11 @@ public class AndroidFuture<T> extends CompletableFuture<T> {

    private static final String LOG_TAG = AndroidFuture.class.getSimpleName();

    @GuardedBy("this")
    private final @NonNull Object mLock = new Object();
    @GuardedBy("mLock")
    private @Nullable BiConsumer<? super T, ? super Throwable> mListener;
    @GuardedBy("mLock")
    private @Nullable Executor mListenerExecutor = DIRECT_EXECUTOR;
    private @NonNull Handler mTimeoutHandler = Handler.getMain();

    @Override
@@ -74,27 +86,44 @@ public class AndroidFuture<T> extends CompletableFuture<T> {
        return super.completeExceptionally(ex);
    }

    private void onCompleted(@Nullable T res, @Nullable Throwable err) {
    @CallSuper
    protected void onCompleted(@Nullable T res, @Nullable Throwable err) {
        cancelTimeout();

        BiConsumer<? super T, ? super Throwable> listener;
        synchronized (this) {
        synchronized (mLock) {
            listener = mListener;
            mListener = null;
        }

        if (listener != null) {
            callListener(listener, res, err);
            callListenerAsync(listener, res, err);
        }
    }

    @Override
    public AndroidFuture<T> whenComplete(
            @NonNull BiConsumer<? super T, ? super Throwable> action) {
    public AndroidFuture<T> whenComplete(@NonNull BiConsumer<? super T, ? super Throwable> action) {
        return whenCompleteAsync(action, DIRECT_EXECUTOR);
    }

    @Override
    public AndroidFuture<T> whenCompleteAsync(
            @NonNull BiConsumer<? super T, ? super Throwable> action,
            @NonNull Executor executor) {
        Preconditions.checkNotNull(action);
        synchronized (this) {
        Preconditions.checkNotNull(executor);
        synchronized (mLock) {
            if (!isDone()) {
                BiConsumer<? super T, ? super Throwable> oldListener = mListener;

                if (oldListener != null && executor != mListenerExecutor) {
                    // 2 listeners with different executors
                    // Too complex - give up on saving allocations and delegate to superclass
                    super.whenCompleteAsync(action, executor);
                    return this;
                }

                mListenerExecutor = executor;
                mListener = oldListener == null
                        ? action
                        : (res, err) -> {
@@ -115,10 +144,21 @@ public class AndroidFuture<T> extends CompletableFuture<T> {
        } catch (Throwable e) {
            err = e;
        }
        callListener(action, res, err);
        callListenerAsync(action, res, err);
        return this;
    }

    private void callListenerAsync(BiConsumer<? super T, ? super Throwable> listener,
            @Nullable T res, @Nullable Throwable err) {
        if (mListenerExecutor == DIRECT_EXECUTOR) {
            callListener(listener, res, err);
        } else {
            mListenerExecutor.execute(PooledLambda
                    .obtainRunnable(AndroidFuture::callListener, listener, res, err)
                    .recycleOnUse());
        }
    }

    /**
     * Calls the provided listener, handling any exceptions that may arise.
     */
@@ -137,8 +177,7 @@ public class AndroidFuture<T> extends CompletableFuture<T> {
                } else {
                    // listener exception-case threw
                    // give up on listener but preserve the original exception when throwing up
                    ExceptionUtils.getRootCause(t).initCause(err);
                    throw t;
                    throw ExceptionUtils.appendCause(t, err);
                }
            }
        } catch (Throwable t2) {
@@ -163,8 +202,14 @@ public class AndroidFuture<T> extends CompletableFuture<T> {
        }
    }

    protected void cancelTimeout() {
    /**
     * Cancel all timeouts previously set with {@link #orTimeout}, if any.
     *
     * @return {@code this} for chaining
     */
    public AndroidFuture<T> cancelTimeout() {
        mTimeoutHandler.removeCallbacksAndMessages(this);
        return this;
    }

    /**
@@ -179,47 +224,192 @@ public class AndroidFuture<T> extends CompletableFuture<T> {
    @Override
    public <U> AndroidFuture<U> thenCompose(
            @NonNull Function<? super T, ? extends CompletionStage<U>> fn) {
        return (AndroidFuture<U>) new ThenCompose<>(this, fn);
        return thenComposeAsync(fn, DIRECT_EXECUTOR);
    }

    private static class ThenCompose<T, U> extends AndroidFuture<Object>
            implements BiConsumer<Object, Throwable> {
        private final AndroidFuture<T> mSource;
        private Function<? super T, ? extends CompletionStage<U>> mFn;
    @Override
    public <U> AndroidFuture<U> thenComposeAsync(
            @NonNull Function<? super T, ? extends CompletionStage<U>> fn,
            @NonNull Executor executor) {
        return new ThenComposeAsync<>(this, fn, executor);
    }

        ThenCompose(@NonNull AndroidFuture<T> source,
                @NonNull Function<? super T, ? extends CompletionStage<U>> fn) {
            mSource = source;
    private static class ThenComposeAsync<T, U> extends AndroidFuture<U>
            implements BiConsumer<Object, Throwable>, Runnable {
        private volatile T mSourceResult = null;
        private final Executor mExecutor;
        private volatile Function<? super T, ? extends CompletionStage<U>> mFn;

        ThenComposeAsync(@NonNull AndroidFuture<T> source,
                @NonNull Function<? super T, ? extends CompletionStage<U>> fn,
                @NonNull Executor executor) {
            mFn = Preconditions.checkNotNull(fn);
            mExecutor = Preconditions.checkNotNull(executor);

            // subscribe to first job completion
            source.whenComplete(this);
        }

        @Override
        public void accept(Object res, Throwable err) {
            Function<? super T, ? extends CompletionStage<U>> fn;
            synchronized (this) {
                fn = mFn;
                mFn = null;
            }
            if (fn != null) {
            if (err != null) {
                // first or second job failed
                completeExceptionally(err);
            } else if (mFn != null) {
                // first job completed
                mSourceResult = (T) res;
                // subscribe to second job completion asynchronously
                mExecutor.execute(this);
            } else {
                // second job completed
                complete((U) res);
            }
        }

        @Override
        public void run() {
            CompletionStage<U> secondJob;
            try {
                    secondJob = Preconditions.checkNotNull(fn.apply((T) res));
                secondJob = Preconditions.checkNotNull(mFn.apply(mSourceResult));
            } catch (Throwable t) {
                completeExceptionally(t);
                return;
            } finally {
                // Marks first job complete
                mFn = null;
            }
            // subscribe to second job completion
            secondJob.whenComplete(this);
        }
    }

    @Override
    public <U> AndroidFuture<U> thenApply(@NonNull Function<? super T, ? extends U> fn) {
        return thenApplyAsync(fn, DIRECT_EXECUTOR);
    }

    @Override
    public <U> AndroidFuture<U> thenApplyAsync(@NonNull Function<? super T, ? extends U> fn,
            @NonNull Executor executor) {
        return new ThenApplyAsync<>(this, fn, executor);
    }

    private static class ThenApplyAsync<T, U> extends AndroidFuture<U>
            implements BiConsumer<T, Throwable>, Runnable {
        private volatile T mSourceResult = null;
        private final Executor mExecutor;
        private final Function<? super T, ? extends U> mFn;

        ThenApplyAsync(@NonNull AndroidFuture<T> source,
                @NonNull Function<? super T, ? extends U> fn,
                @NonNull Executor executor) {
            mExecutor = Preconditions.checkNotNull(executor);
            mFn = Preconditions.checkNotNull(fn);

            // subscribe to job completion
            source.whenComplete(this);
        }

        @Override
        public void accept(T res, Throwable err) {
            if (err != null) {
                completeExceptionally(err);
            } else {
                // second job completed
                mSourceResult = res;
                mExecutor.execute(this);
            }
        }

        @Override
        public void run() {
            try {
                complete(mFn.apply(mSourceResult));
            } catch (Throwable t) {
                completeExceptionally(t);
            }
        }
    }

    @Override
    public <U, V> AndroidFuture<V> thenCombine(
            @NonNull CompletionStage<? extends U> other,
            @NonNull BiFunction<? super T, ? super U, ? extends V> combineResults) {
        return new ThenCombine<T, U, V>(this, other, combineResults);
    }

    /** @see CompletionStage#thenCombine */
    public AndroidFuture<T> thenCombine(@NonNull CompletionStage<Void> other) {
        return thenCombine(other, (res, aVoid) -> res);
    }

    private static class ThenCombine<T, U, V> extends AndroidFuture<V>
            implements BiConsumer<Object, Throwable> {
        private volatile @Nullable T mResultT = null;
        private volatile @NonNull CompletionStage<? extends U> mSourceU;
        private final @NonNull BiFunction<? super T, ? super U, ? extends V> mCombineResults;

        ThenCombine(CompletableFuture<T> sourceT,
                CompletionStage<? extends U> sourceU,
                BiFunction<? super T, ? super U, ? extends V> combineResults) {
            mSourceU = Preconditions.checkNotNull(sourceU);
            mCombineResults = Preconditions.checkNotNull(combineResults);

            sourceT.whenComplete(this);
        }

        @Override
        public void accept(Object res, Throwable err) {
            if (err != null) {
                completeExceptionally(err);
                return;
            }

            if (mSourceU != null) {
                // T done
                mResultT = (T) res;
                mSourceU.whenComplete(this);
            } else {
                    complete(res);
                // U done
                try {
                    complete(mCombineResults.apply(mResultT, (U) res));
                } catch (Throwable t) {
                    completeExceptionally(t);
                }
            }
        }
    }

    /**
     * Similar to {@link CompletableFuture#supplyAsync} but
     * runs the given action directly.
     *
     * The resulting future is immediately completed.
     */
    public static <T> AndroidFuture<T> supply(Supplier<T> supplier) {
        return supplyAsync(supplier, DIRECT_EXECUTOR);
    }

    /**
     * @see CompletableFuture#supplyAsync(Supplier, Executor)
     */
    public static <T> AndroidFuture<T> supplyAsync(Supplier<T> supplier, Executor executor) {
        return new SupplyAsync<>(supplier, executor);
    }

    private static class SupplyAsync<T> extends AndroidFuture<T> implements Runnable {
        private final @NonNull Supplier<T> mSupplier;

        SupplyAsync(Supplier<T> supplier, Executor executor) {
            mSupplier = supplier;
            executor.execute(this);
        }

        @Override
        public void run() {
            try {
                complete(mSupplier.get());
            } catch (Throwable t) {
                completeExceptionally(t);
            }
        }
    }
+225 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.internal.infra;

import static java.util.concurrent.TimeUnit.SECONDS;

import android.os.AsyncTask;
import android.os.ParcelFileDescriptor;

import com.android.internal.util.FunctionalUtils.ThrowingConsumer;
import com.android.internal.util.FunctionalUtils.ThrowingFunction;

import libcore.io.IoUtils;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;

/**
 * Utility class for streaming bytes across IPC, using standard APIs such as
 * {@link InputStream}/{@link OutputStream} or simply {@code byte[]}
 *
 * <p>
 * To use this, you'll want to declare your IPC methods to accept a {@link ParcelFileDescriptor},
 * and call them from within lambdas passed to {@link #receiveBytes}/{@link #sendBytes},
 * passing on the provided {@link ParcelFileDescriptor}.
 *
 * <p>
 * E.g.:
 * {@code
 *     //IFoo.aidl
 *     oneway interface IFoo {
 *         void sendGreetings(in ParcelFileDescriptor pipe);
 *         void receiveGreetings(in ParcelFileDescriptor pipe);
 *     }
 *
 *     //Foo.java
 *     mServiceConnector.postAsync(service -> RemoteStream.sendBytes(
 *             pipe -> service.sendGreetings(pipe, greetings)))...
 *
 *     mServiceConnector.postAsync(service -> RemoteStream.receiveBytes(
 *                    pipe -> service.receiveGreetings(pipe)))
 *                .whenComplete((greetings, err) -> ...);
 * }
 *
 * <p>
 * Each operation has a 30 second timeout by default, as it's possible for an operation to be
 * stuck forever otherwise.
 * You can {@link #cancelTimeout cancel} and/or {@link #orTimeout set a custom timeout}, using the
 * {@link AndroidFuture} you get as a result.
 *
 * <p>
 * You can also {@link #cancel} the operation, which will result in closing the underlying
 * {@link ParcelFileDescriptor}.
 *
 * @see #sendBytes
 * @see #receiveBytes
 *
 * @param <RES> the result of a successful streaming.
 * @param <IOSTREAM> either {@link InputStream} or {@link OutputStream} depending on the direction.
 */
public abstract class RemoteStream<RES, IOSTREAM extends Closeable>
        extends AndroidFuture<RES>
        implements Runnable {

    private final ThrowingFunction<IOSTREAM, RES> mHandleStream;
    private volatile ParcelFileDescriptor mLocalPipe;

    /**
     * Call an IPC, and process incoming bytes as an {@link InputStream} within {@code read}.
     *
     * @param ipc action to perform the IPC. Called directly on the calling thread.
     * @param read action to read from an {@link InputStream}, transforming data into {@code R}.
     *             Called asynchronously on the background thread.
     * @param <R> type of the end result of reading the bytes (if any).
     * @return an {@link AndroidFuture} that can be used to track operation's completion and
     *         retrieve its result (if any).
     */
    public static <R> AndroidFuture<R> receiveBytes(
            ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read) {
        return new RemoteStream<R, InputStream>(
                ipc, read, AsyncTask.THREAD_POOL_EXECUTOR, true /* read */) {
            @Override
            protected InputStream createStream(ParcelFileDescriptor fd) {
                return new ParcelFileDescriptor.AutoCloseInputStream(fd);
            }
        };
    }

    /**
     * Call an IPC, and asynchronously return incoming bytes as {@code byte[]}.
     *
     * @param ipc action to perform the IPC. Called directly on the calling thread.
     * @return an {@link AndroidFuture} that can be used to track operation's completion and
     *         retrieve its result.
     */
    public static AndroidFuture<byte[]> receiveBytes(ThrowingConsumer<ParcelFileDescriptor> ipc) {
        return receiveBytes(ipc, RemoteStream::readAll);
    }

    /**
     * Convert a given {@link InputStream} into {@code byte[]}.
     *
     * <p>
     * This doesn't close the given {@link InputStream}
     */
    public static byte[] readAll(InputStream inputStream) throws IOException {
        ByteArrayOutputStream combinedBuffer = new ByteArrayOutputStream();
        byte[] buffer = new byte[16 * 1024];
        while (true) {
            int numRead = inputStream.read(buffer);
            if (numRead == -1) {
                break;
            }
            combinedBuffer.write(buffer, 0, numRead);
        }
        return combinedBuffer.toByteArray();
    }

    /**
     * Call an IPC, and perform sending bytes via an {@link OutputStream} within {@code write}.
     *
     * @param ipc action to perform the IPC. Called directly on the calling thread.
     * @param write action to write to an {@link OutputStream}, optionally returning operation
     *              result as {@code R}. Called asynchronously on the background thread.
     * @param <R> type of the end result of writing the bytes (if any).
     * @return an {@link AndroidFuture} that can be used to track operation's completion and
     *         retrieve its result (if any).
     */
    public static <R> AndroidFuture<R> sendBytes(
            ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<OutputStream, R> write) {
        return new RemoteStream<R, OutputStream>(
                ipc, write, AsyncTask.THREAD_POOL_EXECUTOR, false /* read */) {
            @Override
            protected OutputStream createStream(ParcelFileDescriptor fd) {
                return new ParcelFileDescriptor.AutoCloseOutputStream(fd);
            }
        };
    }

    /**
     * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but explicitly avoids
     * returning a result.
     */
    public static AndroidFuture<Void> sendBytes(
            ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingConsumer<OutputStream> write) {
        return sendBytes(ipc, os -> {
            write.acceptOrThrow(os);
            return null;
        });
    }

    /**
     * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but providing the data to
     * send eagerly as {@code byte[]}.
     */
    public static AndroidFuture<Void> sendBytes(
            ThrowingConsumer<ParcelFileDescriptor> ipc, byte[] data) {
        return sendBytes(ipc, os -> {
            os.write(data);
            return null;
        });
    }

    private RemoteStream(
            ThrowingConsumer<ParcelFileDescriptor> ipc,
            ThrowingFunction<IOSTREAM, RES> handleStream,
            Executor backgroundExecutor,
            boolean read) {
        mHandleStream = handleStream;

        ParcelFileDescriptor[] pipe;
        try {
            //TODO consider using createReliablePipe
            pipe = ParcelFileDescriptor.createPipe();
            try (ParcelFileDescriptor remotePipe = pipe[read ? 1 : 0]) {
                ipc.acceptOrThrow(remotePipe);
                // Remote pipe end is duped by binder call. Local copy is not needed anymore
            }

            mLocalPipe = pipe[read ? 0 : 1];
            backgroundExecutor.execute(this);

            // Guard against getting stuck forever
            orTimeout(30, SECONDS);
        } catch (Throwable e) {
            completeExceptionally(e);
            // mLocalPipe closes in #onCompleted
        }
    }

    protected abstract IOSTREAM createStream(ParcelFileDescriptor fd);

    @Override
    public void run() {
        try (IOSTREAM stream = createStream(mLocalPipe)) {
            complete(mHandleStream.applyOrThrow(stream));
        } catch (Throwable t) {
            completeExceptionally(t);
        }
    }

    @Override
    protected void onCompleted(RES res, Throwable err) {
        super.onCompleted(res, err);
        IoUtils.closeQuietly(mLocalPipe);
    }
}
+34 −9
Original line number Diff line number Diff line
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

@@ -70,7 +71,7 @@ public interface ServiceConnector<I extends IInterface> {
     *
     * @return whether a job was successfully scheduled
     */
    boolean fireAndForget(@NonNull VoidJob<I> job);
    boolean run(@NonNull VoidJob<I> job);

    /**
     * Schedules to run a given job when service is connected.
@@ -167,7 +168,7 @@ public interface ServiceConnector<I extends IInterface> {
         * @return the result of this operation to be propagated to the original caller.
         *         If you do not need to provide a result you can implement {@link VoidJob} instead
         */
        R run(@NonNull II service) throws RemoteException;
        R run(@NonNull II service) throws Exception;

    }

@@ -180,10 +181,10 @@ public interface ServiceConnector<I extends IInterface> {
    interface VoidJob<II> extends Job<II, Void> {

        /** @see Job#run */
        void runNoResult(II service) throws RemoteException;
        void runNoResult(II service) throws Exception;

        @Override
        default Void run(II service) throws RemoteException {
        default Void run(II service) throws Exception {
            runNoResult(service);
            return null;
        }
@@ -213,6 +214,7 @@ public interface ServiceConnector<I extends IInterface> {
        static final String LOG_TAG = "ServiceConnector.Impl";

        private static final long DEFAULT_DISCONNECT_TIMEOUT_MS = 15_000;
        private static final long DEFAULT_REQUEST_TIMEOUT_MS = 30_000;

        private final @NonNull Queue<Job<I, ?>> mQueue = this;
        private final @NonNull List<CompletionAwareJob<I, ?>> mUnfinishedJobs = new ArrayList<>();
@@ -274,6 +276,19 @@ public interface ServiceConnector<I extends IInterface> {
            return DEFAULT_DISCONNECT_TIMEOUT_MS;
        }

        /**
         * Gets the amount of time to wait for a request to complete, before finishing it with a
         * {@link java.util.concurrent.TimeoutException}
         *
         * <p>
         * This includes time spent connecting to the service, if any.
         *
         * @return amount of time in ms
         */
        protected long getRequestTimeoutMs() {
            return DEFAULT_REQUEST_TIMEOUT_MS;
        }

        /**
         * {@link Context#bindServiceAsUser Binds} to the service.
         *
@@ -320,7 +335,7 @@ public interface ServiceConnector<I extends IInterface> {
        protected void onServiceConnectionStatusChanged(@NonNull I service, boolean isConnected) {}

        @Override
        public boolean fireAndForget(@NonNull VoidJob<I> job) {
        public boolean run(@NonNull VoidJob<I> job) {
            if (DEBUG) {
                Log.d(LOG_TAG, "Wrapping fireAndForget job to take advantage of its mDebugName");
                return !post(job).isCompletedExceptionally();
@@ -653,6 +668,11 @@ public interface ServiceConnector<I extends IInterface> {
            boolean mAsync = false;
            private String mDebugName;
            {
                long requestTimeout = getRequestTimeoutMs();
                if (requestTimeout > 0) {
                    orTimeout(requestTimeout, TimeUnit.MILLISECONDS);
                }

                if (DEBUG) {
                    mDebugName = Arrays.stream(Thread.currentThread().getStackTrace())
                            .skip(2)
@@ -665,7 +685,7 @@ public interface ServiceConnector<I extends IInterface> {
            }

            @Override
            public R run(@NonNull II service) throws RemoteException {
            public R run(@NonNull II service) throws Exception {
                return mDelegate.run(service);
            }

@@ -688,15 +708,20 @@ public interface ServiceConnector<I extends IInterface> {

            @Override
            public void accept(@Nullable R res, @Nullable Throwable err) {
                if (mUnfinishedJobs.remove(this)) {
                    maybeScheduleUnbindTimeout();
                }
                if (err != null) {
                    completeExceptionally(err);
                } else {
                    complete(res);
                }
            }

            @Override
            protected void onCompleted(R res, Throwable err) {
                super.onCompleted(res, err);
                if (mUnfinishedJobs.remove(this)) {
                    maybeScheduleUnbindTimeout();
                }
            }
        }
    }
}
Loading