Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5f51ebf7 authored by Vikas Reddy Pachika's avatar Vikas Reddy Pachika Committed by Linux Build Service Account
Browse files

rtsp: Add support for rtsp over IPV6

Added missing functionality needed in order to
support RTSP streaming over IPv6 also.

Change-Id: If825664cd2237addd60a45e915061472fb1698d2
parent bbf9dc69
Loading
Loading
Loading
Loading
+306 −0
Original line number Diff line number Diff line
@@ -43,6 +43,12 @@
#include <cutils/properties.h>
#include <media/stagefright/MediaExtractor.h>
#include <media/MediaProfiles.h>
#include <media/stagefright/Utils.h>

//RTSPStream
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>

#include "include/ExtendedUtils.h"

@@ -1343,6 +1349,282 @@ bool ExtendedUtils::checkDPFromVOLHeader(const uint8_t *data, size_t size) {
    return retVal;
}

bool ExtendedUtils::RTSPStream::ParseURL_V6(
        AString *host, const char **colonPos) {

    ssize_t bracketEnd = host->find("]");
    ALOGI("ExtendedUtils::ParseURL_V6() : host->c_str() = %s", host->c_str());

    if (bracketEnd > 0) {
        if (host->find(":", bracketEnd) == bracketEnd + 1) {
            *colonPos = host->c_str() + bracketEnd + 1;
        }
    } else {
        return false;
    }

    host->erase(bracketEnd, host->size() - bracketEnd);
    host->erase(0, 1);

    return true;
}

void ExtendedUtils::RTSPStream::MakePortPair_V6(
        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {

    struct addrinfo hints, *result = NULL;

    ALOGV("ExtendedUtils::RTSPStream::MakePortPair_V6()");

    *rtpSocket = socket(AF_INET6, SOCK_DGRAM, 0);
    CHECK_GE(*rtpSocket, 0);
    bumpSocketBufferSize_V6(*rtpSocket);

    *rtcpSocket = socket(AF_INET6, SOCK_DGRAM, 0);
    CHECK_GE(*rtcpSocket, 0);

    bumpSocketBufferSize_V6(*rtcpSocket);

    /* rand() * 1000 may overflow int type, use long long */
    unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
    start &= ~1;

     for (unsigned port = start; port < 65536; port += 2) {
         struct sockaddr_in6 addr;
         addr.sin6_family = AF_INET6;
         addr.sin6_addr = in6addr_any;
         addr.sin6_port = htons(port);

         if (bind(*rtpSocket,
                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
             continue;
         }

         addr.sin6_port = htons(port + 1);

         if (bind(*rtcpSocket,
                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
             *rtpPort = port;
             ALOGV("END MakePortPair_V6: %u", port);
             return;
         }
    }
    TRESPASS();
}

void ExtendedUtils::RTSPStream::bumpSocketBufferSize_V6(int s) {
    int size = 256 * 1024;
    CHECK_EQ(setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &size, sizeof(size)), 0);
}

bool ExtendedUtils::RTSPStream::pokeAHole_V6(int rtpSocket, int rtcpSocket,
                const AString &transport, AString &sessionHost) {
    struct sockaddr_in addr;
    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    addr.sin_family = AF_INET6;

    struct addrinfo hints, *result = NULL;
    ALOGV("Inside ExtendedUtils::RTSPStream::pokeAHole_V6");
    memset(&hints, 0, sizeof (hints));
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    AString source;
    AString server_port;

    Vector<struct sockaddr_in> s_addrs;
    if (GetAttribute(transport.c_str(), "source", &source)) {
        ALOGI("found 'source' = %s field in Transport response",
            source.c_str());
        int err = getaddrinfo(source.c_str(), NULL, &hints, &result);
        if (err != 0 || result == NULL) {
            ALOGI("no need to poke the hole");
        } else {
            s_addrs.push(*(struct sockaddr_in *)result->ai_addr);
        }
    }

    int err = getaddrinfo(sessionHost.c_str(), NULL, &hints, &result);
    if (err != 0 || result == NULL) {
        ALOGE("Failed to look up address of session host '%s' err:%d(%s)",
            sessionHost.c_str(), err, gai_strerror(err));

        return false;
     } else {
        ALOGD("get the endpoint address of session host");
        addr = (*(struct sockaddr_in *)result->ai_addr);

        if (addr.sin_addr.s_addr == INADDR_NONE || IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
            ALOGI("no need to poke the hole");
        } else if (s_addrs.size() == 0 || s_addrs[0].sin_addr.s_addr != addr.sin_addr.s_addr) {
            s_addrs.push(addr);
        }
    }

    if (s_addrs.size() == 0){
        ALOGW("Failed to get any session address");
        return false;
    }

    if (!GetAttribute(transport.c_str(),
                             "server_port",
                             &server_port)) {
        ALOGW("Missing 'server_port' field in Transport response.");
        return false;
    }

    int rtpPort, rtcpPort;
    if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
            || rtpPort <= 0 || rtpPort > 65535
            || rtcpPort <=0 || rtcpPort > 65535
            || rtcpPort != rtpPort + 1) {
        ALOGE("Server picked invalid RTP/RTCP port pair %s,"
             " RTP port must be even, RTCP port must be one higher.",
             server_port.c_str());

        return false;
    }

    if (rtpPort & 1) {
        ALOGW("Server picked an odd RTP port, it should've picked an "
             "even one, we'll let it pass for now, but this may break "
             "in the future.");
    }

    // Make up an RR/SDES RTCP packet.
    sp<ABuffer> buf = new ABuffer(65536);
    buf->setRange(0, 0);
    addRR(buf);
    addSDES(rtpSocket, buf);

    for (uint32_t i = 0; i < s_addrs.size(); i++){
        addr.sin_addr.s_addr = s_addrs[i].sin_addr.s_addr;

        addr.sin_port = htons(rtpPort);

        ssize_t n = sendto(
                rtpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(sockaddr_in6));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTP packets");
            continue;
        }

        addr.sin_port = htons(rtcpPort);

        n = sendto(
                rtcpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(sockaddr_in6));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTCP packets");
            continue;
        }

        ALOGV("successfully poked holes for the address = %u", s_addrs[i].sin_addr.s_addr);
    }

    return true;
}

bool ExtendedUtils::RTSPStream::GetAttribute(const char *s, const char *key, AString *value) {
    value->clear();

    size_t keyLen = strlen(key);

    for (;;) {
        while (isspace(*s)) {
            ++s;
        }

        const char *colonPos = strchr(s, ';');

        size_t len =
            (colonPos == NULL) ? strlen(s) : colonPos - s;

        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
            value->setTo(&s[keyLen + 1], len - keyLen - 1);
            return true;
        }

        if (colonPos == NULL) {
            return false;
        }

        s = colonPos + 1;
    }
}

void ExtendedUtils::RTSPStream::addRR(const sp<ABuffer> &buf) {
    uint8_t *ptr = buf->data() + buf->size();
    ptr[0] = 0x80 | 0;
    ptr[1] = 201;  // RR
    ptr[2] = 0;
    ptr[3] = 1;
    ptr[4] = 0xde;  // SSRC
    ptr[5] = 0xad;
    ptr[6] = 0xbe;
    ptr[7] = 0xef;

    buf->setRange(0, buf->size() + 8);
}

void ExtendedUtils::RTSPStream::addSDES(int s, const sp<ABuffer> &buffer) {
    struct sockaddr_in addr;
    socklen_t addrSize = sizeof(addr);
    CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));

    uint8_t *data = buffer->data() + buffer->size();
    data[0] = 0x80 | 1;
    data[1] = 202;  // SDES
    data[4] = 0xde;  // SSRC
    data[5] = 0xad;
    data[6] = 0xbe;
    data[7] = 0xef;

    size_t offset = 8;

    data[offset++] = 1;  // CNAME

    AString cname = "stagefright@";
    cname.append(inet_ntoa(addr.sin_addr));
    data[offset++] = cname.size();

    memcpy(&data[offset], cname.c_str(), cname.size());
    offset += cname.size();

    data[offset++] = 6;  // TOOL

    AString tool = MakeUserAgent();

    data[offset++] = tool.size();

    memcpy(&data[offset], tool.c_str(), tool.size());
    offset += tool.size();

    data[offset++] = 0;

    if ((offset % 4) > 0) {
        size_t count = 4 - (offset % 4);
        switch (count) {
            case 3:
                data[offset++] = 0;
            case 2:
                data[offset++] = 0;
            case 1:
                data[offset++] = 0;
        }
    }

    size_t numWords = (offset / 4) - 1;
    data[2] = numWords >> 8;
    data[3] = numWords & 0xff;

    buffer->setRange(buffer->offset(), buffer->size() + offset);
}

}
#else //ENABLE_AV_ENHANCEMENTS

@@ -1521,6 +1803,30 @@ bool ExtendedUtils::checkDPFromVOLHeader(const uint8_t *data, size_t size) {
    return false;
}

bool ExtendedUtils::RTSPStream::ParseURL_V6(
        AString *host, const char **colonPos) {
    return false;
}

void ExtendedUtils::RTSPStream::MakePortPair_V6(
        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort){}

bool ExtendedUtils::RTSPStream::pokeAHole_V6(int rtpSocket, int rtcpSocket,
        const AString &transport, AString &sessionHost) {
    return false;
}

void ExtendedUtils::RTSPStream::bumpSocketBufferSize_V6(int s) {}

bool ExtendedUtils::RTSPStream::GetAttribute(const char *s, const char *key, AString *value) {
    return false;
}

void ExtendedUtils::RTSPStream::addRR(const sp<ABuffer> &buf) {}

void ExtendedUtils::RTSPStream::addSDES(int s, const sp<ABuffer> &buffer) {}


} // namespace android
#endif //ENABLE_AV_ENHANCEMENTS

+33 −0
Original line number Diff line number Diff line
@@ -45,6 +45,9 @@
#define MIN_BITERATE_AAC 24000
#define MAX_BITERATE_AAC 192000

#define IPV4 4
#define IPV6 6

namespace android {

/*
@@ -166,6 +169,36 @@ struct ExtendedUtils {
        static void getRtpPortRange(unsigned *start, unsigned *end);
    };

    struct RTSPStream {

        static bool ParseURL_V6(
                AString *host, const char **colonPos);

        // Creates a pair of UDP datagram sockets bound to adjacent ports
        // (the rtpSocket is bound to an even port, the rtcpSocket to the
        // next higher port) for IPV6.
        static void MakePortPair_V6(
                int *rtpSocket, int *rtcpSocket, unsigned *rtpPort);

        // In case we're behind NAT, fire off two UDP packets to the remote
        // rtp/rtcp ports to poke a hole into the firewall for future incoming
        // packets. We're going to send an RR/SDES RTCP packet to both of them.
        static bool pokeAHole_V6(int rtpSocket, int rtcpSocket,
                 const AString &transport, AString &sessionHost);

        private:

        static void bumpSocketBufferSize_V6(int s);

        static bool GetAttribute(const char *s, const char *key, AString *value);

        static void addRR(const sp<ABuffer> &buf);

        static void addSDES(int s, const sp<ABuffer> &buffer);

    };


    static const int32_t kNumBFramesPerPFrame = 1;
    static bool mIsQCHWAACEncoder;

+27 −7
Original line number Diff line number Diff line
@@ -31,6 +31,9 @@
#include <media/stagefright/foundation/hexdump.h>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>

#include <sys/socket.h>

#include "include/ExtendedUtils.h"
@@ -72,7 +75,8 @@ struct ARTPConnection::StreamInfo {
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1) {
      mLastReceiverReportTimeUs(-1),
      mIPVersion(IPV4) {
}

ARTPConnection::~ARTPConnection() {
@@ -84,6 +88,7 @@ void ARTPConnection::addStream(
        size_t index,
        const sp<AMessage> &notify,
        bool injected) {
        ALOGV("addStream() rtpSocket:%d rtcpSocket:%d index:%zu injected:%d", rtpSocket, rtcpSocket, index, (int)injected);
    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
@@ -95,6 +100,7 @@ void ARTPConnection::addStream(
}

void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
    ALOGV("removeStream() rtpSocket:%d rtcpSocket:%d ", rtpSocket, rtcpSocket);
    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
@@ -106,6 +112,11 @@ static void bumpSocketBufferSize(int s) {
    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}

void ARTPConnection::setIPVersion(int ipVersion) {
    mIPVersion = ipVersion;
    ALOGI("IP Version:%d", mIPVersion);
}

//static
void ARTPConnection::MakePortPair(
        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
@@ -352,12 +363,21 @@ void ARTPConnection::onPollStreams() {
            if (buffer->size() > 0) {
                ALOGV("Sending RR...");

                ssize_t n;
                ssize_t n = 0;
                do {
                    if(mIPVersion == IPV4) {
                        n = sendto(
                            s->mRTCPSocket, buffer->data(), buffer->size(), 0,
                            (const struct sockaddr *)&s->mRemoteRTCPAddr,
                            sizeof(s->mRemoteRTCPAddr));
                    } else if (mIPVersion == IPV6) {
                        n = sendto(
                            s->mRTCPSocket, buffer->data(), buffer->size(), 0,
                            (const struct sockaddr *)&s->mRemoteRTCPAddr,
                            sizeof(struct sockaddr_in6));
                    } else {
                        TRESPASS();
                    }
                } while (n < 0 && errno == EINTR);

                if (n <= 0) {
+3 −0
Original line number Diff line number Diff line
@@ -50,6 +50,8 @@ struct ARTPConnection : public AHandler {
    static void MakePortPair(
            int *rtpSocket, int *rtcpSocket, unsigned *rtpPort);

    void setIPVersion(int ipVersion);

protected:
    virtual ~ARTPConnection();
    virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -71,6 +73,7 @@ private:

    bool mPollEventPending;
    int64_t mLastReceiverReportTimeUs;
    int mIPVersion;

    void onAddStream(const sp<AMessage> &msg);
    void onRemoveStream(const sp<AMessage> &msg);
+108 −50
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@
#include <sys/socket.h>

#include "include/HTTPBase.h"
#include "include/ExtendedUtils.h"

namespace android {

@@ -170,7 +171,19 @@ bool ARTSPConnection::ParseURL(
        }
    }

    const char *colonPos = strchr(host->c_str(), ':');
    const char *colonPos = NULL;
    ssize_t bracketBegin = host->find("[");

    if (bracketBegin > 0) {
        return false;
    } else if ( bracketBegin == 0) {
        ALOGV("IPV6 ip address found");
        if (!(ExtendedUtils::RTSPStream::ParseURL_V6(host, &colonPos))) {
            return false;
        }
    } else {
        colonPos = strchr(host->c_str(), ':');
    }

    if (colonPos != NULL) {
        unsigned long x;
@@ -209,6 +222,78 @@ static status_t MakeSocketBlocking(int s, bool blocking) {
    return flags == -1 ? UNKNOWN_ERROR : OK;
}

bool ARTSPConnection::createSocketAndConnect(void *res, unsigned port,const sp<AMessage> &reply) {


    for (struct addrinfo *result = (struct addrinfo *)res; result; result = result->ai_next) {
        char ipstr[INET6_ADDRSTRLEN];
        int ipver;
        void *sptr;

        switch (result->ai_family) {
        case AF_INET:
            sptr = &((struct sockaddr_in *)result->ai_addr)->sin_addr;
            ((struct sockaddr_in *)result->ai_addr)->sin_port = htons(port);
            reply->setInt32("server-ip", ntohl(((struct in_addr *)sptr)->s_addr));
            ipver = 4;
            break;
        case AF_INET6:
            sptr = &((struct sockaddr_in6 *)result->ai_addr)->sin6_addr;
            ((struct sockaddr_in6 *)result->ai_addr)->sin6_port = htons(port);
            ipver = 6;
            break;
        default:
            ALOGW("Skipping unknown protocol family %d", result->ai_family);
            continue;
        }

        inet_ntop(result->ai_family, sptr, ipstr, sizeof(ipstr));
        ALOGV("Connecting to IPv%d: %s", ipver, ipstr);

        mSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);

        if (mUIDValid) {
            HTTPBase::RegisterSocketUserTag(mSocket, mUID,
                                            (uint32_t)*(uint32_t*) "RTSP");
            HTTPBase::RegisterSocketUserMark(mSocket, mUID);
        }

        MakeSocketBlocking(mSocket, false);

        int err = ::connect(mSocket, result->ai_addr, result->ai_addrlen);
        if (err == 0) {
            ALOGV("Connected to (%s)", ipstr);
            reply->setInt32("result", OK);
            mState = CONNECTED;
            mNextCSeq = 1;
            postReceiveReponseEvent();
            reply->post();
            freeaddrinfo((struct addrinfo *)res);
            return true;
        }

        if (errno == EINPROGRESS) {
            ALOGV("Connection to %s in progress", ipstr);
            sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id());
            msg->setMessage("reply", reply);
            ALOGV("setting ipversion:%d", ipver);
            msg->setInt32("ipversion", ipver);
            msg->setInt32("connection-id", mConnectionID);
            msg->post();
            freeaddrinfo((struct addrinfo *)res);
            return true;
        }

        if (mUIDValid) {
            HTTPBase::UnRegisterSocketUserTag(mSocket);
            HTTPBase::UnRegisterSocketUserMark(mSocket);
        }
        close(mSocket);
        ALOGV("Connection err %d, (%s)", errno, strerror(errno));
    }
    return false;
}

void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
    ++mConnectionID;

@@ -252,65 +337,33 @@ void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
        ALOGV("user = '%s', pass = '%s'", mUser.c_str(), mPass.c_str());
    }

    struct hostent *ent = gethostbyname(host.c_str());
    if (ent == NULL) {
        ALOGE("Unknown host %s", host.c_str());
    struct addrinfo hints, *res = NULL;
    memset(&hints, 0, sizeof (hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    int err = getaddrinfo(host.c_str(), NULL, &hints, &res);

    if (err != 0 || res == NULL) {
        ALOGE("Unknown host, err %d (%s)", err, gai_strerror(err));
        reply->setInt32("result", -ENOENT);
        reply->post();

        mState = DISCONNECTED;
        return;
    }

    mSocket = socket(AF_INET, SOCK_STREAM, 0);

    if (mUIDValid) {
        HTTPBase::RegisterSocketUserTag(mSocket, mUID,
                                        (uint32_t)*(uint32_t*) "RTSP");
        HTTPBase::RegisterSocketUserMark(mSocket, mUID);
        if (res != NULL) {
            freeaddrinfo(res);
        }

    MakeSocketBlocking(mSocket, false);

    struct sockaddr_in remote;
    memset(remote.sin_zero, 0, sizeof(remote.sin_zero));
    remote.sin_family = AF_INET;
    remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    remote.sin_port = htons(port);

    int err = ::connect(
            mSocket, (const struct sockaddr *)&remote, sizeof(remote));

    reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr));

    if (err < 0) {
        if (errno == EINPROGRESS) {
            sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id());
            msg->setMessage("reply", reply);
            msg->setInt32("connection-id", mConnectionID);
            msg->post();
        return;
    }

    if (!createSocketAndConnect(res, port, reply)) {
        ALOGV("Failed to connect to %s", host.c_str());
        reply->setInt32("result", -errno);
        mState = DISCONNECTED;

        if (mUIDValid) {
            HTTPBase::UnRegisterSocketUserTag(mSocket);
            HTTPBase::UnRegisterSocketUserMark(mSocket);
        }
        close(mSocket);
        mSocket = -1;
    } else {
        reply->setInt32("result", OK);
        mState = CONNECTED;
        mNextCSeq = 1;

        postReceiveReponseEvent();
    }

        reply->post();
        freeaddrinfo(res);
    }
}

void ARTSPConnection::performDisconnect() {
@@ -378,6 +431,8 @@ void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) {
    }

    int err;
    int ipver;

    socklen_t optionLen = sizeof(err);
    CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    CHECK_EQ(optionLen, (socklen_t)sizeof(err));
@@ -395,7 +450,10 @@ void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) {
        close(mSocket);
        mSocket = -1;
    } else {
        CHECK(msg->findInt32("ipversion", &ipver));
        reply->setInt32("result", OK);
        ALOGV("setting ipversion:%d", ipver);
        reply->setInt32("ipversion", ipver);
        mState = CONNECTED;
        mNextCSeq = 1;

Loading