Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0416da73 authored by Andreas Huber's avatar Andreas Huber
Browse files

Support for RTP packets arriving interleaved with RTSP responses.

Change-Id: Ib32fba257da32a199134cf8943117cf3eaa07a25
parent 913a8925
Loading
Loading
Loading
Loading
+66 −2
Original line number Diff line number Diff line
@@ -57,6 +57,8 @@ struct ARTPConnection::StreamInfo {

    int32_t mNumRTCPPacketsReceived;
    struct sockaddr_in mRemoteRTCPAddr;

    bool mIsInjected;
};

ARTPConnection::ARTPConnection(uint32_t flags)
@@ -72,13 +74,15 @@ void ARTPConnection::addStream(
        int rtpSocket, int rtcpSocket,
        const sp<ASessionDescription> &sessionDesc,
        size_t index,
        const sp<AMessage> &notify) {
        const sp<AMessage> &notify,
        bool injected) {
    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
    msg->setObject("session-desc", sessionDesc);
    msg->setSize("index", index);
    msg->setMessage("notify", notify);
    msg->setInt32("injected", injected);
    msg->post();
}

@@ -154,6 +158,12 @@ void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
            break;
        }

        case kWhatInjectPacket:
        {
            onInjectPacket(msg);
            break;
        }

        default:
        {
            TRESPASS();
@@ -172,6 +182,11 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    CHECK(msg->findInt32("rtcp-socket", &s));
    info->mRTCPSocket = s;

    int32_t injected;
    CHECK(msg->findInt32("injected", &injected));

    info->mIsInjected = injected;

    sp<RefBase> obj;
    CHECK(msg->findObject("session-desc", &obj));
    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
@@ -182,8 +197,10 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    info->mNumRTCPPacketsReceived = 0;
    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));

    if (!injected) {
        postPollEvent();
    }
}

void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    int32_t rtpSocket, rtcpSocket;
@@ -231,6 +248,10 @@ void ARTPConnection::onPollStreams() {
    int maxSocket = -1;
    for (List<StreamInfo>::iterator it = mStreams.begin();
         it != mStreams.end(); ++it) {
        if ((*it).mIsInjected) {
            continue;
        }

        FD_SET(it->mRTPSocket, &rs);
        FD_SET(it->mRTCPSocket, &rs);

@@ -248,6 +269,10 @@ void ARTPConnection::onPollStreams() {
    if (res > 0) {
        for (List<StreamInfo>::iterator it = mStreams.begin();
             it != mStreams.end(); ++it) {
            if ((*it).mIsInjected) {
                continue;
            }

            if (FD_ISSET(it->mRTPSocket, &rs)) {
                receive(&*it, true);
            }
@@ -301,6 +326,8 @@ void ARTPConnection::onPollStreams() {
}

status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    CHECK(!s->mIsInjected);

    sp<ABuffer> buffer = new ABuffer(65536);

    socklen_t remoteAddrLen =
@@ -559,5 +586,42 @@ sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    return source;
}

void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    msg->setInt32("index", index);
    msg->setObject("buffer", buffer);
    msg->post();
}

void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    int32_t index;
    CHECK(msg->findInt32("index", &index));

    sp<RefBase> obj;
    CHECK(msg->findObject("buffer", &obj));

    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());

    List<StreamInfo>::iterator it = mStreams.begin();
    while (it != mStreams.end()
           && it->mRTPSocket != index && it->mRTCPSocket != index) {
        ++it;
    }

    if (it == mStreams.end()) {
        TRESPASS();
    }

    StreamInfo *s = &*it;

    status_t err;
    if (it->mRTPSocket == index) {
        err = parseRTP(s, buffer);
    } else {
        ++s->mNumRTCPPacketsReceived;
        err = parseRTCP(s, buffer);
    }
}

}  // namespace android
+6 −1
Original line number Diff line number Diff line
@@ -38,10 +38,13 @@ struct ARTPConnection : public AHandler {
    void addStream(
            int rtpSocket, int rtcpSocket,
            const sp<ASessionDescription> &sessionDesc, size_t index,
            const sp<AMessage> &notify);
            const sp<AMessage> &notify,
            bool injected);

    void removeStream(int rtpSocket, int rtcpSocket);

    void injectPacket(int index, const sp<ABuffer> &buffer);

    // Creates a pair of UDP datagram sockets bound to adjacent ports
    // (the rtpSocket is bound to an even port, the rtcpSocket to the
    // next higher port).
@@ -57,6 +60,7 @@ private:
        kWhatAddStream,
        kWhatRemoveStream,
        kWhatPollStreams,
        kWhatInjectPacket,
    };

    static const int64_t kSelectTimeoutUs;
@@ -72,6 +76,7 @@ private:
    void onAddStream(const sp<AMessage> &msg);
    void onRemoveStream(const sp<AMessage> &msg);
    void onPollStreams();
    void onInjectPacket(const sp<AMessage> &msg);
    void onSendReceiverReports();

    status_t receive(StreamInfo *info, bool receiveRTP);
+2 −1
Original line number Diff line number Diff line
@@ -83,7 +83,8 @@ status_t ARTPSession::setup(const sp<ASessionDescription> &desc) {
        sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id());
        notify->setSize("track-index", mTracks.size() - 1);

        mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify);
        mRTPConn->addStream(
                rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */);

        info->mPacketSource = source;
    }
+78 −10
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>

#include <arpa/inet.h>
#include <fcntl.h>
@@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest(
    msg->post();
}

void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
    sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
    msg->setMessage("reply", reply);
    msg->post();
}

void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatConnect:
@@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
            onReceiveResponse();
            break;

        case kWhatObserveBinaryData:
        {
            CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
            break;
        }

        default:
            TRESPASS();
            break;
@@ -396,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() {
    mReceiveResponseEventPending = true;
}

bool ARTSPConnection::receiveLine(AString *line) {
    line->clear();

    bool sawCR = false;
    for (;;) {
        char c;
        ssize_t n = recv(mSocket, &c, 1, 0);
status_t ARTSPConnection::receive(void *data, size_t size) {
    size_t offset = 0;
    while (offset < size) {
        ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
        if (n == 0) {
            // Server closed the connection.
            return false;
            return ERROR_IO;
        } else if (n < 0) {
            if (errno == EINTR) {
                continue;
@@ -414,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) {
            TRESPASS();
        }

        offset += (size_t)n;
    }

    return OK;
}

bool ARTSPConnection::receiveLine(AString *line) {
    line->clear();

    bool sawCR = false;
    for (;;) {
        char c;
        if (receive(&c, 1) != OK) {
            return false;
        }

        if (sawCR && c == '\n') {
            line->erase(line->size() - 1, 1);
            return true;
@@ -421,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) {

        line->append(&c, 1);

        if (c == '$' && line->size() == 1) {
            // Special-case for interleaved binary data.
            return true;
        }

        sawCR = (c == '\r');
    }
}

sp<ABuffer> ARTSPConnection::receiveBinaryData() {
    uint8_t x[3];
    if (receive(x, 3) != OK) {
        return NULL;
    }

    sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
    if (receive(buffer->data(), buffer->size()) != OK) {
        return NULL;
    }

    buffer->meta()->setInt32("index", (int32_t)x[0]);

    return buffer;
}

bool ARTSPConnection::receiveRTSPReponse() {
    sp<ARTSPResponse> response = new ARTSPResponse;
    AString statusLine;

    if (!receiveLine(&response->mStatusLine)) {
    if (!receiveLine(&statusLine)) {
        return false;
    }

    if (statusLine == "$") {
        sp<ABuffer> buffer = receiveBinaryData();

        if (buffer == NULL) {
            return false;
        }

        if (mObserveBinaryMessage != NULL) {
            sp<AMessage> notify = mObserveBinaryMessage->dup();
            notify->setObject("buffer", buffer);
            notify->post();
        } else {
            LOG(WARNING) << "received binary data, but no one cares.";
        }

        return true;
    }

    sp<ARTSPResponse> response = new ARTSPResponse;
    response->mStatusLine = statusLine;

    LOG(INFO) << "status: " << response->mStatusLine;

    ssize_t space1 = response->mStatusLine.find(" ");
+7 −0
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ struct ARTSPConnection : public AHandler {

    void sendRequest(const char *request, const sp<AMessage> &reply);

    void observeBinaryData(const sp<AMessage> &reply);

protected:
    virtual ~ARTSPConnection();
    virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -57,6 +59,7 @@ private:
        kWhatCompleteConnection = 'comc',
        kWhatSendRequest        = 'sreq',
        kWhatReceiveResponse    = 'rres',
        kWhatObserveBinaryData  = 'obin',
    };

    static const int64_t kSelectTimeoutUs;
@@ -69,6 +72,8 @@ private:

    KeyedVector<int32_t, sp<AMessage> > mPendingRequests;

    sp<AMessage> mObserveBinaryMessage;

    void onConnect(const sp<AMessage> &msg);
    void onDisconnect(const sp<AMessage> &msg);
    void onCompleteConnection(const sp<AMessage> &msg);
@@ -80,7 +85,9 @@ private:

    // Return false iff something went unrecoverably wrong.
    bool receiveRTSPReponse();
    status_t receive(void *data, size_t size);
    bool receiveLine(AString *line);
    sp<ABuffer> receiveBinaryData();
    bool notifyResponseListener(const sp<ARTSPResponse> &response);

    static bool ParseURL(
Loading