Loading media/libaah_rtp/Android.mk +2 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \ aah_tx_group.cpp \ aah_tx_packet.cpp \ aah_tx_player.cpp \ pipe_event.cpp pipe_event.cpp \ utils.cpp LOCAL_C_INCLUDES := \ frameworks/base/include \ Loading media/libaah_rtp/aah_rx_player.h +32 −11 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ #include "aah_decoder_pump.h" #include "pipe_event.h" #include "utils.h" namespace android { Loading Loading @@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool waiting_for_fast_start_; bool fetched_first_packet_; uint64_t rtp_activity_timeout_; bool rtp_activity_timeout_valid_; Timeout rtp_activity_timeout_; DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer); }; Loading @@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool isAboutToUnderflow(); uint32_t getSSRC() const { return ssrc_; } uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } uint8_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } status_t getStatus() const { return status_; } void clearInactivityTimeout() { inactivity_timeout_.setTimeout(-1); } void resetInactivityTimeout() { inactivity_timeout_.setTimeout(kInactivityTimeoutMsec); } bool shouldExpire() { // Substreams should always have a positive time until timeout. A // timeout value of 0 indicates that the timer has expired, while a // negative timeout (normally meaning no timeout) is used by some of // the core code to implement a mark and sweep pattern for cleaning // out no longer relevant substreams. return (inactivity_timeout_.msecTillTimeout() <= 0); } protected: virtual ~Substream(); Loading Loading @@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface { uint32_t aux_data_expected_size_; sp<AAH_DecoderPump> decoder_; Timeout inactivity_timeout_; static int64_t kAboutToUnderflowThreshold; static const int64_t kAboutToUnderflowThreshold; static const int kInactivityTimeoutMsec; DISALLOW_EVIL_CONSTRUCTORS(Substream); }; Loading @@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface { void processRingBuffer(); void processCommandPacket(PacketBuffer* pb); bool processGaps(); int computeNextGapRetransmitTimeout(); void setGapStatus(GapStatus status); void cleanoutExpiredSubstreams(); void fetchAudioFlinger(); PipeEvent wakeup_work_thread_evt_; Loading @@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface { SeqNoGap current_gap_; GapStatus current_gap_status_; uint64_t next_retrans_req_time_; Timeout next_retrans_req_timeout_; Timeout ss_cleanout_timeout_; RXRingBuffer ring_buffer_; SubstreamVec substreams_; Loading @@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface { static const uint32_t kRetransRequestMagic; static const uint32_t kFastStartRequestMagic; static const uint32_t kRetransNAKMagic; static const uint32_t kGapRerequestTimeoutUSec; static const uint32_t kFastStartTimeoutUSec; static const uint32_t kRTPActivityTimeoutUSec; static const uint32_t kGapRerequestTimeoutMsec; static const uint32_t kFastStartTimeoutMsec; static const uint32_t kRTPActivityTimeoutMsec; static const uint32_t kSSCleanoutTimeoutMsec; static const uint32_t INVOKE_GET_MASTER_VOLUME = 3; static const uint32_t INVOKE_SET_MASTER_VOLUME = 4; static uint64_t monotonicUSecNow(); DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer); }; Loading media/libaah_rtp/aah_rx_player_core.cpp +125 −59 Original line number Diff line number Diff line Loading @@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic = FOURCC('T','n','a','k'); const uint32_t AAH_RXPlayer::kFastStartRequestMagic = FOURCC('T','f','s','t'); const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75; const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800; const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000; const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000; static inline int16_t fetchInt16(uint8_t* data) { return static_cast<int16_t>(U16_AT(data)); Loading @@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) { return static_cast<int64_t>(U64_AT(data)); } uint64_t AAH_RXPlayer::monotonicUSecNow() { struct timespec now; int res = clock_gettime(CLOCK_MONOTONIC, &now); CHECK(res >= 0); uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000; ret += now.tv_nsec / 1000; return ret; } status_t AAH_RXPlayer::startWorkThread() { status_t res; stopWorkThread(); ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO); if (res != OK) { Loading Loading @@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() { substreams_.clear(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); } bool AAH_RXPlayer::setupSocket() { Loading Loading @@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() { while (!thread_wrapper_->exitPending()) { // Step 1: Wait until there is something to do. int gap_timeout = computeNextGapRetransmitTimeout(); int gap_timeout = next_retrans_req_timeout_.msecTillTimeout(); int ring_timeout = ring_buffer_.computeInactivityTimeout(); int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout(); int timeout = -1; if (!ring_timeout) { LOGW("RTP inactivity timeout reached, resetting pipeline."); resetPipeline(); timeout = gap_timeout; } else { if (gap_timeout < 0) { timeout = ring_timeout; } else if (ring_timeout < 0) { timeout = gap_timeout; } else { timeout = (gap_timeout < ring_timeout) ? gap_timeout : ring_timeout; continue; } if (!ss_cleanout_timeout) { cleanoutExpiredSubstreams(); continue; } timeout = minTimeout(gap_timeout, timeout); timeout = minTimeout(ring_timeout, timeout); timeout = minTimeout(ss_cleanout_timeout, timeout); if ((0 != timeout) && (!process_more_right_now)) { // Set up the events to wait on. Start with the wakeup pipe. memset(&poll_fds, 0, sizeof(poll_fds)); Loading Loading @@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() { } } // Is this a command packet? If so, its not necessarily associate // Is this a command packet? If so, its not necessarily associated // with one particular substream. Just give it to the command // packet handler and then move on. if (4 == payload_type) { Loading Loading @@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { return; } bool do_cleanup_pass = false; uint16_t command_id = U16_AT(data + offset); offset += 2; switch (command_id) { case TRTPControlPacket::kCommandNop: // Note: NOPs are frequently used to carry timestamp transformation // updates. If there was a timestamp transform attached to this // payload, it was already taken care of by processRX. break; case TRTPControlPacket::kCommandEOS: // TODO need to differentiate between flush and EOS. Substreams // which have hit EOS need a chance to drain before being destroyed. case TRTPControlPacket::kCommandFlush: { uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; LOGI("*** %s flushing program_id=%d", __PRETTY_FUNCTION__, program_id); Vector<uint32_t> substreams_to_remove; // Flag any programs with the given program ID for cleanup. for (size_t i = 0; i < substreams_.size(); ++i) { sp<Substream> iter = substreams_.valueAt(i); if (iter->getProgramID() == program_id) { iter->shutdown(); substreams_to_remove.add(iter->getSSRC()); const sp<Substream>& stream = substreams_.valueAt(i); if (stream->getProgramID() == program_id) { stream->clearInactivityTimeout(); } } // Make sure we do our cleanup pass at the end of this. do_cleanup_pass = true; } break; case TRTPControlPacket::kCommandAPU: { // Active program update packet. Go over all of our substreams and // either reset the inactivity timer for the substreams listed in // this update packet, or clear the inactivity timer for the // substreams not listed in this update packet. A cleared // inactivity timer will flag a substream for deletion in the // cleanup pass at the end of this function. // The packet must contain at least the 1 byte numActivePrograms // field. if (amt < offset + 1) { return; } uint8_t numActivePrograms = data[offset++]; // If the payload is not long enough to contain the list it promises // to have, just skip it. if (amt < (offset + numActivePrograms)) { return; } for (size_t i = 0; i < substreams_to_remove.size(); ++i) { substreams_.removeItem(substreams_to_remove[i]); // Clear all inactivity timers. for (size_t i = 0; i < substreams_.size(); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); stream->clearInactivityTimeout(); } // Now go over the list of active programs and reset the inactivity // timers for those streams which are currently in the active // program update packet. for (uint8_t j = 0; j < numActivePrograms; ++j) { uint8_t pid = (data[offset + j] & 0x1F); for (size_t i = 0; i < substreams_.size(); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); if (stream->getProgramID() == pid) { stream->resetInactivityTimeout(); } } } // Make sure we do our cleanup pass at the end of this. do_cleanup_pass = true; } break; } if (do_cleanup_pass) cleanoutExpiredSubstreams(); } bool AAH_RXPlayer::processGaps() { Loading Loading @@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() { // this gap and move on. if (!send_retransmit_request && (kGS_NoGap != current_gap_status_) && (0 == computeNextGapRetransmitTimeout())) { (0 == next_retrans_req_timeout_.msecTillTimeout())) { // If out current gap is the fast-start gap, don't bother to skip it // because substreams look like the are about to underflow. if ((kGS_FastStartGap != gap_status) || (current_gap_.end_seq_ != gap.end_seq_)) { for (size_t i = 0; i < substreams_.size(); ++i) { if (substreams_.valueAt(i)->isAboutToUnderflow()) { LOGV("About to underflow, giving up on gap [%hu, %hu]", LOGI("About to underflow, giving up on gap [%hu, %hu]", gap.start_seq_, gap.end_seq_); ring_buffer_.processNAK(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); return true; } } Loading @@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() { send_retransmit_request = true; } } else { current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); } if (send_retransmit_request) { Loading @@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() { (current_gap_.end_seq_ == gap.end_seq_)) { LOGV("Fast start is taking forever; giving up."); ring_buffer_.processNAK(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); return true; } Loading Loading @@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() { // Update the current gap info. current_gap_ = gap; current_gap_status_ = gap_status; next_retrans_req_time_ = monotonicUSecNow() + ((kGS_FastStartGap == current_gap_status_) ? kFastStartTimeoutUSec : kGapRerequestTimeoutUSec); setGapStatus(gap_status); } return false; } // Compute when its time to send the next gap retransmission in milliseconds. // Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit // right now. int AAH_RXPlayer::computeNextGapRetransmitTimeout() { if (kGS_NoGap == current_gap_status_) { return -1; void AAH_RXPlayer::setGapStatus(GapStatus status) { current_gap_status_ = status; switch(current_gap_status_) { case kGS_NormalGap: next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec); break; case kGS_FastStartGap: next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec); break; case kGS_NoGap: default: next_retrans_req_timeout_.setTimeout(-1); break; } } void AAH_RXPlayer::cleanoutExpiredSubstreams() { static const size_t kMaxPerPass = 32; uint32_t to_remove[kMaxPerPass]; size_t cnt, i; int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); do { for (i = 0, cnt = 0; (i < substreams_.size()) && (cnt < kMaxPerPass); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); if (stream->shouldExpire()) { to_remove[cnt++] = stream->getSSRC(); } } timeout_delta /= 1000; if (timeout_delta <= 0) { return 0; for (i = 0; i < cnt; ++i) { LOGI("Purging substream with SSRC 0x%08x", to_remove[i]); substreams_.removeItem(to_remove[i]); } } while (cnt >= kMaxPerPass); return static_cast<uint32_t>(timeout_delta); ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); } } // namespace android media/libaah_rtp/aah_rx_player_ring_buffer.cpp +3 −14 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() { rd_seq_known_ = false; waiting_for_fast_start_ = true; fetched_first_packet_ = false; rtp_activity_timeout_valid_ = false; rtp_activity_timeout_.setTimeout(-1); } bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, Loading @@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, CHECK(NULL != ring_); CHECK(NULL != buf); rtp_activity_timeout_valid_ = true; rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec; rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec); // If the ring buffer is totally reset (we have never received a single // payload) then we don't know the rd sequence number and this should be Loading Loading @@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) { int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() { AutoMutex lock(&lock_); if (!rtp_activity_timeout_valid_) { return -1; } uint64_t now = monotonicUSecNow(); if (rtp_activity_timeout_ <= now) { return 0; } return (rtp_activity_timeout_ - now) / 1000; return rtp_activity_timeout_.msecTillTimeout(); } AAH_RXPlayer::PacketBuffer* Loading media/libaah_rtp/aah_rx_player_substream.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -33,8 +33,9 @@ namespace android { int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = 50ull * 1000; const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000; AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { ssrc_ = ssrc; Loading @@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { // cleanupBufferInProgress will reset most of the internal state variables. // Just need to make sure that buffer_in_progress_ is NULL before calling. cleanupBufferInProgress(); resetInactivityTimeout(); } AAH_RXPlayer::Substream::~Substream() { Loading Loading @@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf, return; } resetInactivityTimeout(); // Do we have a buffer in progress already? If so, abort the buffer. In // theory, this should never happen. If there were a discontinutity in the // stream, the discon in the seq_nos at the RTP level should have already Loading Loading @@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf, return; } resetInactivityTimeout(); if (NULL == buffer_in_progress_) { LOGV("TRTP Receiver skipping payload continuation; no buffer currently" " in progress."); Loading Loading
media/libaah_rtp/Android.mk +2 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \ aah_tx_group.cpp \ aah_tx_packet.cpp \ aah_tx_player.cpp \ pipe_event.cpp pipe_event.cpp \ utils.cpp LOCAL_C_INCLUDES := \ frameworks/base/include \ Loading
media/libaah_rtp/aah_rx_player.h +32 −11 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ #include "aah_decoder_pump.h" #include "pipe_event.h" #include "utils.h" namespace android { Loading Loading @@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool waiting_for_fast_start_; bool fetched_first_packet_; uint64_t rtp_activity_timeout_; bool rtp_activity_timeout_valid_; Timeout rtp_activity_timeout_; DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer); }; Loading @@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool isAboutToUnderflow(); uint32_t getSSRC() const { return ssrc_; } uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } uint8_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } status_t getStatus() const { return status_; } void clearInactivityTimeout() { inactivity_timeout_.setTimeout(-1); } void resetInactivityTimeout() { inactivity_timeout_.setTimeout(kInactivityTimeoutMsec); } bool shouldExpire() { // Substreams should always have a positive time until timeout. A // timeout value of 0 indicates that the timer has expired, while a // negative timeout (normally meaning no timeout) is used by some of // the core code to implement a mark and sweep pattern for cleaning // out no longer relevant substreams. return (inactivity_timeout_.msecTillTimeout() <= 0); } protected: virtual ~Substream(); Loading Loading @@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface { uint32_t aux_data_expected_size_; sp<AAH_DecoderPump> decoder_; Timeout inactivity_timeout_; static int64_t kAboutToUnderflowThreshold; static const int64_t kAboutToUnderflowThreshold; static const int kInactivityTimeoutMsec; DISALLOW_EVIL_CONSTRUCTORS(Substream); }; Loading @@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface { void processRingBuffer(); void processCommandPacket(PacketBuffer* pb); bool processGaps(); int computeNextGapRetransmitTimeout(); void setGapStatus(GapStatus status); void cleanoutExpiredSubstreams(); void fetchAudioFlinger(); PipeEvent wakeup_work_thread_evt_; Loading @@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface { SeqNoGap current_gap_; GapStatus current_gap_status_; uint64_t next_retrans_req_time_; Timeout next_retrans_req_timeout_; Timeout ss_cleanout_timeout_; RXRingBuffer ring_buffer_; SubstreamVec substreams_; Loading @@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface { static const uint32_t kRetransRequestMagic; static const uint32_t kFastStartRequestMagic; static const uint32_t kRetransNAKMagic; static const uint32_t kGapRerequestTimeoutUSec; static const uint32_t kFastStartTimeoutUSec; static const uint32_t kRTPActivityTimeoutUSec; static const uint32_t kGapRerequestTimeoutMsec; static const uint32_t kFastStartTimeoutMsec; static const uint32_t kRTPActivityTimeoutMsec; static const uint32_t kSSCleanoutTimeoutMsec; static const uint32_t INVOKE_GET_MASTER_VOLUME = 3; static const uint32_t INVOKE_SET_MASTER_VOLUME = 4; static uint64_t monotonicUSecNow(); DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer); }; Loading
media/libaah_rtp/aah_rx_player_core.cpp +125 −59 Original line number Diff line number Diff line Loading @@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic = FOURCC('T','n','a','k'); const uint32_t AAH_RXPlayer::kFastStartRequestMagic = FOURCC('T','f','s','t'); const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75; const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800; const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000; const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000; static inline int16_t fetchInt16(uint8_t* data) { return static_cast<int16_t>(U16_AT(data)); Loading @@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) { return static_cast<int64_t>(U64_AT(data)); } uint64_t AAH_RXPlayer::monotonicUSecNow() { struct timespec now; int res = clock_gettime(CLOCK_MONOTONIC, &now); CHECK(res >= 0); uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000; ret += now.tv_nsec / 1000; return ret; } status_t AAH_RXPlayer::startWorkThread() { status_t res; stopWorkThread(); ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO); if (res != OK) { Loading Loading @@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() { substreams_.clear(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); } bool AAH_RXPlayer::setupSocket() { Loading Loading @@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() { while (!thread_wrapper_->exitPending()) { // Step 1: Wait until there is something to do. int gap_timeout = computeNextGapRetransmitTimeout(); int gap_timeout = next_retrans_req_timeout_.msecTillTimeout(); int ring_timeout = ring_buffer_.computeInactivityTimeout(); int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout(); int timeout = -1; if (!ring_timeout) { LOGW("RTP inactivity timeout reached, resetting pipeline."); resetPipeline(); timeout = gap_timeout; } else { if (gap_timeout < 0) { timeout = ring_timeout; } else if (ring_timeout < 0) { timeout = gap_timeout; } else { timeout = (gap_timeout < ring_timeout) ? gap_timeout : ring_timeout; continue; } if (!ss_cleanout_timeout) { cleanoutExpiredSubstreams(); continue; } timeout = minTimeout(gap_timeout, timeout); timeout = minTimeout(ring_timeout, timeout); timeout = minTimeout(ss_cleanout_timeout, timeout); if ((0 != timeout) && (!process_more_right_now)) { // Set up the events to wait on. Start with the wakeup pipe. memset(&poll_fds, 0, sizeof(poll_fds)); Loading Loading @@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() { } } // Is this a command packet? If so, its not necessarily associate // Is this a command packet? If so, its not necessarily associated // with one particular substream. Just give it to the command // packet handler and then move on. if (4 == payload_type) { Loading Loading @@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { return; } bool do_cleanup_pass = false; uint16_t command_id = U16_AT(data + offset); offset += 2; switch (command_id) { case TRTPControlPacket::kCommandNop: // Note: NOPs are frequently used to carry timestamp transformation // updates. If there was a timestamp transform attached to this // payload, it was already taken care of by processRX. break; case TRTPControlPacket::kCommandEOS: // TODO need to differentiate between flush and EOS. Substreams // which have hit EOS need a chance to drain before being destroyed. case TRTPControlPacket::kCommandFlush: { uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; LOGI("*** %s flushing program_id=%d", __PRETTY_FUNCTION__, program_id); Vector<uint32_t> substreams_to_remove; // Flag any programs with the given program ID for cleanup. for (size_t i = 0; i < substreams_.size(); ++i) { sp<Substream> iter = substreams_.valueAt(i); if (iter->getProgramID() == program_id) { iter->shutdown(); substreams_to_remove.add(iter->getSSRC()); const sp<Substream>& stream = substreams_.valueAt(i); if (stream->getProgramID() == program_id) { stream->clearInactivityTimeout(); } } // Make sure we do our cleanup pass at the end of this. do_cleanup_pass = true; } break; case TRTPControlPacket::kCommandAPU: { // Active program update packet. Go over all of our substreams and // either reset the inactivity timer for the substreams listed in // this update packet, or clear the inactivity timer for the // substreams not listed in this update packet. A cleared // inactivity timer will flag a substream for deletion in the // cleanup pass at the end of this function. // The packet must contain at least the 1 byte numActivePrograms // field. if (amt < offset + 1) { return; } uint8_t numActivePrograms = data[offset++]; // If the payload is not long enough to contain the list it promises // to have, just skip it. if (amt < (offset + numActivePrograms)) { return; } for (size_t i = 0; i < substreams_to_remove.size(); ++i) { substreams_.removeItem(substreams_to_remove[i]); // Clear all inactivity timers. for (size_t i = 0; i < substreams_.size(); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); stream->clearInactivityTimeout(); } // Now go over the list of active programs and reset the inactivity // timers for those streams which are currently in the active // program update packet. for (uint8_t j = 0; j < numActivePrograms; ++j) { uint8_t pid = (data[offset + j] & 0x1F); for (size_t i = 0; i < substreams_.size(); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); if (stream->getProgramID() == pid) { stream->resetInactivityTimeout(); } } } // Make sure we do our cleanup pass at the end of this. do_cleanup_pass = true; } break; } if (do_cleanup_pass) cleanoutExpiredSubstreams(); } bool AAH_RXPlayer::processGaps() { Loading Loading @@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() { // this gap and move on. if (!send_retransmit_request && (kGS_NoGap != current_gap_status_) && (0 == computeNextGapRetransmitTimeout())) { (0 == next_retrans_req_timeout_.msecTillTimeout())) { // If out current gap is the fast-start gap, don't bother to skip it // because substreams look like the are about to underflow. if ((kGS_FastStartGap != gap_status) || (current_gap_.end_seq_ != gap.end_seq_)) { for (size_t i = 0; i < substreams_.size(); ++i) { if (substreams_.valueAt(i)->isAboutToUnderflow()) { LOGV("About to underflow, giving up on gap [%hu, %hu]", LOGI("About to underflow, giving up on gap [%hu, %hu]", gap.start_seq_, gap.end_seq_); ring_buffer_.processNAK(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); return true; } } Loading @@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() { send_retransmit_request = true; } } else { current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); } if (send_retransmit_request) { Loading @@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() { (current_gap_.end_seq_ == gap.end_seq_)) { LOGV("Fast start is taking forever; giving up."); ring_buffer_.processNAK(); current_gap_status_ = kGS_NoGap; setGapStatus(kGS_NoGap); return true; } Loading Loading @@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() { // Update the current gap info. current_gap_ = gap; current_gap_status_ = gap_status; next_retrans_req_time_ = monotonicUSecNow() + ((kGS_FastStartGap == current_gap_status_) ? kFastStartTimeoutUSec : kGapRerequestTimeoutUSec); setGapStatus(gap_status); } return false; } // Compute when its time to send the next gap retransmission in milliseconds. // Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit // right now. int AAH_RXPlayer::computeNextGapRetransmitTimeout() { if (kGS_NoGap == current_gap_status_) { return -1; void AAH_RXPlayer::setGapStatus(GapStatus status) { current_gap_status_ = status; switch(current_gap_status_) { case kGS_NormalGap: next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec); break; case kGS_FastStartGap: next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec); break; case kGS_NoGap: default: next_retrans_req_timeout_.setTimeout(-1); break; } } void AAH_RXPlayer::cleanoutExpiredSubstreams() { static const size_t kMaxPerPass = 32; uint32_t to_remove[kMaxPerPass]; size_t cnt, i; int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); do { for (i = 0, cnt = 0; (i < substreams_.size()) && (cnt < kMaxPerPass); ++i) { const sp<Substream>& stream = substreams_.valueAt(i); if (stream->shouldExpire()) { to_remove[cnt++] = stream->getSSRC(); } } timeout_delta /= 1000; if (timeout_delta <= 0) { return 0; for (i = 0; i < cnt; ++i) { LOGI("Purging substream with SSRC 0x%08x", to_remove[i]); substreams_.removeItem(to_remove[i]); } } while (cnt >= kMaxPerPass); return static_cast<uint32_t>(timeout_delta); ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); } } // namespace android
media/libaah_rtp/aah_rx_player_ring_buffer.cpp +3 −14 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() { rd_seq_known_ = false; waiting_for_fast_start_ = true; fetched_first_packet_ = false; rtp_activity_timeout_valid_ = false; rtp_activity_timeout_.setTimeout(-1); } bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, Loading @@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, CHECK(NULL != ring_); CHECK(NULL != buf); rtp_activity_timeout_valid_ = true; rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec; rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec); // If the ring buffer is totally reset (we have never received a single // payload) then we don't know the rd sequence number and this should be Loading Loading @@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) { int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() { AutoMutex lock(&lock_); if (!rtp_activity_timeout_valid_) { return -1; } uint64_t now = monotonicUSecNow(); if (rtp_activity_timeout_ <= now) { return 0; } return (rtp_activity_timeout_ - now) / 1000; return rtp_activity_timeout_.msecTillTimeout(); } AAH_RXPlayer::PacketBuffer* Loading
media/libaah_rtp/aah_rx_player_substream.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -33,8 +33,9 @@ namespace android { int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = 50ull * 1000; const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000; AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { ssrc_ = ssrc; Loading @@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { // cleanupBufferInProgress will reset most of the internal state variables. // Just need to make sure that buffer_in_progress_ is NULL before calling. cleanupBufferInProgress(); resetInactivityTimeout(); } AAH_RXPlayer::Substream::~Substream() { Loading Loading @@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf, return; } resetInactivityTimeout(); // Do we have a buffer in progress already? If so, abort the buffer. In // theory, this should never happen. If there were a discontinutity in the // stream, the discon in the seq_nos at the RTP level should have already Loading Loading @@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf, return; } resetInactivityTimeout(); if (NULL == buffer_in_progress_) { LOGV("TRTP Receiver skipping payload continuation; no buffer currently" " in progress."); Loading