Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit abb8398e authored by Andreas Huber's avatar Andreas Huber Committed by Android (Google) Code Review
Browse files

Merge "Instead of closing the connection altogether if no UDP packets arrive...

Merge "Instead of closing the connection altogether if no UDP packets arrive after a certain time, try changing transports (to interleaved TCP). Also properly close the sockets on disconnection." into gingerbread
parents 2e86809b f88ca7a0
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -263,6 +263,10 @@ void ARTPConnection::onPollStreams() {
        }
    }

    if (maxSocket == -1) {
        return;
    }

    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    CHECK_GE(res, 0);

@@ -292,6 +296,10 @@ void ARTPConnection::onPollStreams() {
             it != mStreams.end(); ++it) {
            StreamInfo *s = &*it;

            if (s->mIsInjected) {
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
+75 −32
Original line number Diff line number Diff line
@@ -31,8 +31,6 @@
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>

#define USE_TCP_INTERLEAVED     0

// If no access units are received within 3 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 3000000ll;
@@ -83,7 +81,8 @@ struct MyHandler : public AHandler {
          mFirstAccessUnit(true),
          mFirstAccessUnitNTP(0),
          mNumAccessUnitsReceived(0),
          mCheckPending(false) {
          mCheckPending(false),
          mTryTCPInterleaving(false) {
        mNetLooper->setName("rtsp net");
        mNetLooper->start(false /* runOnCallingThread */,
                          false /* canCallJava */,
@@ -158,7 +157,13 @@ struct MyHandler : public AHandler {

            case 'disc':
            {
                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    sp<AMessage> reply = new AMessage('conn', id());
                    mConn->connect(mSessionURL.c_str(), reply);
                } else {
                    (new AMessage('quit', id()))->post();
                }
                break;
            }

@@ -325,10 +330,6 @@ struct MyHandler : public AHandler {

                    parsePlayResponse(response);

                    mDoneMsg->setInt32("result", OK);
                    mDoneMsg->post();
                    mDoneMsg = NULL;

                    sp<AMessage> timeout = new AMessage('tiou', id());
                    timeout->post(kStartupTimeoutUs);
                } else {
@@ -342,12 +343,26 @@ struct MyHandler : public AHandler {
            case 'abor':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    mTracks.editItemAt(i).mPacketSource->signalEOS(
                            ERROR_END_OF_STREAM);
                    TrackInfo *info = &mTracks.editItemAt(i);

                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);

                    if (!info->mUsingInterleavedTCP) {
                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);

                        close(info->mRTPSocket);
                        close(info->mRTCPSocket);
                    }
                }
                mTracks.clear();

                sp<AMessage> reply = new AMessage('tear', id());

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                AString request;
                request = "TEARDOWN ";

@@ -374,6 +389,12 @@ struct MyHandler : public AHandler {
                     << result << " (" << strerror(-result) << ")";

                sp<AMessage> reply = new AMessage('disc', id());

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                mConn->disconnect(reply);
                break;
            }
@@ -414,6 +435,11 @@ struct MyHandler : public AHandler {
                size_t trackIndex;
                CHECK(msg->findSize("track-index", &trackIndex));

                if (trackIndex >= mTracks.size()) {
                    LOG(ERROR) << "late packets ignored.";
                    break;
                }

                TrackInfo *track = &mTracks.editItemAt(trackIndex);

                int32_t eos;
@@ -457,6 +483,10 @@ struct MyHandler : public AHandler {
                }

                if (mFirstAccessUnit) {
                    mDoneMsg->setInt32("result", OK);
                    mDoneMsg->post();
                    mDoneMsg = NULL;

                    mFirstAccessUnit = false;
                    mFirstAccessUnitNTP = ntpTime;
                }
@@ -583,8 +613,19 @@ struct MyHandler : public AHandler {
            case 'tiou':
            {
                if (mFirstAccessUnit) {
                    if (mTryTCPInterleaving) {
                        LOG(WARNING) << "Never received any data, disconnecting.";
                        (new AMessage('abor', id()))->post();
                    } else {
                        LOG(WARNING)
                            << "Never received any data, switching transports.";

                        mTryTCPInterleaving = true;

                        sp<AMessage> msg = new AMessage('abor', id());
                        msg->setInt32("reconnect", true);
                        msg->post();
                    }
                }
                break;
            }
@@ -705,6 +746,7 @@ private:
    uint64_t mFirstAccessUnitNTP;
    int64_t mNumAccessUnitsReceived;
    bool mCheckPending;
    bool mTryTCPInterleaving;

    struct TrackInfo {
        AString mURL;
@@ -723,6 +765,7 @@ private:
    void setupTrack(size_t index) {
        sp<APacketSource> source =
            new APacketSource(mSessionDesc, index);

        if (source->initCheck() != OK) {
            LOG(WARNING) << "Unsupported format. Ignoring track #"
                         << index << ".";
@@ -754,7 +797,7 @@ private:
        request.append(trackURL);
        request.append(" RTSP/1.0\r\n");

#if USE_TCP_INTERLEAVED
        if (mTryTCPInterleaving) {
            size_t interleaveIndex = 2 * (mTracks.size() - 1);
            info->mUsingInterleavedTCP = true;
            info->mRTPSocket = interleaveIndex;
@@ -764,7 +807,7 @@ private:
            request.append(interleaveIndex);
            request.append("-");
            request.append(interleaveIndex + 1);
#else
        } else {
            unsigned rtpPort;
            ARTPConnection::MakePortPair(
                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
@@ -773,7 +816,7 @@ private:
            request.append(rtpPort);
            request.append("-");
            request.append(rtpPort + 1);
#endif
        }

        request.append("\r\n");