Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 18687245 authored by Matthew Sedam's avatar Matthew Sedam
Browse files

Add reliable message retry support to the Context Hub Service

Bug: 331795143
Flag: android.chre.flags.reliable_message_retry_support_service
Test: Run the reliable message test with the test
      mode behavior flag enabled
Change-Id: I4a8a0cc53a694db315b9e1e889da3191d3f3721b
parent b54585e6
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -296,7 +296,7 @@ public class ContextHubService extends IContextHubService.Stub {
        }
        }


        public boolean isExpired() {
        public boolean isExpired() {
            return mTimestamp + ContextHubTransactionManager.RELIABLE_MESSAGE_TIMEOUT_NS
            return mTimestamp + ContextHubTransactionManager.RELIABLE_MESSAGE_TIMEOUT.toNanos()
                    < SystemClock.elapsedRealtimeNanos();
                    < SystemClock.elapsedRealtimeNanos();
        }
        }
    }
    }
+1 −1
Original line number Original line Diff line number Diff line
@@ -161,7 +161,7 @@ abstract class ContextHubServiceTransaction {
            case ContextHubTransaction.TYPE_LOAD_NANOAPP:
            case ContextHubTransaction.TYPE_LOAD_NANOAPP:
                return unit.convert(30L, TimeUnit.SECONDS);
                return unit.convert(30L, TimeUnit.SECONDS);
            case ContextHubTransaction.TYPE_RELIABLE_MESSAGE:
            case ContextHubTransaction.TYPE_RELIABLE_MESSAGE:
                return unit.convert(ContextHubTransactionManager.RELIABLE_MESSAGE_TIMEOUT_NS,
                return unit.convert(ContextHubTransactionManager.RELIABLE_MESSAGE_TIMEOUT.toNanos(),
                        TimeUnit.NANOSECONDS);
                        TimeUnit.NANOSECONDS);
            case ContextHubTransaction.TYPE_UNLOAD_NANOAPP:
            case ContextHubTransaction.TYPE_UNLOAD_NANOAPP:
            case ContextHubTransaction.TYPE_ENABLE_NANOAPP:
            case ContextHubTransaction.TYPE_ENABLE_NANOAPP:
+233 −56
Original line number Original line Diff line number Diff line
@@ -16,19 +16,26 @@


package com.android.server.location.contexthub;
package com.android.server.location.contexthub;


import android.chre.flags.Flags;
import android.hardware.location.ContextHubTransaction;
import android.hardware.location.ContextHubTransaction;
import android.hardware.location.IContextHubTransactionCallback;
import android.hardware.location.IContextHubTransactionCallback;
import android.hardware.location.NanoAppBinary;
import android.hardware.location.NanoAppBinary;
import android.hardware.location.NanoAppMessage;
import android.hardware.location.NanoAppMessage;
import android.hardware.location.NanoAppState;
import android.hardware.location.NanoAppState;
import android.os.RemoteException;
import android.os.RemoteException;
import android.os.SystemClock;
import android.util.Log;
import android.util.Log;


import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
@@ -47,39 +54,30 @@ import java.util.concurrent.atomic.AtomicInteger;
/* package */ class ContextHubTransactionManager {
/* package */ class ContextHubTransactionManager {
    private static final String TAG = "ContextHubTransactionManager";
    private static final String TAG = "ContextHubTransactionManager";


    /**
    public static final Duration RELIABLE_MESSAGE_TIMEOUT = Duration.ofSeconds(1);
     * The timeout in ns of a reliable message transaction.
     */
    public static final int RELIABLE_MESSAGE_TIMEOUT_NS = 1_000_000_000; // 1 second


    /*
     * Maximum number of transaction requests that can be pending at a time
     */
    private static final int MAX_PENDING_REQUESTS = 10000;
    private static final int MAX_PENDING_REQUESTS = 10000;


    /*
    private static final int RELIABLE_MESSAGE_MAX_NUM_RETRY = 3;
     * The proxy to talk to the Context Hub

     */
    private static final Duration RELIABLE_MESSAGE_RETRY_WAIT_TIME = Duration.ofMillis(250);

    private static final Duration RELIABLE_MESSAGE_MIN_WAIT_TIME = Duration.ofNanos(1000);

    private final IContextHubWrapper mContextHubProxy;
    private final IContextHubWrapper mContextHubProxy;


    /*
     * The manager for all clients for the service.
     */
    private final ContextHubClientManager mClientManager;
    private final ContextHubClientManager mClientManager;


    /*
     * The nanoapp state manager for the service
     */
    private final NanoAppStateManager mNanoAppStateManager;
    private final NanoAppStateManager mNanoAppStateManager;


    /*
     * A queue containing the current transactions
     */
    private final ArrayDeque<ContextHubServiceTransaction> mTransactionQueue = new ArrayDeque<>();
    private final ArrayDeque<ContextHubServiceTransaction> mTransactionQueue = new ArrayDeque<>();


    /*
    private final Map<Integer, ContextHubServiceTransaction> mReliableMessageTransactionMap =
     * The next available transaction ID
            new HashMap<>();
     */

    /** A set of host endpoint IDs that have an active pending transaction. */
    private final Set<Short> mReliableMessageHostEndpointIdActiveSet = new HashSet<>();

    private final AtomicInteger mNextAvailableId = new AtomicInteger();
    private final AtomicInteger mNextAvailableId = new AtomicInteger();


    /**
    /**
@@ -91,10 +89,12 @@ import java.util.concurrent.atomic.AtomicInteger;
            new AtomicInteger(new Random().nextInt(Integer.MAX_VALUE / 2));
            new AtomicInteger(new Random().nextInt(Integer.MAX_VALUE / 2));


    /*
    /*
     * An executor and the future object for scheduling timeout timers
     * An executor and the future object for scheduling timeout timers and
     * for scheduling the processing of reliable message transactions.
     */
     */
    private final ScheduledThreadPoolExecutor mTimeoutExecutor = new ScheduledThreadPoolExecutor(1);
    private final ScheduledThreadPoolExecutor mExecutor = new ScheduledThreadPoolExecutor(1);
    private ScheduledFuture<?> mTimeoutFuture = null;
    private ScheduledFuture<?> mTimeoutFuture = null;
    private ScheduledFuture<?> mReliableMessageTransactionFuture = null;


    /*
    /*
     * The list of previous transaction records.
     * The list of previous transaction records.
@@ -338,7 +338,7 @@ import java.util.concurrent.atomic.AtomicInteger;
            IContextHubTransactionCallback transactionCallback, String packageName) {
            IContextHubTransactionCallback transactionCallback, String packageName) {
        return new ContextHubServiceTransaction(mNextAvailableId.getAndIncrement(),
        return new ContextHubServiceTransaction(mNextAvailableId.getAndIncrement(),
                ContextHubTransaction.TYPE_RELIABLE_MESSAGE, packageName,
                ContextHubTransaction.TYPE_RELIABLE_MESSAGE, packageName,
                mNextAvailableMessageSequenceNumber.getAndIncrement()) {
                mNextAvailableMessageSequenceNumber.getAndIncrement(), hostEndpointId) {
            @Override
            @Override
            /* package */ int onTransact() {
            /* package */ int onTransact() {
                try {
                try {
@@ -421,18 +421,25 @@ import java.util.concurrent.atomic.AtomicInteger;
            return;
            return;
        }
        }


        if (mTransactionQueue.size() == MAX_PENDING_REQUESTS) {
        if (mTransactionQueue.size() >= MAX_PENDING_REQUESTS
                || mReliableMessageTransactionMap.size() >= MAX_PENDING_REQUESTS) {
            throw new IllegalStateException("Transaction queue is full (capacity = "
            throw new IllegalStateException("Transaction queue is full (capacity = "
                    + MAX_PENDING_REQUESTS + ")");
                    + MAX_PENDING_REQUESTS + ")");
        }
        }


        mTransactionQueue.add(transaction);
        mTransactionRecordDeque.add(new TransactionRecord(transaction.toString()));
        mTransactionRecordDeque.add(new TransactionRecord(transaction.toString()));

        if (Flags.reliableMessageRetrySupportService()
                && transaction.getTransactionType()
                        == ContextHubTransaction.TYPE_RELIABLE_MESSAGE) {
            mReliableMessageTransactionMap.put(transaction.getMessageSequenceNumber(), transaction);
            mExecutor.execute(() -> processMessageTransactions());
        } else {
            mTransactionQueue.add(transaction);
            if (mTransactionQueue.size() == 1) {
            if (mTransactionQueue.size() == 1) {
                startNextTransaction();
                startNextTransaction();
            }
            }
        }
        }
    }


    /**
    /**
     * Handles a transaction response from a Context Hub.
     * Handles a transaction response from a Context Hub.
@@ -460,6 +467,7 @@ import java.util.concurrent.atomic.AtomicInteger;


    /* package */
    /* package */
    synchronized void onMessageDeliveryResponse(int messageSequenceNumber, boolean success) {
    synchronized void onMessageDeliveryResponse(int messageSequenceNumber, boolean success) {
        if (!Flags.reliableMessageRetrySupportService()) {
            ContextHubServiceTransaction transaction = mTransactionQueue.peek();
            ContextHubServiceTransaction transaction = mTransactionQueue.peek();
            if (transaction == null) {
            if (transaction == null) {
                Log.w(TAG, "Received unexpected transaction response (no transaction pending)");
                Log.w(TAG, "Received unexpected transaction response (no transaction pending)");
@@ -480,6 +488,21 @@ import java.util.concurrent.atomic.AtomicInteger;
            transaction.onTransactionComplete(success ? ContextHubTransaction.RESULT_SUCCESS :
            transaction.onTransactionComplete(success ? ContextHubTransaction.RESULT_SUCCESS :
                            ContextHubTransaction.RESULT_FAILED_AT_HUB);
                            ContextHubTransaction.RESULT_FAILED_AT_HUB);
            removeTransactionAndStartNext();
            removeTransactionAndStartNext();
            return;
        }

        ContextHubServiceTransaction transaction =
                mReliableMessageTransactionMap.get(messageSequenceNumber);
        if (transaction == null) {
            Log.w(TAG, "Could not find reliable message transaction with message sequence number"
                    + messageSequenceNumber);
            return;
        }

        completeMessageTransaction(transaction,
                success ? ContextHubTransaction.RESULT_SUCCESS
                        : ContextHubTransaction.RESULT_FAILED_AT_HUB);
        mExecutor.execute(() -> processMessageTransactions());
    }
    }


    /**
    /**
@@ -508,6 +531,15 @@ import java.util.concurrent.atomic.AtomicInteger;
     */
     */
    /* package */
    /* package */
    synchronized void onHubReset() {
    synchronized void onHubReset() {
        if (Flags.reliableMessageRetrySupportService()) {
            Iterator<Map.Entry<Integer, ContextHubServiceTransaction>> iter =
                    mReliableMessageTransactionMap.entrySet().iterator();
            while (iter.hasNext()) {
                completeMessageTransaction(iter.next().getValue(),
                        ContextHubTransaction.RESULT_FAILED_AT_HUB, iter);
            }
        }

        ContextHubServiceTransaction transaction = mTransactionQueue.peek();
        ContextHubServiceTransaction transaction = mTransactionQueue.peek();
        if (transaction == null) {
        if (transaction == null) {
            return;
            return;
@@ -571,7 +603,7 @@ import java.util.concurrent.atomic.AtomicInteger;


                long timeoutMs = transaction.getTimeout(TimeUnit.MILLISECONDS);
                long timeoutMs = transaction.getTimeout(TimeUnit.MILLISECONDS);
                try {
                try {
                    mTimeoutFuture = mTimeoutExecutor.schedule(
                    mTimeoutFuture = mExecutor.schedule(
                            onTimeoutFunc, timeoutMs, TimeUnit.MILLISECONDS);
                            onTimeoutFunc, timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                } catch (Exception e) {
                    Log.e(TAG, "Error when schedule a timer", e);
                    Log.e(TAG, "Error when schedule a timer", e);
@@ -584,6 +616,136 @@ import java.util.concurrent.atomic.AtomicInteger;
        }
        }
    }
    }


    /**
     * Processes message transactions, starting and completing them as needed.
     * This function is called when adding a message transaction or when a timer
     * expires for an existing message transaction's retry or timeout. The
     * internal processing loop will iterate at most twice as if one iteration
     * completes a transaction, the next iteration can only start new transactions.
     * If the first iteration does not complete any transaction, the loop will
     * only iterate once.
     */
    private synchronized void processMessageTransactions() {
        if (!Flags.reliableMessageRetrySupportService()) {
            return;
        }

        if (mReliableMessageTransactionFuture != null) {
            mReliableMessageTransactionFuture.cancel(/* mayInterruptIfRunning= */ false);
            mReliableMessageTransactionFuture = null;
        }

        long now = SystemClock.elapsedRealtimeNanos();
        long nextExecutionTime = Long.MAX_VALUE;
        boolean continueProcessing;
        do {
            continueProcessing = false;
            Iterator<Map.Entry<Integer, ContextHubServiceTransaction>> iter =
                    mReliableMessageTransactionMap.entrySet().iterator();
            while (iter.hasNext()) {
                ContextHubServiceTransaction transaction = iter.next().getValue();
                short hostEndpointId = transaction.getHostEndpointId();
                int numCompletedStartCalls = transaction.getNumCompletedStartCalls();
                if (numCompletedStartCalls == 0
                        && mReliableMessageHostEndpointIdActiveSet.contains(hostEndpointId)) {
                    continue;
                }

                long nextRetryTime = transaction.getNextRetryTime();
                long timeoutTime = transaction.getTimeoutTime();
                boolean transactionTimedOut = timeoutTime <= now;
                boolean transactionHitMaxRetries = nextRetryTime <= now
                        && numCompletedStartCalls > RELIABLE_MESSAGE_MAX_NUM_RETRY;
                if (transactionTimedOut || transactionHitMaxRetries) {
                    completeMessageTransaction(transaction,
                            ContextHubTransaction.RESULT_FAILED_TIMEOUT, iter);
                    continueProcessing = true;
                } else {
                    if (nextRetryTime <= now || numCompletedStartCalls <= 0) {
                        startMessageTransaction(transaction, now);
                    }

                    nextExecutionTime = Math.min(nextExecutionTime,
                            transaction.getNextRetryTime());
                    nextExecutionTime = Math.min(nextExecutionTime,
                            transaction.getTimeoutTime());
                }
            }
        } while (continueProcessing);

        if (nextExecutionTime < Long.MAX_VALUE) {
            mReliableMessageTransactionFuture = mExecutor.schedule(
                    () -> processMessageTransactions(),
                    Math.max(nextExecutionTime - SystemClock.elapsedRealtimeNanos(),
                            RELIABLE_MESSAGE_MIN_WAIT_TIME.toNanos()),
                    TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Completes a message transaction and removes it from the reliable message map.
     *
     * @param transaction The transaction to complete.
     * @param result The result code.
     */
    private void completeMessageTransaction(ContextHubServiceTransaction transaction,
            @ContextHubTransaction.Result int result) {
        completeMessageTransaction(transaction, result, /* iter= */ null);
    }

    /**
     * Completes a message transaction and removes it from the reliable message map using iter.
     *
     * @param transaction The transaction to complete.
     * @param result The result code.
     * @param iter The iterator for the reliable message map - used to remove the message directly.
     */
    private void completeMessageTransaction(ContextHubServiceTransaction transaction,
            @ContextHubTransaction.Result int result,
            Iterator<Map.Entry<Integer, ContextHubServiceTransaction>> iter) {
        transaction.onTransactionComplete(result);

        if (iter == null) {
            mReliableMessageTransactionMap.remove(transaction.getMessageSequenceNumber());
        } else {
            iter.remove();
        }
        mReliableMessageHostEndpointIdActiveSet.remove(transaction.getHostEndpointId());

        Log.d(TAG, "Successfully completed reliable message transaction with "
                + "message sequence number: " + transaction.getMessageSequenceNumber()
                + " and result: " + result);
    }

    /**
     * Starts a message transaction.
     *
     * @param transaction The transaction to start.
     * @param now The now time.
     */
    private void startMessageTransaction(ContextHubServiceTransaction transaction, long now) {
        int numCompletedStartCalls = transaction.getNumCompletedStartCalls();
        @ContextHubTransaction.Result int result = transaction.onTransact();
        if (result == ContextHubTransaction.RESULT_SUCCESS) {
            Log.d(TAG, "Successfully "
                    + (numCompletedStartCalls == 0 ? "started" : "retried")
                    + " reliable message transaction with message sequence number: "
                    + transaction.getMessageSequenceNumber());
        } else {
            Log.w(TAG, "Could not start reliable message transaction with "
                    + "message sequence number: "
                    + transaction.getMessageSequenceNumber()
                    + ", result: " + result);
        }

        transaction.setNextRetryTime(now + RELIABLE_MESSAGE_RETRY_WAIT_TIME.toNanos());
        if (transaction.getTimeoutTime() == Long.MAX_VALUE) { // first time starting transaction
            transaction.setTimeoutTime(now + RELIABLE_MESSAGE_TIMEOUT.toNanos());
        }
        transaction.setNumCompletedStartCalls(numCompletedStartCalls + 1);
        mReliableMessageHostEndpointIdActiveSet.add(transaction.getHostEndpointId());
    }

    private int toStatsTransactionResult(@ContextHubTransaction.Result int result) {
    private int toStatsTransactionResult(@ContextHubTransaction.Result int result) {
        switch (result) {
        switch (result) {
            case ContextHubTransaction.RESULT_SUCCESS:
            case ContextHubTransaction.RESULT_SUCCESS:
@@ -610,19 +772,34 @@ import java.util.concurrent.atomic.AtomicInteger;


    @Override
    @Override
    public String toString() {
    public String toString() {
        StringBuilder sb = new StringBuilder(100);
        StringBuilder sb = new StringBuilder();
        ContextHubServiceTransaction[] arr;
        int i = 0;
        synchronized (this) {
        synchronized (this) {
            arr = mTransactionQueue.toArray(new ContextHubServiceTransaction[0]);
            for (ContextHubServiceTransaction transaction: mTransactionQueue) {
                sb.append(i);
                sb.append(": ");
                sb.append(transaction.toString());
                sb.append("\n");
                ++i;
            }

            if (Flags.reliableMessageRetrySupportService()) {
                for (ContextHubServiceTransaction transaction:
                        mReliableMessageTransactionMap.values()) {
                    sb.append(i);
                    sb.append(": ");
                    sb.append(transaction.toString());
                    sb.append("\n");
                    ++i;
                }
                }
        for (int i = 0; i < arr.length; i++) {
            sb.append(i + ": " + arr[i] + "\n");
            }
            }


            sb.append("Transaction History:\n");
            sb.append("Transaction History:\n");
            Iterator<TransactionRecord> iterator = mTransactionRecordDeque.descendingIterator();
            Iterator<TransactionRecord> iterator = mTransactionRecordDeque.descendingIterator();
            while (iterator.hasNext()) {
            while (iterator.hasNext()) {
            sb.append(iterator.next() + "\n");
                sb.append(iterator.next());
                sb.append("\n");
            }
        }
        }
        return sb.toString();
        return sb.toString();
    }
    }