Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f88ca7a0 authored by Andreas Huber's avatar Andreas Huber
Browse files

Instead of closing the connection altogether if no UDP packets arrive after a...

Instead of closing the connection altogether if no UDP packets arrive after a certain time, try changing transports (to interleaved TCP). Also properly close the sockets on disconnection.

Change-Id: Ie8d6a3865a0477e28d4b76bb9038e468451287b1
related-to-bug: 2556656
parent 681c5ff2
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -263,6 +263,10 @@ void ARTPConnection::onPollStreams() {
        }
    }

    if (maxSocket == -1) {
        return;
    }

    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    CHECK_GE(res, 0);

@@ -292,6 +296,10 @@ void ARTPConnection::onPollStreams() {
             it != mStreams.end(); ++it) {
            StreamInfo *s = &*it;

            if (s->mIsInjected) {
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
+75 −32
Original line number Diff line number Diff line
@@ -31,8 +31,6 @@
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>

#define USE_TCP_INTERLEAVED     0

// If no access units are received within 3 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 3000000ll;
@@ -83,7 +81,8 @@ struct MyHandler : public AHandler {
          mFirstAccessUnit(true),
          mFirstAccessUnitNTP(0),
          mNumAccessUnitsReceived(0),
          mCheckPending(false) {
          mCheckPending(false),
          mTryTCPInterleaving(false) {
        mNetLooper->setName("rtsp net");
        mNetLooper->start(false /* runOnCallingThread */,
                          false /* canCallJava */,
@@ -158,7 +157,13 @@ struct MyHandler : public AHandler {

            case 'disc':
            {
                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    sp<AMessage> reply = new AMessage('conn', id());
                    mConn->connect(mSessionURL.c_str(), reply);
                } else {
                    (new AMessage('quit', id()))->post();
                }
                break;
            }

@@ -325,10 +330,6 @@ struct MyHandler : public AHandler {

                    parsePlayResponse(response);

                    mDoneMsg->setInt32("result", OK);
                    mDoneMsg->post();
                    mDoneMsg = NULL;

                    sp<AMessage> timeout = new AMessage('tiou', id());
                    timeout->post(kStartupTimeoutUs);
                } else {
@@ -342,12 +343,26 @@ struct MyHandler : public AHandler {
            case 'abor':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    mTracks.editItemAt(i).mPacketSource->signalEOS(
                            ERROR_END_OF_STREAM);
                    TrackInfo *info = &mTracks.editItemAt(i);

                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);

                    if (!info->mUsingInterleavedTCP) {
                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);

                        close(info->mRTPSocket);
                        close(info->mRTCPSocket);
                    }
                }
                mTracks.clear();

                sp<AMessage> reply = new AMessage('tear', id());

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                AString request;
                request = "TEARDOWN ";

@@ -374,6 +389,12 @@ struct MyHandler : public AHandler {
                     << result << " (" << strerror(-result) << ")";

                sp<AMessage> reply = new AMessage('disc', id());

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                mConn->disconnect(reply);
                break;
            }
@@ -414,6 +435,11 @@ struct MyHandler : public AHandler {
                size_t trackIndex;
                CHECK(msg->findSize("track-index", &trackIndex));

                if (trackIndex >= mTracks.size()) {
                    LOG(ERROR) << "late packets ignored.";
                    break;
                }

                TrackInfo *track = &mTracks.editItemAt(trackIndex);

                int32_t eos;
@@ -457,6 +483,10 @@ struct MyHandler : public AHandler {
                }

                if (mFirstAccessUnit) {
                    mDoneMsg->setInt32("result", OK);
                    mDoneMsg->post();
                    mDoneMsg = NULL;

                    mFirstAccessUnit = false;
                    mFirstAccessUnitNTP = ntpTime;
                }
@@ -583,8 +613,19 @@ struct MyHandler : public AHandler {
            case 'tiou':
            {
                if (mFirstAccessUnit) {
                    if (mTryTCPInterleaving) {
                        LOG(WARNING) << "Never received any data, disconnecting.";
                        (new AMessage('abor', id()))->post();
                    } else {
                        LOG(WARNING)
                            << "Never received any data, switching transports.";

                        mTryTCPInterleaving = true;

                        sp<AMessage> msg = new AMessage('abor', id());
                        msg->setInt32("reconnect", true);
                        msg->post();
                    }
                }
                break;
            }
@@ -705,6 +746,7 @@ private:
    uint64_t mFirstAccessUnitNTP;
    int64_t mNumAccessUnitsReceived;
    bool mCheckPending;
    bool mTryTCPInterleaving;

    struct TrackInfo {
        AString mURL;
@@ -723,6 +765,7 @@ private:
    void setupTrack(size_t index) {
        sp<APacketSource> source =
            new APacketSource(mSessionDesc, index);

        if (source->initCheck() != OK) {
            LOG(WARNING) << "Unsupported format. Ignoring track #"
                         << index << ".";
@@ -754,7 +797,7 @@ private:
        request.append(trackURL);
        request.append(" RTSP/1.0\r\n");

#if USE_TCP_INTERLEAVED
        if (mTryTCPInterleaving) {
            size_t interleaveIndex = 2 * (mTracks.size() - 1);
            info->mUsingInterleavedTCP = true;
            info->mRTPSocket = interleaveIndex;
@@ -764,7 +807,7 @@ private:
            request.append(interleaveIndex);
            request.append("-");
            request.append(interleaveIndex + 1);
#else
        } else {
            unsigned rtpPort;
            ARTPConnection::MakePortPair(
                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
@@ -773,7 +816,7 @@ private:
            request.append(rtpPort);
            request.append("-");
            request.append(rtpPort + 1);
#endif
        }

        request.append("\r\n");