Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit dece117d authored by Matthew Sedam's avatar Matthew Sedam Committed by Android (Google) Code Review
Browse files

Merge "Context Hub Service: Add reliable message duplicate detection" into main

parents 025d7cd8 39524ee7
Loading
Loading
Loading
Loading
+200 −7
Original line number Diff line number Diff line
@@ -78,10 +78,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
@@ -152,6 +156,16 @@ public class ContextHubService extends IContextHubService.Stub {
    private final ScheduledThreadPoolExecutor mDailyMetricTimer =
            new ScheduledThreadPoolExecutor(1);

    // A queue of reliable message records for duplicate detection
    private final PriorityQueue<ReliableMessageRecord> mReliableMessageRecordQueue =
            new PriorityQueue<ReliableMessageRecord>(
                    (ReliableMessageRecord left, ReliableMessageRecord right) -> {
                        return Long.compare(left.getTimestamp(), right.getTimestamp());
                    });

    // The test mode manager that manages behaviors during test mode.
    private final TestModeManager mTestModeManager = new TestModeManager();

    // The period of the recurring time
    private static final int PERIOD_METRIC_QUERY_DAYS = 1;

@@ -164,6 +178,9 @@ public class ContextHubService extends IContextHubService.Stub {
    private boolean mIsBtScanningEnabled = false;
    private boolean mIsBtMainEnabled = false;

    // True if test mode is enabled for the Context Hub
    private AtomicBoolean mIsTestModeEnabled = new AtomicBoolean(false);

    // A hashmap used to record if a contexthub is waiting for daily query
    private Set<Integer> mMetricQueryPendingContextHubIds =
            Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
@@ -210,8 +227,17 @@ public class ContextHubService extends IContextHubService.Stub {
        @Override
        public void handleNanoappMessage(short hostEndpointId, NanoAppMessage message,
                List<String> nanoappPermissions, List<String> messagePermissions) {
            handleClientMessageCallback(mContextHubId, hostEndpointId, message, nanoappPermissions,
                    messagePermissions);
            if (Flags.reliableMessageImplementation()
                    && Flags.reliableMessageTestModeBehavior()
                    && mIsTestModeEnabled.get()
                    && mTestModeManager.handleNanoappMessage(mContextHubId, hostEndpointId,
                            message, nanoappPermissions, messagePermissions)) {
                // The TestModeManager handled the nanoapp message, so return here.
                return;
            }

            handleClientMessageCallback(mContextHubId, hostEndpointId, message,
                    nanoappPermissions, messagePermissions);
        }

        @Override
@@ -228,6 +254,106 @@ public class ContextHubService extends IContextHubService.Stub {
        }
    }

    /**
     * Records a reliable message from a nanoapp for duplicate detection.
     */
    private static class ReliableMessageRecord {
        public static final int TIMEOUT_NS = 1000000000;

        public int mContextHubId;
        public long mTimestamp;
        public int mMessageSequenceNumber;
        byte mErrorCode;

        ReliableMessageRecord(int contextHubId, long timestamp,
                int messageSequenceNumber, byte errorCode) {
            mContextHubId = contextHubId;
            mTimestamp = timestamp;
            mMessageSequenceNumber = messageSequenceNumber;
            mErrorCode = errorCode;
        }

        public int getContextHubId() {
            return mContextHubId;
        }

        public long getTimestamp() {
            return mTimestamp;
        }

        public int getMessageSequenceNumber() {
            return mMessageSequenceNumber;
        }

        public byte getErrorCode() {
            return mErrorCode;
        }

        public void setErrorCode(byte errorCode) {
            mErrorCode = errorCode;
        }

        public boolean isExpired() {
            return mTimestamp + TIMEOUT_NS < SystemClock.elapsedRealtimeNanos();
        }
    }

    /**
     * A class to manage behaviors during test mode. This is used for testing.
     */
    private class TestModeManager {
        /**
         * Probability (in percent) of duplicating a message.
         */
        private static final int MESSAGE_DUPLICATION_PROBABILITY_PERCENT = 50;

        /**
         * The number of total messages to send when the duplicate event happens.
         */
        private static final int NUM_MESSAGES_TO_DUPLICATE = 3;

        /**
         * A probability percent for a certain event.
         */
        private static final int MAX_PROBABILITY_PERCENT = 100;

        /**
         * Random number generator.
         */
        private Random mRandom = new Random();

        /**
         * @see ContextHubServiceCallback.handleNanoappMessage
         * @return whether the message was handled
         */
        public boolean handleNanoappMessage(int contextHubId,
                short hostEndpointId, NanoAppMessage message,
                List<String> nanoappPermissions, List<String> messagePermissions) {
            if (!message.isReliable()) {
                return false;
            }

            if (didEventHappen(MESSAGE_DUPLICATION_PROBABILITY_PERCENT)) {
                for (int i = 0; i < NUM_MESSAGES_TO_DUPLICATE; ++i) {
                    handleClientMessageCallback(contextHubId, hostEndpointId,
                            message, nanoappPermissions, messagePermissions);
                }
                return true;
            }
            return false;
        }

        /**
         * Returns true if the event with percentPercent did happen.
         *
         * @param probabilityPercent the percent probability of the event.
         * @return true if the event happened, false otherwise.
         */
        private boolean didEventHappen(int probabilityPercent) {
            return mRandom.nextInt(MAX_PROBABILITY_PERCENT) < probabilityPercent;
        }
    }

    public ContextHubService(Context context, IContextHubWrapper contextHubWrapper) {
        Log.i(TAG, "Starting Context Hub Service init");
        mContext = context;
@@ -563,6 +689,8 @@ public class ContextHubService extends IContextHubService.Stub {
     * Resets the settings. Called when a context hub restarts or the AIDL HAL dies
     */
    private void resetSettings() {
        mIsTestModeEnabled.set(false);

        sendLocationSettingUpdate();
        sendWifiSettingUpdate(/* forceUpdate= */ true);
        sendAirplaneModeSettingUpdate();
@@ -854,14 +982,76 @@ public class ContextHubService extends IContextHubService.Stub {
    private void handleClientMessageCallback(int contextHubId, short hostEndpointId,
            NanoAppMessage message, List<String> nanoappPermissions,
            List<String> messagePermissions) {
        byte errorCode = mClientManager.onMessageFromNanoApp(contextHubId, hostEndpointId, message,
                nanoappPermissions, messagePermissions);
        if (!Flags.reliableMessageImplementation()
                || !Flags.reliableMessageDuplicateDetectionService()) {
            byte errorCode = mClientManager.onMessageFromNanoApp(contextHubId, hostEndpointId,
                    message, nanoappPermissions, messagePermissions);
            if (message.isReliable() && errorCode != ErrorCode.OK) {
            sendMessageDeliveryStatusToContextHub(contextHubId, message.getMessageSequenceNumber(),
                    errorCode);
                sendMessageDeliveryStatusToContextHub(contextHubId,
                        message.getMessageSequenceNumber(), errorCode);
            }
            return;
        }

        if (message.isReliable()) {
            byte errorCode = ErrorCode.OK;
            synchronized (mReliableMessageRecordQueue) {
                Optional<ReliableMessageRecord> record = Optional.empty();
                for (ReliableMessageRecord r: mReliableMessageRecordQueue) {
                    if (r.getContextHubId() == contextHubId
                            && r.getMessageSequenceNumber() == message.getMessageSequenceNumber()) {
                        record = Optional.of(r);
                        break;
                    }
                }

                if (record.isPresent()) {
                    errorCode = record.get().getErrorCode();
                    if (errorCode == ErrorCode.TRANSIENT_ERROR) {
                        Log.w(TAG, "Found duplicate reliable message with message sequence number: "
                                + record.get().getMessageSequenceNumber() + ": retrying");
                        errorCode = mClientManager.onMessageFromNanoApp(
                                contextHubId, hostEndpointId, message,
                                nanoappPermissions, messagePermissions);
                        record.get().setErrorCode(errorCode);
                    } else {
                        Log.w(TAG, "Found duplicate reliable message with message sequence number: "
                                + record.get().getMessageSequenceNumber());
                    }
                } else {
                    errorCode = mClientManager.onMessageFromNanoApp(
                            contextHubId, hostEndpointId, message,
                            nanoappPermissions, messagePermissions);
                    mReliableMessageRecordQueue.add(
                            new ReliableMessageRecord(contextHubId,
                                    SystemClock.elapsedRealtimeNanos(),
                                    message.getMessageSequenceNumber(),
                                    errorCode));
                }
            }
            sendMessageDeliveryStatusToContextHub(contextHubId,
                    message.getMessageSequenceNumber(), errorCode);
        } else {
            mClientManager.onMessageFromNanoApp(
                    contextHubId, hostEndpointId, message,
                    nanoappPermissions, messagePermissions);
        }

        synchronized (mReliableMessageRecordQueue) {
            while (mReliableMessageRecordQueue.peek() != null
                   && mReliableMessageRecordQueue.peek().isExpired()) {
                mReliableMessageRecordQueue.poll();
            }
        }
    }

    /**
     * Sends the message delivery status to the Context Hub.
     *
     * @param contextHubId the ID of the hub
     * @param messageSequenceNumber the message sequence number
     * @param errorCode the error code, one of the enum ErrorCode
     */
    private void sendMessageDeliveryStatusToContextHub(int contextHubId,
            int messageSequenceNumber, byte errorCode) {
        if (!Flags.reliableMessageImplementation()) {
@@ -1229,6 +1419,9 @@ public class ContextHubService extends IContextHubService.Stub {
    public boolean setTestMode(boolean enable) {
        super.setTestMode_enforcePermission();
        boolean status = mContextHubWrapper.setTestMode(enable);
        if (status) {
            mIsTestModeEnabled.set(enable);
        }

        // Query nanoapps to update service state after test mode state change.
        for (int contextHubId: mDefaultClientMap.keySet()) {