Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 74a6b0f1 authored by Andreas Huber's avatar Andreas Huber Committed by Android (Google) Code Review
Browse files

Merge "RTPReceiver can now track packet loss, account for late arrivals" into jb-mr2-dev

parents 071c6334 2be6121a
Loading
Loading
Loading
Loading
+208 −64
Original line number Diff line number Diff line
@@ -30,11 +30,13 @@
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/Utils.h>

#define TRACK_PACKET_LOSS       0

namespace android {

////////////////////////////////////////////////////////////////////////////////

struct RTPReceiver::Source : public RefBase {
struct RTPReceiver::Source : public AHandler {
    Source(RTPReceiver *receiver, uint32_t ssrc);

    void onPacketReceived(uint16_t seq, const sp<ABuffer> &buffer);
@@ -44,7 +46,14 @@ struct RTPReceiver::Source : public RefBase {
protected:
    virtual ~Source();

    virtual void onMessageReceived(const sp<AMessage> &msg);

private:
    enum {
        kWhatRetransmit,
        kWhatDeclareLost,
    };

    static const uint32_t kMinSequential = 2;
    static const uint32_t kMaxDropout = 3000;
    static const uint32_t kMaxMisorder = 100;
@@ -67,6 +76,17 @@ private:
    // Ordered by extended seq number.
    List<sp<ABuffer> > mPackets;

    enum StatusBits {
        STATUS_DECLARED_LOST            = 1,
        STATUS_REQUESTED_RETRANSMISSION = 2,
        STATUS_ARRIVED_LATE             = 4,
    };
#if TRACK_PACKET_LOSS
    KeyedVector<int32_t, uint32_t> mLostPackets;
#endif

    void modifyPacketStatus(int32_t extSeqNo, uint32_t mask);

    int32_t mAwaitingExtSeqNo;
    bool mRequestedRetransmission;

@@ -78,12 +98,20 @@ private:
    int32_t mNumDeclaredLost;
    int32_t mNumDeclaredLostPrior;

    int32_t mRetransmitGeneration;
    int32_t mDeclareLostGeneration;
    bool mDeclareLostTimerPending;

    void queuePacket(const sp<ABuffer> &packet);
    void dequeueMore();

    sp<ABuffer> getNextPacket();
    void resync();

    void postRetransmitTimer(int64_t delayUs);
    void postDeclareLostTimer(int64_t delayUs);
    void cancelTimers();

    DISALLOW_EVIL_CONSTRUCTORS(Source);
};

@@ -106,12 +134,71 @@ RTPReceiver::Source::Source(RTPReceiver *receiver, uint32_t ssrc)
      mActivePacketType(-1),
      mNextReportTimeUs(-1ll),
      mNumDeclaredLost(0),
      mNumDeclaredLostPrior(0) {
      mNumDeclaredLostPrior(0),
      mRetransmitGeneration(0),
      mDeclareLostGeneration(0),
      mDeclareLostTimerPending(false) {
}

RTPReceiver::Source::~Source() {
}

void RTPReceiver::Source::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatRetransmit:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            if (generation != mRetransmitGeneration) {
                break;
            }

            mRequestedRetransmission = true;
            mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);

            modifyPacketStatus(
                    mAwaitingExtSeqNo, STATUS_REQUESTED_RETRANSMISSION);
            break;
        }

        case kWhatDeclareLost:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            if (generation != mDeclareLostGeneration) {
                break;
            }

            cancelTimers();

            ALOGV("Lost packet extSeqNo %d %s",
                  mAwaitingExtSeqNo,
                  mRequestedRetransmission ? "*" : "");

            mRequestedRetransmission = false;
            if (mActiveAssembler != NULL) {
                mActiveAssembler->signalDiscontinuity();
            }

            modifyPacketStatus(mAwaitingExtSeqNo, STATUS_DECLARED_LOST);

            // resync();
            ++mAwaitingExtSeqNo;
            ++mNumDeclaredLost;

            mReceiver->notifyPacketLost();

            dequeueMore();
            break;
        }

        default:
            TRESPASS();
    }
}

void RTPReceiver::Source::onPacketReceived(
        uint16_t seq, const sp<ABuffer> &buffer) {
    if (mFirst) {
@@ -164,6 +251,8 @@ void RTPReceiver::Source::queuePacket(const sp<ABuffer> &packet) {
    if (mAwaitingExtSeqNo >= 0 && newExtendedSeqNo < mAwaitingExtSeqNo) {
        // We're no longer interested in these. They're old.
        ALOGV("dropping stale extSeqNo %d", newExtendedSeqNo);

        modifyPacketStatus(newExtendedSeqNo, STATUS_ARRIVED_LATE);
        return;
    }

@@ -230,14 +319,59 @@ void RTPReceiver::Source::dequeueMore() {
        }

        mNextReportTimeUs = nowUs + kReportIntervalUs;

#if TRACK_PACKET_LOSS
        for (size_t i = 0; i < mLostPackets.size(); ++i) {
            int32_t key = mLostPackets.keyAt(i);
            uint32_t value = mLostPackets.valueAt(i);

            AString status;
            if (value & STATUS_REQUESTED_RETRANSMISSION) {
                status.append("retrans ");
            }
            if (value & STATUS_ARRIVED_LATE) {
                status.append("arrived-late ");
            }
            ALOGI("Packet %d declared lost %s", key, status.c_str());
        }
#endif
    }

    for (;;) {
        sp<ABuffer> packet = getNextPacket();
    sp<ABuffer> packet;
    while ((packet = getNextPacket()) != NULL) {
        if (mDeclareLostTimerPending) {
            cancelTimers();
        }

        CHECK_GE(mAwaitingExtSeqNo, 0);
#if TRACK_PACKET_LOSS
        mLostPackets.removeItem(mAwaitingExtSeqNo);
#endif

        int32_t packetType;
        CHECK(packet->meta()->findInt32("PT", &packetType));

        if (packetType != mActivePacketType) {
            mActiveAssembler = mReceiver->makeAssembler(packetType);
            mActivePacketType = packetType;
        }

        if (mActiveAssembler != NULL) {
            status_t err = mActiveAssembler->processPacket(packet);
            if (err != OK) {
                ALOGV("assembler returned error %d", err);
            }
        }

        ++mAwaitingExtSeqNo;
    }

    if (mDeclareLostTimerPending) {
        return;
    }

        if (packet == NULL) {
    if (mPackets.empty()) {
                break;
        return;
    }

    CHECK_GE(mAwaitingExtSeqNo, 0);
@@ -254,7 +388,7 @@ void RTPReceiver::Source::dequeueMore() {
    int64_t maxArrivalTimeUs =
        mFirstArrivalTimeUs + rtpUs - mFirstRTPTimeUs;

            int64_t nowUs = ALooper::GetNowUs();
    nowUs = ALooper::GetNowUs();

    CHECK_LT(mAwaitingExtSeqNo, firstPacket->int32Data());

@@ -263,52 +397,11 @@ void RTPReceiver::Source::dequeueMore() {
          firstPacket->int32Data(),
          maxArrivalTimeUs - nowUs);

            if (maxArrivalTimeUs + kPacketLostAfterUs <= nowUs) {
                ALOGV("Lost packet extSeqNo %d %s",
                      mAwaitingExtSeqNo,
                      mRequestedRetransmission ? "*" : "");

                mRequestedRetransmission = false;
                if (mActiveAssembler != NULL) {
                    mActiveAssembler->signalDiscontinuity();
                }

                // resync();
                ++mAwaitingExtSeqNo;
                ++mNumDeclaredLost;

                mReceiver->notifyPacketLost();
                continue;
            } else if (kRequestRetransmissionAfterUs > 0
                    && maxArrivalTimeUs + kRequestRetransmissionAfterUs <= nowUs
                    && !mRequestedRetransmission
                    && mAwaitingExtSeqNo >= 0) {
                mRequestedRetransmission = true;
                mReceiver->requestRetransmission(mSSRC, mAwaitingExtSeqNo);
                break;
            } else {
                break;
            }
        }

        mRequestedRetransmission = false;
    postDeclareLostTimer(maxArrivalTimeUs + kPacketLostAfterUs);

        int32_t packetType;
        CHECK(packet->meta()->findInt32("PT", &packetType));

        if (packetType != mActivePacketType) {
            mActiveAssembler = mReceiver->makeAssembler(packetType);
            mActivePacketType = packetType;
        }

        if (mActiveAssembler == NULL) {
            continue;
        }

        status_t err = mActiveAssembler->processPacket(packet);
        if (err != OK) {
            ALOGV("assembler returned error %d", err);
        }
    if (kRequestRetransmissionAfterUs > 0ll) {
        postRetransmitTimer(
                maxArrivalTimeUs + kRequestRetransmissionAfterUs);
    }
}

@@ -328,8 +421,6 @@ sp<ABuffer> RTPReceiver::Source::getNextPacket() {
    sp<ABuffer> packet = *mPackets.begin();
    mPackets.erase(mPackets.begin());

    ++mAwaitingExtSeqNo;

    return packet;
}

@@ -404,9 +495,11 @@ void RTPReceiver::Source::addReportBlock(

RTPReceiver::RTPReceiver(
        const sp<ANetworkSession> &netSession,
        const sp<AMessage> &notify)
        const sp<AMessage> &notify,
        uint32_t flags)
    : mNetSession(netSession),
      mNotify(notify),
      mFlags(flags),
      mRTPMode(TRANSPORT_UNDEFINED),
      mRTCPMode(TRANSPORT_UNDEFINED),
      mRTPSessionID(0),
@@ -693,6 +786,20 @@ void RTPReceiver::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
            CHECK(msg->findBuffer("data", &data));

            if (isRTP) {
                if (mFlags & FLAG_AUTO_CONNECT) {
                    AString fromAddr;
                    CHECK(msg->findString("fromAddr", &fromAddr));

                    int32_t fromPort;
                    CHECK(msg->findInt32("fromPort", &fromPort));

                    CHECK_EQ((status_t)OK,
                             connect(
                                 fromAddr.c_str(), fromPort, fromPort + 1));

                    mFlags &= ~FLAG_AUTO_CONNECT;
                }

                onRTPData(data);
            } else {
                onRTCPData(data);
@@ -835,6 +942,8 @@ status_t RTPReceiver::onRTPData(const sp<ABuffer> &buffer) {
    sp<Source> source;
    if (index < 0) {
        source = new Source(this, srcId);
        looper()->registerHandler(source);

        mSources.add(srcId, source);
    } else {
        source = mSources.valueAt(index);
@@ -965,6 +1074,7 @@ sp<RTPReceiver::Assembler> RTPReceiver::makeAssembler(uint8_t packetType) {
    PacketizationMode mode = mPacketTypes.valueAt(index);

    switch (mode) {
        case PACKETIZATION_NONE:
        case PACKETIZATION_TRANSPORT_STREAM:
            return new TSAssembler(mNotify);

@@ -1005,5 +1115,39 @@ void RTPReceiver::requestRetransmission(uint32_t senderSSRC, int32_t extSeqNo) {
     mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
}

void RTPReceiver::Source::modifyPacketStatus(int32_t extSeqNo, uint32_t mask) {
#if TRACK_PACKET_LOSS
    ssize_t index = mLostPackets.indexOfKey(extSeqNo);
    if (index < 0) {
        mLostPackets.add(extSeqNo, mask);
    } else {
        mLostPackets.editValueAt(index) |= mask;
    }
#endif
}

void RTPReceiver::Source::postRetransmitTimer(int64_t timeUs) {
    int64_t delayUs = timeUs - ALooper::GetNowUs();
    sp<AMessage> msg = new AMessage(kWhatRetransmit, id());
    msg->setInt32("generation", mRetransmitGeneration);
    msg->post(delayUs);
}

void RTPReceiver::Source::postDeclareLostTimer(int64_t timeUs) {
    CHECK(!mDeclareLostTimerPending);
    mDeclareLostTimerPending = true;

    int64_t delayUs = timeUs - ALooper::GetNowUs();
    sp<AMessage> msg = new AMessage(kWhatDeclareLost, id());
    msg->setInt32("generation", mDeclareLostGeneration);
    msg->post(delayUs);
}

void RTPReceiver::Source::cancelTimers() {
    ++mRetransmitGeneration;
    ++mDeclareLostGeneration;
    mDeclareLostTimerPending = false;
}

}  // namespace android
+7 −1
Original line number Diff line number Diff line
@@ -39,9 +39,14 @@ struct RTPReceiver : public RTPBase, public AHandler {
        kWhatAccessUnit,
        kWhatPacketLost,
    };

    enum Flags {
        FLAG_AUTO_CONNECT = 1,
    };
    RTPReceiver(
            const sp<ANetworkSession> &netSession,
            const sp<AMessage> &notify);
            const sp<AMessage> &notify,
            uint32_t flags = 0);

    status_t registerPacketType(
            uint8_t packetType, PacketizationMode mode);
@@ -82,6 +87,7 @@ private:

    sp<ANetworkSession> mNetSession;
    sp<AMessage> mNotify;
    uint32_t mFlags;
    TransportMode mRTPMode;
    TransportMode mRTCPMode;
    int32_t mRTPSessionID;