Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ef9108a1 authored by hkuang's avatar hkuang
Browse files

TranscodingAPI: Handling Transcding service crash.

Upon receiving a binder died event of the client
due to service crash, we will do the following:
 1) For the job that is running, notify the client
    that the job is failed with error code, so client
    could choose to retry the job or not.
 2) For the jobs that is still pending or paused,
    we will resubmit the job internally once we
    successfully reconnect to the service and
    register a new client.
 3) When trying to connect to the service and register
    a new client. The service may need time to reboot
    or never boot up again. So we will retry for a number
    of times. If we still could not connect, we will notify
    client job failure for the pending and paused jobs.

Bug: 161469320
Bug: 160260102
Test: Run the test with long clip and kill the service.
Change-Id: Idfd8dd0aae60b1dfd6c766552c71f07c60663918
parent 8fd758e5
Loading
Loading
Loading
Loading
+177 −18
Original line number Diff line number Diff line
@@ -36,7 +36,10 @@ import com.android.internal.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -94,11 +97,17 @@ import java.util.concurrent.Executors;
 TODO(hkuang): Clarify whether supports framerate conversion.
 @hide
 */
public final class MediaTranscodeManager {
public final class MediaTranscodeManager implements AutoCloseable {
    private static final String TAG = "MediaTranscodeManager";

    private static final String MEDIA_TRANSCODING_SERVICE = "media.transcoding";

    /** Maximum number of retry to connect to the service. */
    private static final int CONNECT_SERVICE_RETRY_COUNT = 100;

    /** Interval between trying to reconnect to the service. */
    private static final int INTERVAL_CONNECT_SERVICE_RETRY_MS = 40;

    /**
     * Default transcoding type.
     * @hide
@@ -117,6 +126,25 @@ public final class MediaTranscodeManager {
     */
    public static final int TRANSCODING_TYPE_IMAGE = 2;

    @Override
    public void close() throws Exception {
        release();
    }

    /**
     * Releases the MediaTranscodeManager.
     */
    //TODO(hkuang): add test for it.
    private void release() throws Exception {
        synchronized (mLock) {
            if (mTranscodingClient != null) {
                mTranscodingClient.unregister();
            } else {
                throw new UnsupportedOperationException("Failed to release");
            }
        }
    }

    /** @hide */
    @IntDef(prefix = {"TRANSCODING_TYPE_"}, value = {
            TRANSCODING_TYPE_UNKNOWN,
@@ -181,10 +209,12 @@ public final class MediaTranscodeManager {
    private final String mPackageName;
    private final int mPid;
    private final int mUid;
    private final ExecutorService mCallbackExecutor = Executors.newSingleThreadExecutor();
    private static MediaTranscodeManager sMediaTranscodeManager;
    private final ExecutorService mExecutor = Executors.newSingleThreadExecutor();
    private final HashMap<Integer, TranscodingJob> mPendingTranscodingJobs = new HashMap();
    @NonNull private ITranscodingClient mTranscodingClient;
    private final Object mLock = new Object();
    @GuardedBy("mLock")
    @NonNull private ITranscodingClient mTranscodingClient = null;
    private static MediaTranscodeManager sMediaTranscodeManager;

    private void handleTranscodingFinished(int jobId, TranscodingResultParcel result) {
        synchronized (mPendingTranscodingJobs) {
@@ -209,7 +239,7 @@ public final class MediaTranscodeManager {
        }
    }

    private void handleTranscodingFailed(int jobId, int errorCodec) {
    private void handleTranscodingFailed(int jobId, int errorCode) {
        synchronized (mPendingTranscodingJobs) {
            // Gets the job associated with the jobId and removes it from
            // mPendingTranscodingJobs.
@@ -254,6 +284,98 @@ public final class MediaTranscodeManager {
        }
    }

    private static IMediaTranscodingService getService(boolean retry) {
        int retryCount = !retry ? 1 :  CONNECT_SERVICE_RETRY_COUNT;
        Log.i(TAG, "get service with rety " + retryCount);
        for (int count = 1;  count <= retryCount; count++) {
            Log.d(TAG, "Trying to connect to service. Try count: " + count);
            IMediaTranscodingService service = IMediaTranscodingService.Stub.asInterface(
                    ServiceManager.getService(MEDIA_TRANSCODING_SERVICE));
            if (service != null) {
                return service;
            }
            try {
                // Sleep a bit before retry.
                Thread.sleep(INTERVAL_CONNECT_SERVICE_RETRY_MS);
            } catch (InterruptedException ie) {
                /* ignore */
            }
        }

        throw new UnsupportedOperationException("Failed to connect to MediaTranscoding service");
    }

    /*
     * Handle client binder died event.
     * Upon receiving a binder died event of the client, we will do the following:
     * 1) For the job that is running, notify the client that the job is failed with error code,
     *    so client could choose to retry the job or not.
     *    TODO(hkuang): Add a new error code to signal service died error.
     * 2) For the jobs that is still pending or paused, we will resubmit the job internally once
     *    we successfully reconnect to the service and register a new client.
     * 3) When trying to connect to the service and register a new client. The service may need time
     *    to reboot or never boot up again. So we will retry for a number of times. If we still
     *    could not connect, we will notify client job failure for the pending and paused jobs.
     */
    private void onClientDied() {
        synchronized (mLock) {
            mTranscodingClient = null;
        }

        // Delegates the job notification and retry to the executor as it may take some time.
        mExecutor.execute(() -> {
            // List to track the jobs that we want to retry.
            List<TranscodingJob> retryJobs = new ArrayList<TranscodingJob>();

            // First notify the client of job failure for all the running jobs.
            synchronized (mPendingTranscodingJobs) {
                for (Map.Entry<Integer, TranscodingJob> entry :
                        mPendingTranscodingJobs.entrySet()) {
                    TranscodingJob job = entry.getValue();

                    if (job.getStatus() == TranscodingJob.STATUS_RUNNING) {
                        job.updateStatusAndResult(TranscodingJob.STATUS_FINISHED,
                                TranscodingJob.RESULT_ERROR);

                        // Remove the job from pending jobs.
                        mPendingTranscodingJobs.remove(entry.getKey());

                        if (job.mListener != null && job.mListenerExecutor != null) {
                            Log.i(TAG, "Notify client job failed");
                            job.mListenerExecutor.execute(
                                    () -> job.mListener.onTranscodingFinished(job));
                        }
                    } else if (job.getStatus() == TranscodingJob.STATUS_PENDING
                            || job.getStatus() == TranscodingJob.STATUS_PAUSED) {
                        // Add the job to retryJobs to handle them later.
                        retryJobs.add(job);
                    }
                }
            }

            // Try to register with the service once it boots up.
            IMediaTranscodingService service = getService(true /*retry*/);
            boolean haveTranscodingClient = false;
            if (service != null) {
                synchronized (mLock) {
                    mTranscodingClient = registerClient(service);
                    if (mTranscodingClient != null) {
                        haveTranscodingClient = true;
                    }
                }
            }

            for (TranscodingJob job : retryJobs) {
                // Notify the job failure if we fails to connect to the service or fail
                // to retry the job.
                if (!haveTranscodingClient || !job.retry()) {
                    // TODO(hkuang): Return correct error code to the client.
                    handleTranscodingFailed(job.getJobId(), 0 /*unused */);
                }
            }
        });
    }

    private void updateStatus(int jobId, int status) {
        synchronized (mPendingTranscodingJobs) {
            final TranscodingJob job = mPendingTranscodingJobs.get(jobId);
@@ -336,29 +458,49 @@ public final class MediaTranscodeManager {
                }
            };

    /* Private constructor. */
    private MediaTranscodeManager(@NonNull Context context,
            IMediaTranscodingService transcodingService) {
        mContext = context;
        mContentResolver = mContext.getContentResolver();
        mPackageName = mContext.getPackageName();
        mPid = Os.getuid();
        mUid = Os.getpid();

    private ITranscodingClient registerClient(IMediaTranscodingService service)
            throws UnsupportedOperationException {
        synchronized (mLock) {
            try {
                // Registers the client with MediaTranscoding service.
            mTranscodingClient = transcodingService.registerClient(
                mTranscodingClient = service.registerClient(
                        mTranscodingClientCallback,
                        mPackageName,
                        mPackageName,
                        IMediaTranscodingService.USE_CALLING_UID,
                        IMediaTranscodingService.USE_CALLING_PID);

                if (mTranscodingClient != null) {
                    mTranscodingClient.asBinder().linkToDeath(() -> onClientDied(), /* flags */ 0);
                }
                return mTranscodingClient;
            } catch (RemoteException re) {
                Log.e(TAG, "Failed to register new client due to exception " + re);
                mTranscodingClient = null;
            }
        }
        throw new UnsupportedOperationException("Failed to register new client");
    }

    /* Private constructor. */
    private MediaTranscodeManager(@NonNull Context context,
            IMediaTranscodingService transcodingService) {
        mContext = context;
        mContentResolver = mContext.getContentResolver();
        mPackageName = mContext.getPackageName();
        mPid = Os.getuid();
        mUid = Os.getpid();
        mTranscodingClient = registerClient(transcodingService);
    }

    @Override
    protected void finalize() {
        try {
            release();
        } catch (Exception ex) {
            Log.e(TAG, "Failed to release");
        }
    }

    public static final class TranscodingRequest {
        /** Uri of the source media file. */
@@ -806,6 +948,16 @@ public final class MediaTranscodeManager {
            mResult = jobResult;
        }

        /**
         * Resubmit the transcoding job to the service.
         *
         * @return true if successfully resubmit the job to the service. False otherwise.
         */
        public synchronized boolean retry() {
            // TODO(hkuang): Implement this.
            return true;
        }

        /**
         * Cancels the transcoding job and notify the listener.
         * If the job happened to finish before being canceled this call is effectively a no-op and
@@ -879,9 +1031,15 @@ public final class MediaTranscodeManager {
     */
    public static MediaTranscodeManager getInstance(@NonNull Context context) {
        // Acquires the MediaTranscoding service.
        IMediaTranscodingService service = IMediaTranscodingService.Stub.asInterface(
                ServiceManager.getService(MEDIA_TRANSCODING_SERVICE));
        IMediaTranscodingService service = getService(false /*retry*/);
        return getInstance(context, service);
    }

    /** Similar as above, but wait till the service is ready. */
    @VisibleForTesting
    public static MediaTranscodeManager getInstance(@NonNull Context context, boolean retry) {
        // Acquires the MediaTranscoding service.
        IMediaTranscodingService service = getService(retry);
        return getInstance(context, service);
    }

@@ -896,6 +1054,7 @@ public final class MediaTranscodeManager {
                sMediaTranscodeManager = new MediaTranscodeManager(context.getApplicationContext(),
                        transcodingService);
            }

            return sMediaTranscodeManager;
        }
    }
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ android_test {
    static_libs: [
        "androidx.test.ext.junit",
        "androidx.test.rules",
        "androidx.test.uiautomator_uiautomator",
        "android-support-test",
        "testng"
    ],
+100 −3
Original line number Diff line number Diff line
@@ -23,15 +23,23 @@ import android.media.MediaTranscodeManager;
import android.media.MediaTranscodeManager.TranscodingJob;
import android.media.MediaTranscodeManager.TranscodingRequest;
import android.net.Uri;
import android.os.Bundle;
import android.os.FileUtils;
import android.os.ParcelFileDescriptor;
import android.test.ActivityInstrumentationTestCase2;
import android.util.Log;

import androidx.test.InstrumentationRegistry;
import androidx.test.uiautomator.UiDevice;

import org.junit.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -127,8 +135,9 @@ public class MediaTranscodeManagerTest
        super.setUp();

        mContext = getInstrumentation().getContext();
        mMediaTranscodeManager = MediaTranscodeManager.getInstance(mContext);
        mMediaTranscodeManager = MediaTranscodeManager.getInstance(mContext, true /*retry*/);
        assertNotNull(mMediaTranscodeManager);
        androidx.test.InstrumentationRegistry.registerInstance(getInstrumentation(), new Bundle());

        // Setup source HEVC file uri.
        mSourceHEVCVideoUri = resourceToUri(mContext, R.raw.VideoOnlyHEVC, "VideoOnlyHEVC.mp4");
@@ -148,8 +157,6 @@ public class MediaTranscodeManagerTest

    @Test
    public void testTranscodingFromHevcToAvc() throws Exception {
        Log.d(TAG, "Starting: testMediaTranscodeManager");

        Semaphore transcodeCompleteSemaphore = new Semaphore(0);

        // Create a file Uri: file:///data/user/0/com.android.mediatranscodingtest/cache/temp.mp4
@@ -194,6 +201,7 @@ public class MediaTranscodeManagerTest
                stats.mAveragePSNR >= PSNR_THRESHOLD);
    }


    @Test
    public void testCancelTranscoding() throws Exception {
        Log.d(TAG, "Starting: testMediaTranscodeManager");
@@ -300,5 +308,94 @@ public class MediaTranscodeManagerTest
        assertTrue("Failed to receive at least 10 progress updates",
                progressUpdateCount.get() > 10);
    }

    // [[ $(adb shell whoami) == "root" ]]
    private boolean checkIfRoot() throws IOException {
        try (ParcelFileDescriptor result = getInstrumentation().getUiAutomation()
                .executeShellCommand("whoami");
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
                     new FileInputStream(result.getFileDescriptor())))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.contains("root")) {
                    return true;
                }
            }
        }
        return false;
    }

    private String executeShellCommand(String cmd) throws Exception {
        return UiDevice.getInstance(
                InstrumentationRegistry.getInstrumentation()).executeShellCommand(cmd);
    }

    @Test
    public void testHandleTranscoderServiceDied() throws Exception {
        try {
            if (!checkIfRoot()) {
                throw new AssertionError("must be root to run this test; try adb root?");
            } else {
                Log.i(TAG, "Device is root");
            }
        } catch (IOException e) {
            throw new AssertionError(e);
        }

        Semaphore transcodeCompleteSemaphore = new Semaphore(0);
        Semaphore jobStartedSemaphore = new Semaphore(0);

        // Transcode a 15 seconds video, so that the transcoding is not finished when we kill the
        // service.
        Uri srcUri = Uri.parse(ContentResolver.SCHEME_FILE + "://"
                + mContext.getCacheDir().getAbsolutePath() + "/longtest_15s.mp4");
        Uri destinationUri = Uri.parse(ContentResolver.SCHEME_FILE + "://"
                + mContext.getCacheDir().getAbsolutePath() + "/HevcTranscode.mp4");

        TranscodingRequest request =
                new TranscodingRequest.Builder()
                        .setSourceUri(mSourceHEVCVideoUri)
                        .setDestinationUri(destinationUri)
                        .setType(MediaTranscodeManager.TRANSCODING_TYPE_VIDEO)
                        .setPriority(MediaTranscodeManager.PRIORITY_REALTIME)
                        .setVideoTrackFormat(createMediaFormat())
                        .build();
        Executor listenerExecutor = Executors.newSingleThreadExecutor();

        Log.i(TAG, "transcoding to " + createMediaFormat());

        TranscodingJob job = mMediaTranscodeManager.enqueueRequest(request, listenerExecutor,
                transcodingJob -> {
                    Log.d(TAG, "Transcoding completed with result: " + transcodingJob.getResult());
                    assertEquals(transcodingJob.getResult(), TranscodingJob.RESULT_ERROR);
                    transcodeCompleteSemaphore.release();
                });
        assertNotNull(job);

        AtomicInteger progressUpdateCount = new AtomicInteger(0);

        // Set progress update executor and use the same executor as result listener.
        job.setOnProgressUpdateListener(listenerExecutor,
                new TranscodingJob.OnProgressUpdateListener() {
                    @Override
                    public void onProgressUpdate(int newProgress) {
                        if (newProgress > 0) {
                            jobStartedSemaphore.release();
                        }
                    }
                });

        // Wait for progress update so the job is in running state.
        jobStartedSemaphore.tryAcquire(TRANSCODE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertTrue("Job is not running", job.getStatus() == TranscodingJob.STATUS_RUNNING);

        // Kills the service and expects receiving failure of the job.
        executeShellCommand("pkill -f media.transcoding");

        Log.d(TAG, "testMediaTranscodeManager - Waiting for transcode result.");
        boolean finishedOnTime = transcodeCompleteSemaphore.tryAcquire(
                TRANSCODE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertTrue("Invalid job status", job.getStatus() == TranscodingJob.STATUS_FINISHED);
    }
}