Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 17a765a1 authored by Andreas Huber's avatar Andreas Huber Committed by Android (Google) Code Review
Browse files

Merge "Support for RTP packets arriving interleaved with RTSP responses." into gingerbread

parents ed101933 0416da73
Loading
Loading
Loading
Loading
+66 −2
Original line number Diff line number Diff line
@@ -57,6 +57,8 @@ struct ARTPConnection::StreamInfo {

    int32_t mNumRTCPPacketsReceived;
    struct sockaddr_in mRemoteRTCPAddr;

    bool mIsInjected;
};

ARTPConnection::ARTPConnection(uint32_t flags)
@@ -72,13 +74,15 @@ void ARTPConnection::addStream(
        int rtpSocket, int rtcpSocket,
        const sp<ASessionDescription> &sessionDesc,
        size_t index,
        const sp<AMessage> &notify) {
        const sp<AMessage> &notify,
        bool injected) {
    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
    msg->setObject("session-desc", sessionDesc);
    msg->setSize("index", index);
    msg->setMessage("notify", notify);
    msg->setInt32("injected", injected);
    msg->post();
}

@@ -154,6 +158,12 @@ void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
            break;
        }

        case kWhatInjectPacket:
        {
            onInjectPacket(msg);
            break;
        }

        default:
        {
            TRESPASS();
@@ -172,6 +182,11 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    CHECK(msg->findInt32("rtcp-socket", &s));
    info->mRTCPSocket = s;

    int32_t injected;
    CHECK(msg->findInt32("injected", &injected));

    info->mIsInjected = injected;

    sp<RefBase> obj;
    CHECK(msg->findObject("session-desc", &obj));
    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
@@ -182,8 +197,10 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    info->mNumRTCPPacketsReceived = 0;
    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));

    if (!injected) {
        postPollEvent();
    }
}

void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    int32_t rtpSocket, rtcpSocket;
@@ -231,6 +248,10 @@ void ARTPConnection::onPollStreams() {
    int maxSocket = -1;
    for (List<StreamInfo>::iterator it = mStreams.begin();
         it != mStreams.end(); ++it) {
        if ((*it).mIsInjected) {
            continue;
        }

        FD_SET(it->mRTPSocket, &rs);
        FD_SET(it->mRTCPSocket, &rs);

@@ -248,6 +269,10 @@ void ARTPConnection::onPollStreams() {
    if (res > 0) {
        for (List<StreamInfo>::iterator it = mStreams.begin();
             it != mStreams.end(); ++it) {
            if ((*it).mIsInjected) {
                continue;
            }

            if (FD_ISSET(it->mRTPSocket, &rs)) {
                receive(&*it, true);
            }
@@ -301,6 +326,8 @@ void ARTPConnection::onPollStreams() {
}

status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    CHECK(!s->mIsInjected);

    sp<ABuffer> buffer = new ABuffer(65536);

    socklen_t remoteAddrLen =
@@ -559,5 +586,42 @@ sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    return source;
}

void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    msg->setInt32("index", index);
    msg->setObject("buffer", buffer);
    msg->post();
}

void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    int32_t index;
    CHECK(msg->findInt32("index", &index));

    sp<RefBase> obj;
    CHECK(msg->findObject("buffer", &obj));

    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());

    List<StreamInfo>::iterator it = mStreams.begin();
    while (it != mStreams.end()
           && it->mRTPSocket != index && it->mRTCPSocket != index) {
        ++it;
    }

    if (it == mStreams.end()) {
        TRESPASS();
    }

    StreamInfo *s = &*it;

    status_t err;
    if (it->mRTPSocket == index) {
        err = parseRTP(s, buffer);
    } else {
        ++s->mNumRTCPPacketsReceived;
        err = parseRTCP(s, buffer);
    }
}

}  // namespace android
+6 −1
Original line number Diff line number Diff line
@@ -38,10 +38,13 @@ struct ARTPConnection : public AHandler {
    void addStream(
            int rtpSocket, int rtcpSocket,
            const sp<ASessionDescription> &sessionDesc, size_t index,
            const sp<AMessage> &notify);
            const sp<AMessage> &notify,
            bool injected);

    void removeStream(int rtpSocket, int rtcpSocket);

    void injectPacket(int index, const sp<ABuffer> &buffer);

    // Creates a pair of UDP datagram sockets bound to adjacent ports
    // (the rtpSocket is bound to an even port, the rtcpSocket to the
    // next higher port).
@@ -57,6 +60,7 @@ private:
        kWhatAddStream,
        kWhatRemoveStream,
        kWhatPollStreams,
        kWhatInjectPacket,
    };

    static const int64_t kSelectTimeoutUs;
@@ -72,6 +76,7 @@ private:
    void onAddStream(const sp<AMessage> &msg);
    void onRemoveStream(const sp<AMessage> &msg);
    void onPollStreams();
    void onInjectPacket(const sp<AMessage> &msg);
    void onSendReceiverReports();

    status_t receive(StreamInfo *info, bool receiveRTP);
+2 −1
Original line number Diff line number Diff line
@@ -83,7 +83,8 @@ status_t ARTPSession::setup(const sp<ASessionDescription> &desc) {
        sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id());
        notify->setSize("track-index", mTracks.size() - 1);

        mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify);
        mRTPConn->addStream(
                rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */);

        info->mPacketSource = source;
    }
+78 −10
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>

#include <arpa/inet.h>
#include <fcntl.h>
@@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest(
    msg->post();
}

void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
    sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
    msg->setMessage("reply", reply);
    msg->post();
}

void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatConnect:
@@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
            onReceiveResponse();
            break;

        case kWhatObserveBinaryData:
        {
            CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
            break;
        }

        default:
            TRESPASS();
            break;
@@ -396,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() {
    mReceiveResponseEventPending = true;
}

bool ARTSPConnection::receiveLine(AString *line) {
    line->clear();

    bool sawCR = false;
    for (;;) {
        char c;
        ssize_t n = recv(mSocket, &c, 1, 0);
status_t ARTSPConnection::receive(void *data, size_t size) {
    size_t offset = 0;
    while (offset < size) {
        ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
        if (n == 0) {
            // Server closed the connection.
            return false;
            return ERROR_IO;
        } else if (n < 0) {
            if (errno == EINTR) {
                continue;
@@ -414,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) {
            TRESPASS();
        }

        offset += (size_t)n;
    }

    return OK;
}

bool ARTSPConnection::receiveLine(AString *line) {
    line->clear();

    bool sawCR = false;
    for (;;) {
        char c;
        if (receive(&c, 1) != OK) {
            return false;
        }

        if (sawCR && c == '\n') {
            line->erase(line->size() - 1, 1);
            return true;
@@ -421,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) {

        line->append(&c, 1);

        if (c == '$' && line->size() == 1) {
            // Special-case for interleaved binary data.
            return true;
        }

        sawCR = (c == '\r');
    }
}

sp<ABuffer> ARTSPConnection::receiveBinaryData() {
    uint8_t x[3];
    if (receive(x, 3) != OK) {
        return NULL;
    }

    sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
    if (receive(buffer->data(), buffer->size()) != OK) {
        return NULL;
    }

    buffer->meta()->setInt32("index", (int32_t)x[0]);

    return buffer;
}

bool ARTSPConnection::receiveRTSPReponse() {
    sp<ARTSPResponse> response = new ARTSPResponse;
    AString statusLine;

    if (!receiveLine(&response->mStatusLine)) {
    if (!receiveLine(&statusLine)) {
        return false;
    }

    if (statusLine == "$") {
        sp<ABuffer> buffer = receiveBinaryData();

        if (buffer == NULL) {
            return false;
        }

        if (mObserveBinaryMessage != NULL) {
            sp<AMessage> notify = mObserveBinaryMessage->dup();
            notify->setObject("buffer", buffer);
            notify->post();
        } else {
            LOG(WARNING) << "received binary data, but no one cares.";
        }

        return true;
    }

    sp<ARTSPResponse> response = new ARTSPResponse;
    response->mStatusLine = statusLine;

    LOG(INFO) << "status: " << response->mStatusLine;

    ssize_t space1 = response->mStatusLine.find(" ");
+7 −0
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ struct ARTSPConnection : public AHandler {

    void sendRequest(const char *request, const sp<AMessage> &reply);

    void observeBinaryData(const sp<AMessage> &reply);

protected:
    virtual ~ARTSPConnection();
    virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -57,6 +59,7 @@ private:
        kWhatCompleteConnection = 'comc',
        kWhatSendRequest        = 'sreq',
        kWhatReceiveResponse    = 'rres',
        kWhatObserveBinaryData  = 'obin',
    };

    static const int64_t kSelectTimeoutUs;
@@ -69,6 +72,8 @@ private:

    KeyedVector<int32_t, sp<AMessage> > mPendingRequests;

    sp<AMessage> mObserveBinaryMessage;

    void onConnect(const sp<AMessage> &msg);
    void onDisconnect(const sp<AMessage> &msg);
    void onCompleteConnection(const sp<AMessage> &msg);
@@ -80,7 +85,9 @@ private:

    // Return false iff something went unrecoverably wrong.
    bool receiveRTSPReponse();
    status_t receive(void *data, size_t size);
    bool receiveLine(AString *line);
    sp<ABuffer> receiveBinaryData();
    bool notifyResponseListener(const sp<ARTSPResponse> &response);

    static bool ParseURL(
Loading