Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 06938878 authored by John Grossman's avatar John Grossman
Browse files

LibAAH_RTP: Fix a stuttering audio bug.



Fix a bug discovered while working on adding unicast mode to the TX/RX
players.  Also some general cleanup/consolidation regarding timeout
code.

The bug went like this.  When a TX player had hit EOS, it would send
an EOS command payload to its receivers.  Later, when application
level code shutdown and cleaned up the player, it would send another.
In situations where there is massive packet loss, there is a chance
that not only did both of the EOS packets get dropped, but that they
never got filled in by the retry algorithm because the receiver gave
up on the RTP gap due to an aboutToUnderflow situation in at least one
of its active substreams.

When this happens, there are two major problems.  First, all of the
substreams associated with the TX player which has now gone away have
become effectively leaked.  They will only get cleaned up if the
entire RTP stream (the TX Group) goes away for 10 seconds or more, or
when the RX Player itself is reset by application level code or a
fatal error.  These substreams are holding decoder and renderer
resources which are probably in very short supply, which is a Bad
Thing.

Second, there is now at least one substream in the RX player which is
never going to receive another payload (its TX player source is gone),
but is still considered to be active by the rx player.  Assuming that
this substream's program was in the play state when the track ended,
there is now at least one substream which is always
"aboutToUnderflow".  From here on out, when the retry algorithm is
attempting to decide whether or not it has the time to attempt to fill
in a gap in the muxed RTP sequence, it always decides that it does not
have the time because of the orphaned substream which is stuck in its
about to underflow state.  This effectively means that the retry
algorithm is completely shut off until the rx player gets reset
somehow (something which does not happen during normal operation).
Since the environment had to be extremely lossy to trigger this chain
of events in the first place, and its probably no better now, your
playback is just going to be chock full of gaps which produces
horrible stuttering in the presentation stage of the system.

Two new failsafes have been introduced to keep the double EOS drop
from causing this.  First, a timeout has been introduced on the
substream level, in addition to the already existing RTP level
timeout.  If a substream fails to receive an activity for 10 seconds
(same timeout as the master RTP timeout), it will be automatically
flushed and purged.

Second, the nature of the master RTP timeout on the transmitter side
has been changed.  Instead of just sending an empty NOP command packet
to indicate that the main RTP stream is still alive, the transmitter
now sends a new time of command packet; the Active Program Update
packet.  This packet contains a list of all the active program ID
attached to this TX group.  Upon receiving one of these APU packets,
RX players reset the inactivity timers for all substreams which are
members of the programs listed in the packet, but they also
immediately purge any substreams associated with programs not present
in the APU.

Between the two of these, no matter how nasty and selective the packet
smashing gremlins in your system happen to be, substreams will always
eventually clean up and avoid getting stuck in a perma-stutter
situation.

Also in this CL:
+ Extract some common utility code into a utils.cpp file so that it
  can be shared across the library.
+ Stop using custom timeout logic in the RXPlayer.  Instead, use the
  common Timeout helper class in utils.cpp.

Signed-off-by: default avatarJohn Grossman <johngro@google.com>
Change-Id: I350869942074f2cae020f719c2911d9092ba8055
parent 42a6382f
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \
    aah_tx_group.cpp \
    aah_tx_packet.cpp \
    aah_tx_player.cpp \
    pipe_event.cpp
    pipe_event.cpp \
    utils.cpp

LOCAL_C_INCLUDES := \
    frameworks/base/include \
+32 −11
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@

#include "aah_decoder_pump.h"
#include "pipe_event.h"
#include "utils.h"

namespace android {

@@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface {
        bool           waiting_for_fast_start_;
        bool           fetched_first_packet_;

        uint64_t       rtp_activity_timeout_;
        bool           rtp_activity_timeout_valid_;
        Timeout        rtp_activity_timeout_;

        DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer);
    };
@@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface {

        bool     isAboutToUnderflow();
        uint32_t getSSRC()      const { return ssrc_; }
        uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; }
        uint8_t  getProgramID() const { return (ssrc_ >> 5) & 0x1F; }
        status_t getStatus() const { return status_; }

        void clearInactivityTimeout() {
            inactivity_timeout_.setTimeout(-1);
        }

        void resetInactivityTimeout() {
            inactivity_timeout_.setTimeout(kInactivityTimeoutMsec);
        }

        bool shouldExpire() {
            // Substreams should always have a positive time until timeout.  A
            // timeout value of 0 indicates that the timer has expired, while a
            // negative timeout (normally meaning no timeout) is used by some of
            // the core code to implement a mark and sweep pattern for cleaning
            // out no longer relevant substreams.
            return (inactivity_timeout_.msecTillTimeout() <= 0);
        }

      protected:
        virtual ~Substream();

@@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface {
        uint32_t            aux_data_expected_size_;

        sp<AAH_DecoderPump> decoder_;
        Timeout             inactivity_timeout_;

        static int64_t      kAboutToUnderflowThreshold;
        static const int64_t    kAboutToUnderflowThreshold;
        static const int        kInactivityTimeoutMsec;

        DISALLOW_EVIL_CONSTRUCTORS(Substream);
    };
@@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface {
    void                processRingBuffer();
    void                processCommandPacket(PacketBuffer* pb);
    bool                processGaps();
    int                 computeNextGapRetransmitTimeout();
    void                setGapStatus(GapStatus status);
    void                cleanoutExpiredSubstreams();
    void                fetchAudioFlinger();

    PipeEvent           wakeup_work_thread_evt_;
@@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface {

    SeqNoGap            current_gap_;
    GapStatus           current_gap_status_;
    uint64_t            next_retrans_req_time_;
    Timeout             next_retrans_req_timeout_;

    Timeout             ss_cleanout_timeout_;

    RXRingBuffer        ring_buffer_;
    SubstreamVec        substreams_;
@@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface {
    static const uint32_t kRetransRequestMagic;
    static const uint32_t kFastStartRequestMagic;
    static const uint32_t kRetransNAKMagic;
    static const uint32_t kGapRerequestTimeoutUSec;
    static const uint32_t kFastStartTimeoutUSec;
    static const uint32_t kRTPActivityTimeoutUSec;
    static const uint32_t kGapRerequestTimeoutMsec;
    static const uint32_t kFastStartTimeoutMsec;
    static const uint32_t kRTPActivityTimeoutMsec;
    static const uint32_t kSSCleanoutTimeoutMsec;

    static const uint32_t INVOKE_GET_MASTER_VOLUME = 3;
    static const uint32_t INVOKE_SET_MASTER_VOLUME = 4;

    static uint64_t monotonicUSecNow();

    DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer);
};

+125 −59
Original line number Diff line number Diff line
@@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic =
    FOURCC('T','n','a','k');
const uint32_t AAH_RXPlayer::kFastStartRequestMagic =
    FOURCC('T','f','s','t');
const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000;
const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000;
const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000;
const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75;
const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800;
const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000;
const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000;

static inline int16_t fetchInt16(uint8_t* data) {
    return static_cast<int16_t>(U16_AT(data));
@@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) {
    return static_cast<int64_t>(U64_AT(data));
}

uint64_t AAH_RXPlayer::monotonicUSecNow() {
    struct timespec now;
    int res = clock_gettime(CLOCK_MONOTONIC, &now);
    CHECK(res >= 0);

    uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000;
    ret += now.tv_nsec / 1000;

    return ret;
}

status_t AAH_RXPlayer::startWorkThread() {
    status_t res;
    stopWorkThread();
    ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);
    res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO);

    if (res != OK) {
@@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() {

    substreams_.clear();

    current_gap_status_ = kGS_NoGap;
    setGapStatus(kGS_NoGap);
}

bool AAH_RXPlayer::setupSocket() {
@@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() {

    while (!thread_wrapper_->exitPending()) {
        // Step 1: Wait until there is something to do.
        int gap_timeout = computeNextGapRetransmitTimeout();
        int gap_timeout = next_retrans_req_timeout_.msecTillTimeout();
        int ring_timeout = ring_buffer_.computeInactivityTimeout();
        int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout();
        int timeout = -1;

        if (!ring_timeout) {
            LOGW("RTP inactivity timeout reached, resetting pipeline.");
            resetPipeline();
            timeout = gap_timeout;
        } else {
            if (gap_timeout < 0) {
                timeout = ring_timeout;
            } else if (ring_timeout < 0) {
                timeout = gap_timeout;
            } else {
                timeout = (gap_timeout < ring_timeout) ? gap_timeout
                                                       : ring_timeout;
            continue;
        }

        if (!ss_cleanout_timeout) {
            cleanoutExpiredSubstreams();
            continue;
        }

        timeout = minTimeout(gap_timeout, timeout);
        timeout = minTimeout(ring_timeout, timeout);
        timeout = minTimeout(ss_cleanout_timeout, timeout);

        if ((0 != timeout) && (!process_more_right_now)) {
            // Set up the events to wait on.  Start with the wakeup pipe.
            memset(&poll_fds, 0, sizeof(poll_fds));
@@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() {
                }
            }

            // Is this a command packet?  If so, its not necessarily associate
            // Is this a command packet?  If so, its not necessarily associated
            // with one particular substream.  Just give it to the command
            // packet handler and then move on.
            if (4 == payload_type) {
@@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {
        return;
    }

    bool do_cleanup_pass = false;
    uint16_t command_id = U16_AT(data + offset);
    offset += 2;

    switch (command_id) {
        case TRTPControlPacket::kCommandNop:
            // Note: NOPs are frequently used to carry timestamp transformation
            // updates.  If there was a timestamp transform attached to this
            // payload, it was already taken care of by processRX.
            break;

        case TRTPControlPacket::kCommandEOS:
            // TODO need to differentiate between flush and EOS.  Substreams
            // which have hit EOS need a chance to drain before being destroyed.

        case TRTPControlPacket::kCommandFlush: {
            uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
            uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
            LOGI("*** %s flushing program_id=%d",
                 __PRETTY_FUNCTION__, program_id);

            Vector<uint32_t> substreams_to_remove;
            // Flag any programs with the given program ID for cleanup.
            for (size_t i = 0; i < substreams_.size(); ++i) {
                sp<Substream> iter = substreams_.valueAt(i);
                if (iter->getProgramID() == program_id) {
                    iter->shutdown();
                    substreams_to_remove.add(iter->getSSRC());
                const sp<Substream>& stream = substreams_.valueAt(i);
                if (stream->getProgramID() == program_id) {
                    stream->clearInactivityTimeout();
                }
            }

            // Make sure we do our cleanup pass at the end of this.
            do_cleanup_pass = true;
        } break;

        case TRTPControlPacket::kCommandAPU: {
            // Active program update packet.  Go over all of our substreams and
            // either reset the inactivity timer for the substreams listed in
            // this update packet, or clear the inactivity timer for the
            // substreams not listed in this update packet.  A cleared
            // inactivity timer will flag a substream for deletion in the
            // cleanup pass at the end of this function.

            // The packet must contain at least the 1 byte numActivePrograms
            // field.
            if (amt < offset + 1) {
                return;
            }
            uint8_t numActivePrograms = data[offset++];

            // If the payload is not long enough to contain the list it promises
            // to have, just skip it.
            if (amt < (offset + numActivePrograms)) {
                return;
            }

            for (size_t i = 0; i < substreams_to_remove.size(); ++i) {
                substreams_.removeItem(substreams_to_remove[i]);
            // Clear all inactivity timers.
            for (size_t i = 0; i < substreams_.size(); ++i) {
                const sp<Substream>& stream = substreams_.valueAt(i);
                stream->clearInactivityTimeout();
            }

            // Now go over the list of active programs and reset the inactivity
            // timers for those streams which are currently in the active
            // program update packet.
            for (uint8_t j = 0; j < numActivePrograms; ++j) {
                uint8_t pid = (data[offset + j] & 0x1F);
                for (size_t i = 0; i < substreams_.size(); ++i) {
                    const sp<Substream>& stream = substreams_.valueAt(i);
                    if (stream->getProgramID() == pid) {
                        stream->resetInactivityTimeout();
                    }
                }
            }

            // Make sure we do our cleanup pass at the end of this.
            do_cleanup_pass = true;
        } break;
    }

    if (do_cleanup_pass)
        cleanoutExpiredSubstreams();
}

bool AAH_RXPlayer::processGaps() {
@@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() {
        // this gap and move on.
        if (!send_retransmit_request &&
           (kGS_NoGap != current_gap_status_) &&
           (0 == computeNextGapRetransmitTimeout())) {

           (0 == next_retrans_req_timeout_.msecTillTimeout())) {
            // If out current gap is the fast-start gap, don't bother to skip it
            // because substreams look like the are about to underflow.
            if ((kGS_FastStartGap != gap_status) ||
                (current_gap_.end_seq_ != gap.end_seq_)) {

                for (size_t i = 0; i < substreams_.size(); ++i) {
                    if (substreams_.valueAt(i)->isAboutToUnderflow()) {
                        LOGV("About to underflow, giving up on gap [%hu, %hu]",
                        LOGI("About to underflow, giving up on gap [%hu, %hu]",
                                gap.start_seq_, gap.end_seq_);
                        ring_buffer_.processNAK();
                        current_gap_status_ = kGS_NoGap;
                        setGapStatus(kGS_NoGap);
                        return true;
                    }
                }
@@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() {
            send_retransmit_request = true;
        }
    } else {
        current_gap_status_ = kGS_NoGap;
        setGapStatus(kGS_NoGap);
    }

    if (send_retransmit_request) {
@@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() {
            (current_gap_.end_seq_ == gap.end_seq_)) {
            LOGV("Fast start is taking forever; giving up.");
            ring_buffer_.processNAK();
            current_gap_status_ = kGS_NoGap;
            setGapStatus(kGS_NoGap);
            return true;
        }

@@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() {

        // Update the current gap info.
        current_gap_ = gap;
        current_gap_status_ = gap_status;
        next_retrans_req_time_ = monotonicUSecNow() +
                               ((kGS_FastStartGap == current_gap_status_)
                                ? kFastStartTimeoutUSec
                                : kGapRerequestTimeoutUSec);
        setGapStatus(gap_status);
    }

    return false;
}

// Compute when its time to send the next gap retransmission in milliseconds.
// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit
// right now.
int AAH_RXPlayer::computeNextGapRetransmitTimeout() {
    if (kGS_NoGap == current_gap_status_) {
        return -1;
void AAH_RXPlayer::setGapStatus(GapStatus status) {
    current_gap_status_ = status;

    switch(current_gap_status_) {
        case kGS_NormalGap:
            next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec);
            break;

        case kGS_FastStartGap:
            next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec);
            break;

        case kGS_NoGap:
        default:
            next_retrans_req_timeout_.setTimeout(-1);
            break;
    }
}

void AAH_RXPlayer::cleanoutExpiredSubstreams() {
    static const size_t kMaxPerPass = 32;
    uint32_t to_remove[kMaxPerPass];
    size_t cnt, i;

    int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow();
    do {
        for (i = 0, cnt = 0;
            (i < substreams_.size()) && (cnt < kMaxPerPass);
            ++i) {
            const sp<Substream>& stream = substreams_.valueAt(i);
            if (stream->shouldExpire()) {
                to_remove[cnt++] = stream->getSSRC();
            }
        }

    timeout_delta /= 1000;
    if (timeout_delta <= 0) {
        return 0;
        for (i = 0; i < cnt; ++i) {
            LOGI("Purging substream with SSRC 0x%08x", to_remove[i]);
            substreams_.removeItem(to_remove[i]);
        }
    } while (cnt >= kMaxPerPass);

    return static_cast<uint32_t>(timeout_delta);
    ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);
}

}  // namespace android
+3 −14
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() {
    rd_seq_known_ = false;
    waiting_for_fast_start_ = true;
    fetched_first_packet_ = false;
    rtp_activity_timeout_valid_ = false;
    rtp_activity_timeout_.setTimeout(-1);
}

bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
@@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
    CHECK(NULL != ring_);
    CHECK(NULL != buf);

    rtp_activity_timeout_valid_ = true;
    rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec;
    rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec);

    // If the ring buffer is totally reset (we have never received a single
    // payload) then we don't know the rd sequence number and this should be
@@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) {

int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() {
    AutoMutex lock(&lock_);

    if (!rtp_activity_timeout_valid_) {
        return -1;
    }

    uint64_t now = monotonicUSecNow();
    if (rtp_activity_timeout_ <= now) {
        return 0;
    }

    return (rtp_activity_timeout_ - now) / 1000;
    return rtp_activity_timeout_.msecTillTimeout();
}

AAH_RXPlayer::PacketBuffer*
+7 −1
Original line number Diff line number Diff line
@@ -33,8 +33,9 @@

namespace android {

int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =
const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =
    50ull * 1000;
const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000;

AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {
    ssrc_ = ssrc;
@@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {
    // cleanupBufferInProgress will reset most of the internal state variables.
    // Just need to make sure that buffer_in_progress_ is NULL before calling.
    cleanupBufferInProgress();
    resetInactivityTimeout();
}

AAH_RXPlayer::Substream::~Substream() {
@@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf,
        return;
    }

    resetInactivityTimeout();

    // Do we have a buffer in progress already?  If so, abort the buffer.  In
    // theory, this should never happen.  If there were a discontinutity in the
    // stream, the discon in the seq_nos at the RTP level should have already
@@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf,
        return;
    }

    resetInactivityTimeout();

    if (NULL == buffer_in_progress_) {
        LOGV("TRTP Receiver skipping payload continuation; no buffer currently"
             " in progress.");
Loading