Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0a3db404 authored by Joe Antonetti's avatar Joe Antonetti
Browse files

[Handoff][1/N] Standardize Message Passing Logic

This change standardizes calls to `CompanionDeviceManager` behind `TaskContinuityMessenger`, which will allow for a common place to enable and disable message passing.

The next CL in this chain will finish this standardization by decoupling business logic from ConnnectedAssociationStore entirely

Flag: android.companion.enable_task_continuity
Bug: 400970610
Test: Updated Unit Tests
Change-Id: I2939f3e3cd491d49b3c132e97da0c5a1ea12225a
parent 6280ac14
Loading
Loading
Loading
Loading
+18 −52
Original line number Diff line number Diff line
@@ -16,15 +16,14 @@

package com.android.server.companion.datatransfer.continuity;

import static android.companion.CompanionDeviceManager.MESSAGE_ONEWAY_TASK_CONTINUITY;
import static com.android.server.companion.datatransfer.contextsync.BitmapUtils.renderDrawableToByteArray;

import android.annotation.NonNull;
import android.app.ActivityManager;
import android.app.ActivityManager.RunningTaskInfo;
import android.app.ActivityTaskManager;
import android.app.TaskStackListener;
import android.companion.AssociationInfo;
import android.companion.CompanionDeviceManager;
import android.content.ComponentName;
import android.content.Context;
import android.content.pm.PackageInfo;
@@ -34,13 +33,12 @@ import android.os.RemoteException;
import android.util.Slog;

import com.android.server.companion.datatransfer.continuity.connectivity.ConnectedAssociationStore;
import com.android.server.companion.datatransfer.continuity.connectivity.TaskContinuityMessenger;
import com.android.server.companion.datatransfer.continuity.messages.ContinuityDeviceConnected;
import com.android.server.companion.datatransfer.continuity.messages.RemoteTaskAddedMessage;
import com.android.server.companion.datatransfer.continuity.messages.RemoteTaskRemovedMessage;
import com.android.server.companion.datatransfer.continuity.messages.RemoteTaskUpdatedMessage;
import com.android.server.companion.datatransfer.continuity.messages.RemoteTaskInfo;
import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessage;
import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessageSerializer;

import java.io.IOException;
import java.util.ArrayList;
@@ -48,6 +46,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Objects;

/**
 * Responsible for broadcasting recent tasks on the current device to the user's
@@ -62,26 +61,22 @@ class TaskBroadcaster

    private final Context mContext;
    private final ActivityTaskManager mActivityTaskManager;
    private final CompanionDeviceManager mCompanionDeviceManager;
    private final ConnectedAssociationStore mConnectedAssociationStore;
    private final TaskContinuityMessenger mTaskContinuityMessenger;
    private final PackageManager mPackageManager;

    private boolean mIsBroadcasting = false;

    public TaskBroadcaster(
        Context context,
        ConnectedAssociationStore connectedAssociationStore) {
        @NonNull Context context,
        @NonNull TaskContinuityMessenger taskContinuityMessenger) {

        mContext = context;
        mConnectedAssociationStore = connectedAssociationStore;

        mActivityTaskManager
            = context.getSystemService(ActivityTaskManager.class);

        mCompanionDeviceManager
            = context.getSystemService(CompanionDeviceManager.class);
        Objects.requireNonNull(context);
        Objects.requireNonNull(taskContinuityMessenger);

        mContext = context;
        mActivityTaskManager = context.getSystemService(ActivityTaskManager.class);
        mPackageManager = context.getPackageManager();
        mTaskContinuityMessenger = taskContinuityMessenger;
    }

    void startBroadcasting(){
@@ -91,7 +86,7 @@ class TaskBroadcaster
        }

        Slog.v(TAG, "Starting broadcasting");
        mConnectedAssociationStore.addObserver(this);
        mTaskContinuityMessenger.getConnectedAssociationStore().addObserver(this);
        mActivityTaskManager.registerTaskStackListener(this);

        mIsBroadcasting = true;
@@ -105,7 +100,7 @@ class TaskBroadcaster

        Slog.v(TAG, "Stopping broadcasting");
        mIsBroadcasting = false;
        mConnectedAssociationStore.removeObserver(this);
        mTaskContinuityMessenger.getConnectedAssociationStore().removeObserver(this);
        mActivityTaskManager.unregisterTaskStackListener(this);
    }

@@ -140,10 +135,8 @@ class TaskBroadcaster
                return;
            }

            RemoteTaskAddedMessage taskAddedMessage
                = new RemoteTaskAddedMessage(remoteTaskInfo);

            sendMessageToAllConnectedAssociations(taskAddedMessage);
            RemoteTaskAddedMessage taskAddedMessage = new RemoteTaskAddedMessage(remoteTaskInfo);
            mTaskContinuityMessenger.sendMessage(taskAddedMessage);
        } else {
            Slog.w(TAG, "Could not find RunningTaskInfo for taskId: " + taskId);
        }
@@ -154,7 +147,7 @@ class TaskBroadcaster
        Slog.v(TAG, "onTaskRemoved: taskId=" + taskId);

        RemoteTaskRemovedMessage taskRemovedMessage = new RemoteTaskRemovedMessage(taskId);
        sendMessageToAllConnectedAssociations(taskRemovedMessage);
        mTaskContinuityMessenger.sendMessage(taskRemovedMessage);
    }

    @Override
@@ -168,7 +161,7 @@ class TaskBroadcaster
        }

        RemoteTaskUpdatedMessage taskUpdatedMessage = new RemoteTaskUpdatedMessage(remoteTaskInfo);
        sendMessageToAllConnectedAssociations(taskUpdatedMessage);
        mTaskContinuityMessenger.sendMessage(taskUpdatedMessage);
    }

    private void sendDeviceConnectedMessage(int associationId) {
@@ -192,34 +185,7 @@ class TaskBroadcaster
        ContinuityDeviceConnected deviceConnectedMessage
            = new ContinuityDeviceConnected(remoteTasks);

        sendMessage(associationId, deviceConnectedMessage);
    }

    private void sendMessage(int associationId, TaskContinuityMessage message) {
        Slog.v(TAG, "Sending message to association id: " + associationId);

        try {
            mCompanionDeviceManager.sendMessage(
                CompanionDeviceManager.MESSAGE_ONEWAY_TASK_CONTINUITY,
                TaskContinuityMessageSerializer.serialize(message),
                new int[] {associationId});
        } catch (IOException e) {
            Slog.e(TAG, "Failed to send message to device " + associationId, e);
        }
    }

    private void sendMessageToAllConnectedAssociations(TaskContinuityMessage message) {

        Collection<AssociationInfo> connectedAssociations
            = mConnectedAssociationStore.getConnectedAssociations();

        Slog.v(
            TAG,
            "Sending message to " + connectedAssociations.size() + " associations.");

        for (AssociationInfo associationInfo : connectedAssociations) {
            sendMessage(associationInfo.getId(), message);
        }
        mTaskContinuityMessenger.sendMessage(associationId, deviceConnectedMessage);
    }

    private ActivityManager.RunningTaskInfo getRunningTask(int taskId) {
+15 −16
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import android.content.Context;
import android.os.Binder;
import android.util.Slog;

import com.android.server.companion.datatransfer.continuity.connectivity.TaskContinuityMessenger;
import com.android.server.companion.datatransfer.continuity.handoff.InboundHandoffRequestController;
import com.android.server.companion.datatransfer.continuity.handoff.OutboundHandoffRequestController;
import com.android.server.companion.datatransfer.continuity.messages.ContinuityDeviceConnected;
@@ -38,7 +39,6 @@ import com.android.server.companion.datatransfer.continuity.messages.TaskContinu
import com.android.server.companion.datatransfer.continuity.tasks.RemoteTaskStore;

import com.android.server.SystemService;
import com.android.server.companion.datatransfer.continuity.connectivity.ConnectedAssociationStore;

import java.util.ArrayList;
import java.util.List;
@@ -49,7 +49,8 @@ import java.util.List;
 * @hide
 *
 */
public final class TaskContinuityManagerService extends SystemService {
public final class TaskContinuityManagerService
    extends SystemService implements TaskContinuityMessenger.Listener {

    private static final String TAG = "TaskContinuityManagerService";

@@ -57,31 +58,28 @@ public final class TaskContinuityManagerService extends SystemService {
    private OutboundHandoffRequestController mOutboundHandoffRequestController;
    private TaskContinuityManagerServiceImpl mTaskContinuityManagerService;
    private TaskBroadcaster mTaskBroadcaster;
    private ConnectedAssociationStore mConnectedAssociationStore;
    private TaskContinuityMessageReceiver mTaskContinuityMessageReceiver;
    private TaskContinuityMessenger mTaskContinuityMessenger;
    private RemoteTaskStore mRemoteTaskStore;

    public TaskContinuityManagerService(Context context) {
        super(context);
        mConnectedAssociationStore = new ConnectedAssociationStore(context);

        mTaskBroadcaster = new TaskBroadcaster(
            context,
            mConnectedAssociationStore);

        mTaskContinuityMessageReceiver = new TaskContinuityMessageReceiver(context);
        mRemoteTaskStore = new RemoteTaskStore(mConnectedAssociationStore);
        mTaskContinuityMessenger = new TaskContinuityMessenger(context, this);
        mTaskBroadcaster = new TaskBroadcaster(context, mTaskContinuityMessenger);
        mRemoteTaskStore = new RemoteTaskStore(
            mTaskContinuityMessenger.getConnectedAssociationStore());
        mOutboundHandoffRequestController = new OutboundHandoffRequestController(
            context,
            mConnectedAssociationStore);
        mInboundHandoffRequestController = new InboundHandoffRequestController(context);
            mTaskContinuityMessenger);
        mInboundHandoffRequestController = new InboundHandoffRequestController(
            mTaskContinuityMessenger);
    }

    @Override
    public void onStart() {
        mTaskContinuityManagerService = new TaskContinuityManagerServiceImpl();
        mTaskContinuityMessenger.enable();
        mTaskBroadcaster.startBroadcasting();
        mTaskContinuityMessageReceiver.startListening(this::onTaskContinuityMessageReceived);
        publishBinderService(Context.TASK_CONTINUITY_SERVICE, mTaskContinuityManagerService);
    }

@@ -118,9 +116,10 @@ public final class TaskContinuityManagerService extends SystemService {
        }
    }

    private void onTaskContinuityMessageReceived(
    @Override
    public void onMessageReceived(
        int associationId,
        TaskContinuityMessage taskContinuityMessage) {
        @NonNull TaskContinuityMessage taskContinuityMessage) {

        Slog.v(TAG, "Received message from association id: " + associationId);

+0 −109
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.server.companion.datatransfer.continuity;

import static android.companion.CompanionDeviceManager.MESSAGE_ONEWAY_TASK_CONTINUITY;

import android.content.Context;
import android.companion.CompanionDeviceManager;
import android.util.Slog;

import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessage;
import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessageSerializer;

import java.util.function.BiConsumer;

/**
 * Responsible for receiving task continuity messages from the user's other
 * devices.
 */
class TaskContinuityMessageReceiver {

    private static final String TAG = "TaskContinuityMessageReceiver";

    private final Context mContext;
    private final CompanionDeviceManager mCompanionDeviceManager;

    private final BiConsumer<Integer, byte[]> mOnMessageReceivedListener
        = this::onMessageReceived;

    private BiConsumer<Integer, TaskContinuityMessage> mOnTaskContinuityMessageReceivedListener;

    private boolean mIsListening = false;

    TaskContinuityMessageReceiver(Context context) {
        mContext = context;
        mCompanionDeviceManager = context
            .getSystemService(CompanionDeviceManager.class);
    }

    /**
     * Starts listening for task continuity messages.
     *
     * @return true if listening was started successfully, false otherwise.
     */
    boolean startListening(
        BiConsumer<Integer, TaskContinuityMessage> onTaskContinuityMessageReceivedListener) {
        if (mIsListening) {
            Slog.v(TAG, "TaskContinuityMessageReceiver is already listening");
            return false;
        }

        mOnTaskContinuityMessageReceivedListener = onTaskContinuityMessageReceivedListener;
        mCompanionDeviceManager.addOnMessageReceivedListener(
            mContext.getMainExecutor(),
            MESSAGE_ONEWAY_TASK_CONTINUITY,
            mOnMessageReceivedListener
        );

        mIsListening = true;
        return true;
    }

    /**
     * Stops listening for task continuity messages.
     */
    void stopListening() {
        if (!mIsListening) {
            Slog.v(TAG, "TaskContinuityMessageReceiver is not listening");
            return;
        }

        mOnTaskContinuityMessageReceivedListener = null;

        mCompanionDeviceManager.removeOnMessageReceivedListener(
            MESSAGE_ONEWAY_TASK_CONTINUITY,
            mOnMessageReceivedListener);

        mIsListening = false;
    }

    private void onMessageReceived(int associationId, byte[] data) {
        Slog.v(TAG, "Received message from association id: " + associationId);
      try {
            TaskContinuityMessage taskContinuityMessage
                = TaskContinuityMessageSerializer.deserialize(data);
            if (mOnTaskContinuityMessageReceivedListener != null) {
                mOnTaskContinuityMessageReceivedListener.accept(
                    associationId,
                    taskContinuityMessage);
            }
      } catch (Exception e) {
        Slog.e(TAG, "Failed to parse task continuity message", e);
      }
    }
}
 No newline at end of file
+5 −5
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ package com.android.server.companion.datatransfer.continuity.connectivity;
import android.annotation.NonNull;
import android.companion.AssociationInfo;
import android.companion.CompanionDeviceManager;
import android.content.Context;
import android.util.Log;

import java.util.ArrayList;
@@ -29,6 +28,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

public class ConnectedAssociationStore {

@@ -44,12 +44,12 @@ public class ConnectedAssociationStore {
    }

    public ConnectedAssociationStore(
            @NonNull Context context) {
        mCompanionDeviceManager = context
            .getSystemService(CompanionDeviceManager.class);
        @NonNull CompanionDeviceManager companionDeviceManager,
        @NonNull Executor executor) {

        mCompanionDeviceManager = companionDeviceManager;
        mCompanionDeviceManager.addOnTransportsChangedListener(
                context.getMainExecutor(),
                executor,
                this::onTransportsChanged);
   }

+170 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.server.companion.datatransfer.continuity.connectivity;

import static android.companion.CompanionDeviceManager.MESSAGE_ONEWAY_TASK_CONTINUITY;

import android.annotation.NonNull;
import android.companion.CompanionDeviceManager;
import android.companion.AssociationInfo;
import android.content.Context;
import android.util.Slog;

import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessage;
import com.android.server.companion.datatransfer.continuity.messages.TaskContinuityMessageSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.concurrent.Executor;
import java.util.Objects;

/**
 * Facilitates communication between devices, including sending and receiving messages
 * between devices. Internally, it uses the {@link CompanionDeviceManager} to send and receive
 * messages.
 */
public class TaskContinuityMessenger {

    private static final String TAG = "TaskContinuityMessenger";

    private final Context mContext;
    private final CompanionDeviceManager mCompanionDeviceManager;
    private final ConnectedAssociationStore mConnectedAssociationStore;
    private final Executor mExecutor;
    private final Listener mListener;

    private BiConsumer<Integer, byte[]> mIncomingMessageConsumer;

    public interface Listener {
        void onMessageReceived(int associationId, @NonNull TaskContinuityMessage message);
    }

    public TaskContinuityMessenger(@NonNull Context context, @NonNull Listener listener) {

        Objects.requireNonNull(context);
        Objects.requireNonNull(listener);

        mContext = context;
        mExecutor = context.getMainExecutor();
        mListener = listener;
        mCompanionDeviceManager = context.getSystemService(CompanionDeviceManager.class);
        mConnectedAssociationStore = new ConnectedAssociationStore(
            mCompanionDeviceManager,
            mExecutor);
    }

    public void enable() {
        synchronized (this) {
            if (mIncomingMessageConsumer != null) {
                Slog.i(TAG, "TaskContinuityMessenger is already enabled.");
                return;
            }
            mIncomingMessageConsumer = this::onMessageReceived;
            mCompanionDeviceManager.addOnMessageReceivedListener(
                mExecutor,
                MESSAGE_ONEWAY_TASK_CONTINUITY,
                mIncomingMessageConsumer);
        }
    }

    public void disable() {
        synchronized (this) {
            if (mIncomingMessageConsumer == null) {
                Slog.i(TAG, "TaskContinuityMessenger is already disabled.");
                return;
            }
            mCompanionDeviceManager.removeOnMessageReceivedListener(
                MESSAGE_ONEWAY_TASK_CONTINUITY,
                mIncomingMessageConsumer);
            mIncomingMessageConsumer = null;
        }
    }

    @NonNull
    public ConnectedAssociationStore getConnectedAssociationStore() {
        return mConnectedAssociationStore;
    }

    public enum SendMessageResult {
        SUCCESS,
        FAILURE_MESSAGE_SERIALIZATION_FAILED,
        FAILURE_ASSOCIATION_NOT_FOUND,
        FAILURE_INTERNAL_ERROR,
    }

    public SendMessageResult sendMessage(
        int associationId,
        @NonNull TaskContinuityMessage message) {

       return sendMessage(new int[] {associationId}, message);
    }

    public SendMessageResult sendMessage(
        int[] associationIds,
        @NonNull TaskContinuityMessage message) {

        Slog.i(TAG, "Sending message to " + associationIds.length + " associations.");
        byte[] serializedMessage;
        try {
            serializedMessage = TaskContinuityMessageSerializer.serialize(message);
        } catch (IOException e) {
            Slog.e(TAG, "Failed to serialize message: " + message, e);
            return SendMessageResult.FAILURE_MESSAGE_SERIALIZATION_FAILED;
        }

        for (int associationId : associationIds) {
            if (mConnectedAssociationStore.getConnectedAssociationById(associationId) == null) {
                Slog.w(TAG, "Association " + associationId + " is not connected.");
                return SendMessageResult.FAILURE_ASSOCIATION_NOT_FOUND;
            }
        }

        try {
            mCompanionDeviceManager.sendMessage(
                CompanionDeviceManager.MESSAGE_ONEWAY_TASK_CONTINUITY,
                serializedMessage,
                associationIds);
            Slog.i(TAG, "Sending message to " + associationIds.length + " associations.");
            return SendMessageResult.SUCCESS;
        } catch (Exception e) {
            Slog.e(TAG, "Failed to send message to associations", e);
            return SendMessageResult.FAILURE_INTERNAL_ERROR;
        }
    }

    public SendMessageResult sendMessage(@NonNull TaskContinuityMessage message) {
        int[] connectedAssociations = mConnectedAssociationStore
            .getConnectedAssociations()
            .stream()
            .mapToInt(AssociationInfo::getId)
            .toArray();

        return sendMessage(connectedAssociations, message);
    }

    private void onMessageReceived(int associationId, byte[] data) {
        Slog.v(TAG, "Received message from association id: " + associationId);
      try {
            TaskContinuityMessage taskContinuityMessage
                = TaskContinuityMessageSerializer.deserialize(data);
            mListener.onMessageReceived(associationId, taskContinuityMessage);
      } catch (IOException e) {
        Slog.e(TAG, "Failed to parse task continuity message", e);
      }
    }
}
 No newline at end of file
Loading