Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 48c0addf authored by Andreas Huber's avatar Andreas Huber
Browse files

RTPTest updated to allow for UDP/TCP transport and abstracted

where the data is coming from, also added time synchronization.

Change-Id: Iecc2201a2bd17be06f16690a28261bef5b4e439c
parent 30bf97b3
Loading
Loading
Loading
Loading
+228 −49
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@
#include "ANetworkSession.h"
#include "rtp/RTPSender.h"
#include "rtp/RTPReceiver.h"
#include "TimeSyncer.h"

#include <binder/ProcessState.h>
#include <media/stagefright/foundation/ABuffer.h>
@@ -28,12 +29,115 @@
#include <media/stagefright/foundation/AHandler.h>
#include <media/stagefright/foundation/ALooper.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/hexdump.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/NuMediaExtractor.h>
#include <media/stagefright/Utils.h>

namespace android {

struct PacketSource : public RefBase {
    PacketSource() {}

    virtual sp<ABuffer> getNextAccessUnit() = 0;

protected:
    virtual ~PacketSource() {}

private:
    DISALLOW_EVIL_CONSTRUCTORS(PacketSource);
};

struct MediaPacketSource : public PacketSource {
    MediaPacketSource()
        : mMaxSampleSize(1024 * 1024) {
        mExtractor = new NuMediaExtractor;
        CHECK_EQ((status_t)OK,
                 mExtractor->setDataSource(
                         "/sdcard/Frame Counter HD 30FPS_1080p.mp4"));

        bool haveVideo = false;
        for (size_t i = 0; i < mExtractor->countTracks(); ++i) {
            sp<AMessage> format;
            CHECK_EQ((status_t)OK, mExtractor->getTrackFormat(i, &format));

            AString mime;
            CHECK(format->findString("mime", &mime));

            if (!strcasecmp(MEDIA_MIMETYPE_VIDEO_AVC, mime.c_str())) {
                mExtractor->selectTrack(i);
                haveVideo = true;
                break;
            }
        }

        CHECK(haveVideo);
    }

    virtual sp<ABuffer> getNextAccessUnit() {
        int64_t timeUs;
        status_t err = mExtractor->getSampleTime(&timeUs);

        if (err != OK) {
            return NULL;
        }

        sp<ABuffer> accessUnit = new ABuffer(mMaxSampleSize);
        CHECK_EQ((status_t)OK, mExtractor->readSampleData(accessUnit));

        accessUnit->meta()->setInt64("timeUs", timeUs);

        CHECK_EQ((status_t)OK, mExtractor->advance());

        return accessUnit;
    }

protected:
    virtual ~MediaPacketSource() {
    }

private:
    sp<NuMediaExtractor> mExtractor;
    size_t mMaxSampleSize;

    DISALLOW_EVIL_CONSTRUCTORS(MediaPacketSource);
};

struct SimplePacketSource : public PacketSource {
    SimplePacketSource()
        : mCounter(0) {
    }

    virtual sp<ABuffer> getNextAccessUnit() {
        sp<ABuffer> buffer = new ABuffer(4);
        uint8_t *dst = buffer->data();
        dst[0] = mCounter >> 24;
        dst[1] = (mCounter >> 16) & 0xff;
        dst[2] = (mCounter >> 8) & 0xff;
        dst[3] = mCounter & 0xff;

        buffer->meta()->setInt64("timeUs", mCounter * 1000000ll / kFrameRate);

        ++mCounter;

        return buffer;
    }

protected:
    virtual ~SimplePacketSource() {
    }

private:
    enum {
        kFrameRate = 30
    };

    uint32_t mCounter;

    DISALLOW_EVIL_CONSTRUCTORS(SimplePacketSource);
};

struct TestHandler : public AHandler {
    TestHandler(const sp<ANetworkSession> &netSession);

@@ -52,18 +156,39 @@ private:
        kWhatSenderNotify,
        kWhatSendMore,
        kWhatStop,
        kWhatTimeSyncerNotify,
    };

#if 1
    static const RTPBase::TransportMode kRTPMode = RTPBase::TRANSPORT_UDP;
    static const RTPBase::TransportMode kRTCPMode = RTPBase::TRANSPORT_UDP;
#else
    static const RTPBase::TransportMode kRTPMode = RTPBase::TRANSPORT_TCP;
    static const RTPBase::TransportMode kRTCPMode = RTPBase::TRANSPORT_NONE;
#endif

#if 1
    static const RTPBase::PacketizationMode kPacketizationMode
        = RTPBase::PACKETIZATION_H264;
#else
    static const RTPBase::PacketizationMode kPacketizationMode
        = RTPBase::PACKETIZATION_NONE;
#endif

    sp<ANetworkSession> mNetSession;
    sp<NuMediaExtractor> mExtractor;
    sp<PacketSource> mSource;
    sp<RTPSender> mSender;
    sp<RTPReceiver> mReceiver;

    size_t mMaxSampleSize;
    sp<TimeSyncer> mTimeSyncer;
    bool mTimeSyncerStarted;

    int64_t mFirstTimeRealUs;
    int64_t mFirstTimeMediaUs;

    int64_t mTimeOffsetUs;
    bool mTimeOffsetValid;

    status_t readMore();

    DISALLOW_EVIL_CONSTRUCTORS(TestHandler);
@@ -71,9 +196,11 @@ private:

TestHandler::TestHandler(const sp<ANetworkSession> &netSession)
    : mNetSession(netSession),
      mMaxSampleSize(1024 * 1024),
      mTimeSyncerStarted(false),
      mFirstTimeRealUs(-1ll),
      mFirstTimeMediaUs(-1ll) {
      mFirstTimeMediaUs(-1ll),
      mTimeOffsetUs(-1ll),
      mTimeOffsetValid(false) {
}

TestHandler::~TestHandler() {
@@ -91,23 +218,48 @@ void TestHandler::connect(const char *host, int32_t port) {
    msg->post();
}

static void dumpDelay(int64_t delayMs) {
    static const int64_t kMinDelayMs = 0;
    static const int64_t kMaxDelayMs = 300;

    const char *kPattern = "########################################";
    size_t kPatternSize = strlen(kPattern);

    int n = (kPatternSize * (delayMs - kMinDelayMs))
                / (kMaxDelayMs - kMinDelayMs);

    if (n < 0) {
        n = 0;
    } else if ((size_t)n > kPatternSize) {
        n = kPatternSize;
    }

    ALOGI("(%4lld ms) %s\n",
          delayMs,
          kPattern + kPatternSize - n);
}

void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatListen:
        {
            sp<AMessage> notify = new AMessage(kWhatReceiverNotify, id());
            mReceiver = new RTPReceiver(mNetSession, notify);
            sp<AMessage> notify = new AMessage(kWhatTimeSyncerNotify, id());
            mTimeSyncer = new TimeSyncer(mNetSession, notify);
            looper()->registerHandler(mTimeSyncer);

            notify = new AMessage(kWhatReceiverNotify, id());
            mReceiver = new RTPReceiver(
                    mNetSession, notify, RTPReceiver::FLAG_AUTO_CONNECT);
            looper()->registerHandler(mReceiver);

            CHECK_EQ((status_t)OK,
                     mReceiver->registerPacketType(
                         33, RTPReceiver::PACKETIZATION_H264));
                     mReceiver->registerPacketType(33, kPacketizationMode));

            int32_t receiverRTPPort;
            CHECK_EQ((status_t)OK,
                     mReceiver->initAsync(
                         RTPReceiver::TRANSPORT_UDP,  // rtpMode
                         RTPReceiver::TRANSPORT_UDP,  // rtcpMode
                         kRTPMode,
                         kRTCPMode,
                         &receiverRTPPort));

            printf("picked receiverRTPPort %d\n", receiverRTPPort);
@@ -125,33 +277,23 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
            AString host;
            CHECK(msg->findString("host", &host));

            sp<AMessage> notify = new AMessage(kWhatTimeSyncerNotify, id());
            mTimeSyncer = new TimeSyncer(mNetSession, notify);
            looper()->registerHandler(mTimeSyncer);
            mTimeSyncer->startServer(8123);

            int32_t receiverRTPPort;
            CHECK(msg->findInt32("port", &receiverRTPPort));

            mExtractor = new NuMediaExtractor;
            CHECK_EQ((status_t)OK,
                     mExtractor->setDataSource(
                             "/sdcard/Frame Counter HD 30FPS_1080p.mp4"));

            bool haveVideo = false;
            for (size_t i = 0; i < mExtractor->countTracks(); ++i) {
                sp<AMessage> format;
                CHECK_EQ((status_t)OK, mExtractor->getTrackFormat(i, &format));

                AString mime;
                CHECK(format->findString("mime", &mime));

                if (!strcasecmp(MEDIA_MIMETYPE_VIDEO_AVC, mime.c_str())) {
                    mExtractor->selectTrack(i);
                    haveVideo = true;
                    break;
                }
            }

            CHECK(haveVideo);
#if 1
            mSource = new MediaPacketSource;
#else
            mSource = new SimplePacketSource;
#endif

            sp<AMessage> notify = new AMessage(kWhatSenderNotify, id());
            notify = new AMessage(kWhatSenderNotify, id());
            mSender = new RTPSender(mNetSession, notify);

            looper()->registerHandler(mSender);

            int32_t senderRTPPort;
@@ -159,9 +301,10 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
                     mSender->initAsync(
                         host.c_str(),
                         receiverRTPPort,
                         RTPSender::TRANSPORT_UDP,  // rtpMode
                         receiverRTPPort + 1,
                         RTPSender::TRANSPORT_UDP,  // rtcpMode
                         kRTPMode,
                         kRTCPMode == RTPBase::TRANSPORT_NONE
                            ? -1 : receiverRTPPort + 1,
                         kRTCPMode,
                         &senderRTPPort));

            printf("picked senderRTPPort %d\n", senderRTPPort);
@@ -201,7 +344,7 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {

        case kWhatReceiverNotify:
        {
            ALOGI("kWhatReceiverNotify");
            ALOGV("kWhatReceiverNotify");

            int32_t what;
            CHECK(msg->findInt32("what", &what));
@@ -216,8 +359,40 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
                    break;
                }

                case RTPSender::kWhatError:
                case RTPReceiver::kWhatError:
                    break;

                case RTPReceiver::kWhatAccessUnit:
                {
#if 0
                    if (!mTimeSyncerStarted) {
                        mTimeSyncer->startClient("172.18.41.216", 8123);
                        mTimeSyncerStarted = true;
                    }

                    sp<ABuffer> accessUnit;
                    CHECK(msg->findBuffer("accessUnit", &accessUnit));

                    int64_t timeUs;
                    CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));

                    if (mTimeOffsetValid) {
                        timeUs -= mTimeOffsetUs;
                        int64_t nowUs = ALooper::GetNowUs();
                        int64_t delayMs = (nowUs - timeUs) / 1000ll;

                        dumpDelay(delayMs);
                    }
#endif
                    break;
                }

                case RTPReceiver::kWhatPacketLost:
                    ALOGV("kWhatPacketLost");
                    break;

                default:
                    TRESPASS();
            }
            break;
        }
@@ -231,7 +406,7 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
                     mSender->queueBuffer(
                         accessUnit,
                         33,
                         RTPSender::PACKETIZATION_H264));
                         kPacketizationMode));

            status_t err = readMore();

@@ -253,31 +428,33 @@ void TestHandler::onMessageReceived(const sp<AMessage> &msg) {
                mSender.clear();
            }

            mExtractor.clear();
            mSource.clear();

            looper()->stop();
            break;
        }

        case kWhatTimeSyncerNotify:
        {
            CHECK(msg->findInt64("offset", &mTimeOffsetUs));
            mTimeOffsetValid = true;
            break;
        }

        default:
            TRESPASS();
    }
}

status_t TestHandler::readMore() {
    int64_t timeUs;
    status_t err = mExtractor->getSampleTime(&timeUs);
    sp<ABuffer> accessUnit = mSource->getNextAccessUnit();

    if (err != OK) {
        return err;
    if (accessUnit == NULL) {
        return ERROR_END_OF_STREAM;
    }

    sp<ABuffer> accessUnit = new ABuffer(mMaxSampleSize);
    CHECK_EQ((status_t)OK, mExtractor->readSampleData(accessUnit));

    accessUnit->meta()->setInt64("timeUs", timeUs);

    CHECK_EQ((status_t)OK, mExtractor->advance());
    int64_t timeUs;
    CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));

    int64_t nowUs = ALooper::GetNowUs();
    int64_t whenUs;
@@ -289,6 +466,8 @@ status_t TestHandler::readMore() {
        whenUs = mFirstTimeRealUs + timeUs - mFirstTimeMediaUs;
    }

    accessUnit->meta()->setInt64("timeUs", whenUs);

    sp<AMessage> msg = new AMessage(kWhatSendMore, id());
    msg->setBuffer("accessUnit", accessUnit);
    msg->post(whenUs - nowUs);