Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3c56184d authored by Jason Simmons's avatar Jason Simmons Committed by Mike Lockwood
Browse files

Ensure that the TRTP retry buffer has contiguous sequence numbers

Previously, sequence numbers for audio packets were assigned by the
TX player before packets were queued to the sender.  This caused a
race between assignment of sequence numbers on audio packets and
sequence numbers on heartbeat packets.  A heartbeat could get queued
and added to the retry buffer before an audio packet with an earlier
sequence number got queued.

This CL centralizes packet sequence number assignment and insertion
into the retry buffer inside AAH_TXSender::doSendPacket_l.  It also
makes explicit what operations can be done on a TRTPPacket before
and after packing.

Change-Id: I6d02eae81061983e4def4f1b3dd7c1625467b151
parent 395575ab
Loading
Loading
Loading
Loading
+138 −1
Original line number Diff line number Diff line
@@ -13,12 +13,15 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "LibAAH_RTP"
#include <utils/Log.h>

#include <arpa/inet.h>
#include <string.h>

#include <media/stagefright/MediaDebug.h>

#include "aah_tx_packet.h"

namespace android {
@@ -30,6 +33,130 @@ TRTPPacket::~TRTPPacket() {
    delete mPacket;
}

/*** TRTP packet properties ***/

void TRTPPacket::setSeqNumber(uint16_t val) {
    mSeqNumber = val;

    if (mIsPacked) {
        const int kTRTPSeqNumberOffset = 2;
        uint16_t* buf = reinterpret_cast<uint16_t*>(
            mPacket + kTRTPSeqNumberOffset);
        *buf = htons(mSeqNumber);
    }
}

uint16_t TRTPPacket::getSeqNumber() const {
    return mSeqNumber;
}

void TRTPPacket::setPTS(int64_t val) {
    CHECK(!mIsPacked);
    mPTS = val;
    mPTSValid = true;
}

int64_t TRTPPacket::getPTS() const {
    return mPTS;
}

void TRTPPacket::setEpoch(uint32_t val) {
    mEpoch = val;

    if (mIsPacked) {
        const int kTRTPEpochOffset = 8;
        uint32_t* buf = reinterpret_cast<uint32_t*>(
            mPacket + kTRTPEpochOffset);
        uint32_t val = ntohl(*buf);
        val &= ~(kTRTPEpochMask << kTRTPEpochShift);
        val |= (mEpoch & kTRTPEpochMask) << kTRTPEpochShift;
        *buf = htonl(val);
    }
}

void TRTPPacket::setProgramID(uint16_t val) {
    CHECK(!mIsPacked);
    mProgramID = val;
}

void TRTPPacket::setSubstreamID(uint16_t val) {
    CHECK(!mIsPacked);
    mSubstreamID = val;
}


void TRTPPacket::setClockTransform(const LinearTransform& trans) {
    CHECK(!mIsPacked);
    mClockTranform = trans;
    mClockTranformValid = true;
}

uint8_t* TRTPPacket::getPacket() const {
    CHECK(mIsPacked);
    return mPacket;
}

int TRTPPacket::getPacketLen() const {
    CHECK(mIsPacked);
    return mPacketLen;
}

void TRTPPacket::setExpireTime(nsecs_t val) {
    CHECK(!mIsPacked);
    mExpireTime = val;
}

nsecs_t TRTPPacket::getExpireTime() const {
    return mExpireTime;
}

/*** TRTP audio packet properties ***/

void TRTPAudioPacket::setCodecType(TRTPAudioCodecType val) {
    CHECK(!mIsPacked);
    mCodecType = val;
}

void TRTPAudioPacket::setRandomAccessPoint(bool val) {
    CHECK(!mIsPacked);
    mRandomAccessPoint = val;
}

void TRTPAudioPacket::setDropable(bool val) {
    CHECK(!mIsPacked);
    mDropable = val;
}

void TRTPAudioPacket::setDiscontinuity(bool val) {
    CHECK(!mIsPacked);
    mDiscontinuity = val;
}

void TRTPAudioPacket::setEndOfStream(bool val) {
    CHECK(!mIsPacked);
    mEndOfStream = val;
}

void TRTPAudioPacket::setVolume(uint8_t val) {
    CHECK(!mIsPacked);
    mVolume = val;
}

void TRTPAudioPacket::setAccessUnitData(void* data, int len) {
    CHECK(!mIsPacked);
    mAccessUnitData = data;
    mAccessUnitLen = len;
}

/*** TRTP control packet properties ***/

void TRTPControlPacket::setCommandID(TRTPCommandID val) {
    CHECK(!mIsPacked);
    mCommandID = val;
}

/*** TRTP packet serializers ***/

void TRTPPacket::writeU8(uint8_t*& buf, uint8_t val) {
    *buf = val;
    buf++;
@@ -76,7 +203,7 @@ void TRTPPacket::writeTRTPHeader(uint8_t*& buf,
        writeU32(buf, 0);
    }
    writeU32(buf,
            ((mEpoch & kTRTPEpochMask) << 10) |
            ((mEpoch & kTRTPEpochMask) << kTRTPEpochShift) |
            ((mProgramID & 0x1F) << 5) |
            (mSubstreamID & 0x1F));

@@ -100,6 +227,10 @@ void TRTPPacket::writeTRTPHeader(uint8_t*& buf,
}

bool TRTPAudioPacket::pack() {
    if (mIsPacked) {
        return false;
    }

    int packetLen = kRTPHeaderLen +
                    mAccessUnitLen +
                    TRTPHeaderLen();
@@ -130,6 +261,7 @@ bool TRTPAudioPacket::pack() {

    memcpy(cur, mAccessUnitData, mAccessUnitLen);

    mIsPacked = true;
    return true;
}

@@ -171,6 +303,10 @@ int TRTPAudioPacket::TRTPHeaderLen() const {
}

bool TRTPControlPacket::pack() {
    if (mIsPacked) {
        return false;
    }

    // command packets contain a 2-byte command ID
    int packetLen = kRTPHeaderLen +
                    TRTPHeaderLen() +
@@ -188,6 +324,7 @@ bool TRTPControlPacket::pack() {
    writeTRTPHeader(cur, true, packetLen);
    writeU16(cur, mCommandID);

    mIsPacked = true;
    return true;
}

+29 −32
Original line number Diff line number Diff line
@@ -33,7 +33,8 @@ class TRTPPacket : public RefBase {
    };

    TRTPPacket(TRTPHeaderType headerType)
        : mVersion(2)
        : mIsPacked(false)
        , mVersion(2)
        , mPadding(false)
        , mExtension(false)
        , mCsrcCount(0)
@@ -54,31 +55,28 @@ class TRTPPacket : public RefBase {
  public:
    virtual ~TRTPPacket();

    void setSeqNumber(uint16_t val) { mSeqNumber = val; }
    uint16_t getSeqNumber() const { return mSeqNumber; }
    void setPTS(int64_t val) {
        mPTS = val;
        mPTSValid = true;
    }
    int64_t getPTS() const { return mPTS; }
    void setEpoch(uint32_t val) { mEpoch = val; }
    void setProgramID(uint16_t val) { mProgramID = val; }
    void setSubstreamID(uint16_t val) { mSubstreamID = val; }
    void setClockTransform(const LinearTransform& trans) {
        mClockTranform = trans;
        mClockTranformValid = true;
    }

    uint8_t* getPacket() const { return mPacket; }
    int getPacketLen() const { return mPacketLen; }

    void setExpireTime(nsecs_t val) { mExpireTime = val; }
    nsecs_t getExpireTime() const { return mExpireTime; }
    void setSeqNumber(uint16_t val);
    uint16_t getSeqNumber() const;

    void setPTS(int64_t val);
    int64_t getPTS() const;

    void setEpoch(uint32_t val);
    void setProgramID(uint16_t val);
    void setSubstreamID(uint16_t val);
    void setClockTransform(const LinearTransform& trans);

    uint8_t* getPacket() const;
    int getPacketLen() const;

    void setExpireTime(nsecs_t val);
    nsecs_t getExpireTime() const;

    virtual bool pack() = 0;

    // mask for the number of bits in a TRTP epoch
    static const uint32_t kTRTPEpochMask = (1 << 22) - 1;
    static const int kTRTPEpochShift = 10;

  protected:
    static const int kRTPHeaderLen = 12;
@@ -93,6 +91,8 @@ class TRTPPacket : public RefBase {
    void writeU32(uint8_t*& buf, uint32_t val);
    void writeU64(uint8_t*& buf, uint64_t val);

    bool mIsPacked;

    uint8_t mVersion;
    bool mPadding;
    bool mExtension;
@@ -135,16 +135,13 @@ class TRTPAudioPacket : public TRTPPacket {
        kCodecMPEG1Audio = 3,
    };

    void setCodecType(TRTPAudioCodecType val) { mCodecType = val; }
    void setRandomAccessPoint(bool val) { mRandomAccessPoint = val; }
    void setDropable(bool val) { mDropable = val; }
    void setDiscontinuity(bool val) { mDiscontinuity = val; }
    void setEndOfStream(bool val) { mEndOfStream = val; }
    void setVolume(uint8_t val) { mVolume = val; }
    void setAccessUnitData(void* data, int len) {
        mAccessUnitData = data;
        mAccessUnitLen = len;
    }
    void setCodecType(TRTPAudioCodecType val);
    void setRandomAccessPoint(bool val);
    void setDropable(bool val);
    void setDiscontinuity(bool val);
    void setEndOfStream(bool val);
    void setVolume(uint8_t val);
    void setAccessUnitData(void* data, int len);

    virtual bool pack();

@@ -174,7 +171,7 @@ class TRTPControlPacket : public TRTPPacket {
        kCommandEOS   = 3,
    };

    void setCommandID(TRTPCommandID val) { mCommandID = val; }
    void setCommandID(TRTPCommandID val);

    virtual bool pack();

+0 −1
Original line number Diff line number Diff line
@@ -1075,7 +1075,6 @@ void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
            return;
        }

        mAAH_Sender->assignSeqNumber(mEndpoint, packet);
        message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
        message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
    }
+23 −58
Original line number Diff line number Diff line
@@ -180,24 +180,6 @@ void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
    }
}

void AAH_TXSender::assignSeqNumber(const Endpoint& endpoint,
                                       const sp<TRTPPacket>& packet) {
    Mutex::Autolock lock(mEndpointLock);
    assignSeqNumber_l(endpoint, packet);
}

void AAH_TXSender::assignSeqNumber_l(const Endpoint& endpoint,
                                         const sp<TRTPPacket>& packet) {
    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
        // the endpoint state has disappeared, so the player that sent this
        // packet must be dead.
        return;
    }
    packet->setEpoch(eps->epoch);
    packet->setSeqNumber(eps->trtpSeqNumber++);
}

void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
    switch (msg->what()) {
        case kWhatSendPacket:
@@ -219,8 +201,6 @@ void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
}

void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
    LOGV("*** %s", __PRETTY_FUNCTION__);

    sp<RefBase> obj;
    CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
    sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
@@ -233,19 +213,33 @@ void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
    CHECK(msg->findInt32(kSendPacketPort, &port32));
    uint16_t port = port32;

    doSendPacket(packet, ipAddr, port);
    Mutex::Autolock lock(mEndpointLock);
    doSendPacket_l(packet, Endpoint(ipAddr, port));
}

    addToRetryBuffer(Endpoint(ipAddr, port), packet);
void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
                                  const Endpoint& endpoint) {
    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
        // the endpoint state has disappeared, so the player that sent this
        // packet must be dead.
        return;
    }

void AAH_TXSender::doSendPacket(sp<TRTPPacket> packet,
                                uint32_t ipAddr,
                                uint16_t port) {
    // assign the packet's sequence number
    packet->setEpoch(eps->epoch);
    packet->setSeqNumber(eps->trtpSeqNumber++);

    // add the packet to the retry buffer
    RetryBuffer& retry = eps->retry;
    retry.push_back(packet);

    // send the packet
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = ipAddr;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = endpoint.addr;
    addr.sin_port = htons(endpoint.port);

    ssize_t result = sendto(mSocket,
                            packet->getPacket(),
@@ -258,32 +252,7 @@ void AAH_TXSender::doSendPacket(sp<TRTPPacket> packet,
    }
}

void AAH_TXSender::addToRetryBuffer(const Endpoint& endpoint,
                                    const sp<TRTPPacket>& packet) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
        return;
    }

    addToRetryBuffer_l(eps, packet);
}

void AAH_TXSender::addToRetryBuffer_l(EndpointState* eps,
                                      const sp<TRTPPacket>& packet) {
    RetryBuffer& retry = eps->retry;
    retry.push_back(packet);

    LOGV("*** %s seq=%hu size=%d",
         __PRETTY_FUNCTION__,
         packet->getSeqNumber(),
         retry.size());
}

void AAH_TXSender::trimRetryBuffers() {
    LOGV("*** %s", __PRETTY_FUNCTION__);

    Mutex::Autolock lock(mEndpointLock);

    nsecs_t localTimeNow = systemTime();
@@ -302,8 +271,6 @@ void AAH_TXSender::trimRetryBuffers() {
            }
        }

        LOGV("*** %s size=%d", __PRETTY_FUNCTION__, retry.size());

        if (retry.isEmpty() && eps->playerRefCount == 0) {
            endpointsToRemove.add(mEndpointMap.keyAt(i));
        }
@@ -336,13 +303,11 @@ void AAH_TXSender::sendHeartbeats() {
        sp<TRTPControlPacket> packet = new TRTPControlPacket();
        packet->setCommandID(TRTPControlPacket::kCommandNop);

        assignSeqNumber_l(ep, packet);
        packet->setExpireTime(systemTime() +
                              AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
        packet->pack();

        doSendPacket(packet, ep.addr, ep.port);
        addToRetryBuffer_l(eps, packet);
        doSendPacket_l(packet, ep);
    }

    // schedule the next heartbeat
+3 −9
Original line number Diff line number Diff line
@@ -67,8 +67,6 @@ class AAH_TXSender : public virtual RefBase {

    uint16_t registerEndpoint(const Endpoint& endpoint);
    void unregisterEndpoint(const Endpoint& endpoint);
    void assignSeqNumber(const Endpoint& endpoint,
                         const sp<TRTPPacket>& packet);

    enum {
        kWhatSendPacket,
@@ -106,15 +104,11 @@ class AAH_TXSender : public virtual RefBase {
    friend class AHandlerReflector<AAH_TXSender>;
    void onMessageReceived(const sp<AMessage>& msg);
    void onSendPacket(const sp<AMessage>& msg);
    void doSendPacket(sp<TRTPPacket> packet, uint32_t ipAddr, uint16_t port);
    void addToRetryBuffer(const Endpoint& endpoint,
                          const sp<TRTPPacket>& packet);
    void addToRetryBuffer_l(EndpointState* eps,
                            const sp<TRTPPacket>& packet);
    void doSendPacket_l(const sp<TRTPPacket>& packet,
                        const Endpoint& endpoint);
    void trimRetryBuffers();
    void sendHeartbeats();
    void assignSeqNumber_l(const Endpoint& endpoint,
                           const sp<TRTPPacket>& packet);

    sp<ALooper> mLooper;
    sp<AHandlerReflector<AAH_TXSender> > mReflector;