Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bd08e2f9 authored by Andreas Huber's avatar Andreas Huber
Browse files

Support for RTP/RTCP traffic over TCP sockets in wifi display code.

This is opt-in through

adb shell setprop media.wfd.enable-tcp true

Change-Id: I49ab22a8c8d62690065631e781a7f0057e5ce557
parent 874a4618
Loading
Loading
Loading
Loading
+163 −57
Original line number Diff line number Diff line
@@ -56,7 +56,8 @@ struct ANetworkSession::Session : public RefBase {
    enum State {
        CONNECTING,
        CONNECTED,
        LISTENING,
        LISTENING_RTSP,
        LISTENING_TCP_DGRAMS,
        DATAGRAM,
    };

@@ -69,7 +70,8 @@ struct ANetworkSession::Session : public RefBase {
    int socket() const;
    sp<AMessage> getNotificationMessage() const;

    bool isListening() const;
    bool isRTSPServer() const;
    bool isTCPDatagramServer() const;

    bool wantsToRead();
    bool wantsToWrite();
@@ -79,12 +81,15 @@ struct ANetworkSession::Session : public RefBase {

    status_t sendRequest(const void *data, ssize_t size);

    void setIsRTSPConnection(bool yesno);

protected:
    virtual ~Session();

private:
    int32_t mSessionID;
    State mState;
    bool mIsRTSPConnection;
    int mSocket;
    sp<AMessage> mNotify;
    bool mSawReceiveFailure, mSawSendFailure;
@@ -123,6 +128,7 @@ ANetworkSession::Session::Session(
        const sp<AMessage> &notify)
    : mSessionID(sessionID),
      mState(state),
      mIsRTSPConnection(false),
      mSocket(s),
      mNotify(notify),
      mSawReceiveFailure(false),
@@ -184,12 +190,20 @@ int ANetworkSession::Session::socket() const {
    return mSocket;
}

void ANetworkSession::Session::setIsRTSPConnection(bool yesno) {
    mIsRTSPConnection = yesno;
}

sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
    return mNotify;
}

bool ANetworkSession::Session::isListening() const {
    return mState == LISTENING;
bool ANetworkSession::Session::isRTSPServer() const {
    return mState == LISTENING_RTSP;
}

bool ANetworkSession::Session::isTCPDatagramServer() const {
    return mState == LISTENING_TCP_DGRAMS;
}

bool ANetworkSession::Session::wantsToRead() {
@@ -284,6 +298,28 @@ status_t ANetworkSession::Session::readMore() {
        err = -ECONNRESET;
    }

    if (!mIsRTSPConnection) {
        // TCP stream carrying 16-bit length-prefixed datagrams.

        while (mInBuffer.size() >= 2) {
            size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());

            if (mInBuffer.size() < packetSize + 2) {
                break;
            }

            sp<ABuffer> packet = new ABuffer(packetSize);
            memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);

            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("sessionID", mSessionID);
            notify->setInt32("reason", kWhatDatagram);
            notify->setBuffer("data", packet);
            notify->post();

            mInBuffer.erase(0, packetSize + 2);
        }
    } else {
        for (;;) {
            size_t length;

@@ -350,6 +386,7 @@ status_t ANetworkSession::Session::readMore() {
                break;
            }
        }
    }

    if (err != OK) {
        notifyError(false /* send */, err, "Recv failed.");
@@ -408,7 +445,7 @@ status_t ANetworkSession::Session::writeMore() {
            notifyError(kWhatError, -err, "Connection failed");
            mSawSendFailure = true;

            return UNKNOWN_ERROR;
            return -err;
        }

        mState = CONNECTED;
@@ -451,6 +488,16 @@ status_t ANetworkSession::Session::writeMore() {
status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
    CHECK(mState == CONNECTED || mState == DATAGRAM);

    if (mState == CONNECTED && !mIsRTSPConnection) {
        CHECK_LE(size, 65535);

        uint8_t prefix[2];
        prefix[0] = size >> 8;
        prefix[1] = size & 0xff;

        mOutBuffer.append((const char *)prefix, sizeof(prefix));
    }

    mOutBuffer.append(
            (const char *)data,
            (size >= 0) ? size : strlen((const char *)data));
@@ -585,6 +632,35 @@ status_t ANetworkSession::createUDPSession(
            sessionID);
}

status_t ANetworkSession::createTCPDatagramSession(
        const struct in_addr &addr, unsigned port,
        const sp<AMessage> &notify, int32_t *sessionID) {
    return createClientOrServer(
            kModeCreateTCPDatagramSessionPassive,
            &addr,
            port,
            NULL /* remoteHost */,
            0 /* remotePort */,
            notify,
            sessionID);
}

status_t ANetworkSession::createTCPDatagramSession(
        unsigned localPort,
        const char *remoteHost,
        unsigned remotePort,
        const sp<AMessage> &notify,
        int32_t *sessionID) {
    return createClientOrServer(
            kModeCreateTCPDatagramSessionActive,
            NULL /* addr */,
            localPort,
            remoteHost,
            remotePort,
            notify,
            sessionID);
}

status_t ANetworkSession::destroySession(int32_t sessionID) {
    Mutex::Autolock autoLock(mLock);

@@ -641,7 +717,8 @@ status_t ANetworkSession::createClientOrServer(
        goto bail;
    }

    if (mode == kModeCreateRTSPServer) {
    if (mode == kModeCreateRTSPServer
            || mode == kModeCreateTCPDatagramSessionPassive) {
        const int yes = 1;
        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

@@ -679,7 +756,8 @@ status_t ANetworkSession::createClientOrServer(
    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    addr.sin_family = AF_INET;

    if (mode == kModeCreateRTSPClient) {
    if (mode == kModeCreateRTSPClient
            || mode == kModeCreateTCPDatagramSessionActive) {
        struct hostent *ent= gethostbyname(remoteHost);
        if (ent == NULL) {
            err = -h_errno;
@@ -696,7 +774,17 @@ status_t ANetworkSession::createClientOrServer(
        addr.sin_port = htons(port);
    }

    if (mode == kModeCreateRTSPClient) {
    if (mode == kModeCreateRTSPClient
            || mode == kModeCreateTCPDatagramSessionActive) {
        in_addr_t x = ntohl(addr.sin_addr.s_addr);
        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
              s,
              (x >> 24),
              (x >> 16) & 0xff,
              (x >> 8) & 0xff,
              x & 0xff,
              ntohs(addr.sin_port));

        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));

        CHECK_LT(res, 0);
@@ -707,7 +795,8 @@ status_t ANetworkSession::createClientOrServer(
        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));

        if (res == 0) {
            if (mode == kModeCreateRTSPServer) {
            if (mode == kModeCreateRTSPServer
                    || mode == kModeCreateTCPDatagramSessionPassive) {
                res = listen(s, 4);
            } else {
                CHECK_EQ(mode, kModeCreateUDPSession);
@@ -746,8 +835,16 @@ status_t ANetworkSession::createClientOrServer(
            state = Session::CONNECTING;
            break;

        case kModeCreateTCPDatagramSessionActive:
            state = Session::CONNECTING;
            break;

        case kModeCreateTCPDatagramSessionPassive:
            state = Session::LISTENING_TCP_DGRAMS;
            break;

        case kModeCreateRTSPServer:
            state = Session::LISTENING;
            state = Session::LISTENING_RTSP;
            break;

        default:
@@ -762,6 +859,12 @@ status_t ANetworkSession::createClientOrServer(
            s,
            notify);

    if (mode == kModeCreateTCPDatagramSessionActive) {
        session->setIsRTSPConnection(false);
    } else if (mode == kModeCreateRTSPClient) {
        session->setIsRTSPConnection(true);
    }

    mSessions.add(session->sessionID(), session);

    interrupt();
@@ -932,7 +1035,7 @@ void ANetworkSession::threadLoop() {
            }

            if (FD_ISSET(s, &rs)) {
                if (session->isListening()) {
                if (session->isRTSPServer() || session->isTCPDatagramServer()) {
                    struct sockaddr_in remoteAddr;
                    socklen_t remoteAddrLen = sizeof(remoteAddr);

@@ -969,6 +1072,9 @@ void ANetworkSession::threadLoop() {
                                        clientSocket,
                                        session->getNotificationMessage());

                            clientSession->setIsRTSPConnection(
                                    session->isRTSPServer());

                            sessionsToAdd.push_back(clientSession);
                        }
                    } else {
+15 −0
Original line number Diff line number Diff line
@@ -58,6 +58,19 @@ struct ANetworkSession : public RefBase {
    status_t connectUDPSession(
            int32_t sessionID, const char *remoteHost, unsigned remotePort);

    // passive
    status_t createTCPDatagramSession(
            const struct in_addr &addr, unsigned port,
            const sp<AMessage> &notify, int32_t *sessionID);

    // active
    status_t createTCPDatagramSession(
            unsigned localPort,
            const char *remoteHost,
            unsigned remotePort,
            const sp<AMessage> &notify,
            int32_t *sessionID);

    status_t destroySession(int32_t sessionID);

    status_t sendRequest(
@@ -90,6 +103,8 @@ private:

    enum Mode {
        kModeCreateUDPSession,
        kModeCreateTCPDatagramSessionPassive,
        kModeCreateTCPDatagramSessionActive,
        kModeCreateRTSPServer,
        kModeCreateRTSPClient,
    };
+129 −21
Original line number Diff line number Diff line
@@ -171,20 +171,26 @@ status_t WifiDisplaySource::PlaybackSession::Track::stop() {
WifiDisplaySource::PlaybackSession::PlaybackSession(
        const sp<ANetworkSession> &netSession,
        const sp<AMessage> &notify,
        const in_addr &interfaceAddr,
        bool legacyMode)
    : mNetSession(netSession),
      mNotify(notify),
      mInterfaceAddr(interfaceAddr),
      mLegacyMode(legacyMode),
      mLastLifesignUs(),
      mVideoTrackIndex(-1),
      mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)),
      mPrevTimeUs(-1ll),
      mUseInterleavedTCP(false),
      mTransportMode(TRANSPORT_UDP),
      mRTPChannel(0),
      mRTCPChannel(0),
      mRTPPort(0),
      mRTPSessionID(0),
      mRTCPSessionID(0),
      mClientRTPPort(0),
      mClientRTCPPort(0),
      mRTPConnected(false),
      mRTCPConnected(false),
      mRTPSeqNo(0),
      mLastNTPTime(0),
      mLastRTPTime(0),
@@ -208,15 +214,18 @@ WifiDisplaySource::PlaybackSession::PlaybackSession(

status_t WifiDisplaySource::PlaybackSession::init(
        const char *clientIP, int32_t clientRtp, int32_t clientRtcp,
        bool useInterleavedTCP) {
        TransportMode transportMode) {
    mClientIP = clientIP;

    status_t err = setupPacketizer();

    if (err != OK) {
        return err;
    }

    if (useInterleavedTCP) {
        mUseInterleavedTCP = true;
    mTransportMode = transportMode;

    if (transportMode == TRANSPORT_TCP_INTERLEAVED) {
        mRTPChannel = clientRtp;
        mRTCPChannel = clientRtcp;
        mRTPPort = 0;
@@ -227,19 +236,38 @@ status_t WifiDisplaySource::PlaybackSession::init(
        return OK;
    }

    mUseInterleavedTCP = false;
    mRTPChannel = 0;
    mRTCPChannel = 0;

    if (mTransportMode == TRANSPORT_TCP) {
        // XXX This is wrong, we need to allocate sockets here, we only
        // need to do this because the dongles are not establishing their
        // end until after PLAY instead of before SETUP.
        mRTPPort = 20000;
        mRTPSessionID = 0;
        mRTCPSessionID = 0;
        mClientRTPPort = clientRtp;
        mClientRTCPPort = clientRtcp;

        updateLiveness();
        return OK;
    }

    int serverRtp;

    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
    sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
    for (serverRtp = 15550;; serverRtp += 2) {
        int32_t rtpSession;
        if (mTransportMode == TRANSPORT_UDP) {
            err = mNetSession->createUDPSession(
                        serverRtp, clientIP, clientRtp,
                        rtpNotify, &rtpSession);
        } else {
            err = mNetSession->createTCPDatagramSession(
                        serverRtp, clientIP, clientRtp,
                        rtpNotify, &rtpSession);
        }

        if (err != OK) {
            ALOGI("failed to create RTP socket on port %d", serverRtp);
@@ -258,9 +286,15 @@ status_t WifiDisplaySource::PlaybackSession::init(
        }

        int32_t rtcpSession;
        if (mTransportMode == TRANSPORT_UDP) {
            err = mNetSession->createUDPSession(
                    serverRtp + 1, clientIP, clientRtcp,
                    rtcpNotify, &rtcpSession);
        } else {
            err = mNetSession->createTCPDatagramSession(
                    serverRtp + 1, clientIP, clientRtcp,
                    rtcpNotify, &rtcpSession);
        }

        if (err == OK) {
            mRTPPort = serverRtp;
@@ -308,6 +342,42 @@ void WifiDisplaySource::PlaybackSession::updateLiveness() {
status_t WifiDisplaySource::PlaybackSession::play() {
    updateLiveness();

    return OK;
}

status_t WifiDisplaySource::PlaybackSession::finishPlay() {
    // XXX Give the dongle 3 secs to bind its sockets.
    (new AMessage(kWhatFinishPlay, id()))->post(3000000ll);
    return OK;
}

status_t WifiDisplaySource::PlaybackSession::onFinishPlay() {
    if (mTransportMode != TRANSPORT_TCP) {
        return onFinishPlay2();
    }

    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());

    status_t err = mNetSession->createTCPDatagramSession(
                mRTPPort, mClientIP.c_str(), mClientRTPPort,
                rtpNotify, &mRTPSessionID);

    if (err != OK) {
        return err;
    }

    if (mClientRTCPPort >= 0) {
        sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());

        err = mNetSession->createTCPDatagramSession(
                mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort,
                rtcpNotify, &mRTCPSessionID);
    }

    return err;
}

status_t WifiDisplaySource::PlaybackSession::onFinishPlay2() {
    if (mRTCPSessionID != 0) {
        scheduleSendSR();
    }
@@ -328,6 +398,10 @@ status_t WifiDisplaySource::PlaybackSession::play() {
        }
    }

    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatSessionEstablished);
    notify->post();

    return OK;
}

@@ -445,6 +519,32 @@ void WifiDisplaySource::PlaybackSession::onMessageReceived(
                    break;
                }

                case ANetworkSession::kWhatConnected:
                {
                    CHECK_EQ(mTransportMode, TRANSPORT_TCP);

                    int32_t sessionID;
                    CHECK(msg->findInt32("sessionID", &sessionID));

                    if (sessionID == mRTPSessionID) {
                        CHECK(!mRTPConnected);
                        mRTPConnected = true;
                        ALOGI("RTP Session now connected.");
                    } else if (sessionID == mRTCPSessionID) {
                        CHECK(!mRTCPConnected);
                        mRTCPConnected = true;
                        ALOGI("RTCP Session now connected.");
                    } else {
                        TRESPASS();
                    }

                    if (mRTPConnected
                            && (mClientRTCPPort < 0 || mRTCPConnected)) {
                        onFinishPlay2();
                    }
                    break;
                }

                default:
                    TRESPASS();
            }
@@ -610,6 +710,12 @@ void WifiDisplaySource::PlaybackSession::onMessageReceived(
            break;
        }

        case kWhatFinishPlay:
        {
            onFinishPlay();
            break;
        }

        default:
            TRESPASS();
    }
@@ -956,15 +1062,14 @@ void WifiDisplaySource::PlaybackSession::onSendSR() {
    addSR(buffer);
    addSDES(buffer);

    if (mUseInterleavedTCP) {
    if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
        sp<AMessage> notify = mNotify->dup();
        notify->setInt32("what", kWhatBinaryData);
        notify->setInt32("channel", mRTCPChannel);
        notify->setBuffer("data", buffer);
        notify->post();
    } else {
        mNetSession->sendRequest(
                mRTCPSessionID, buffer->data(), buffer->size());
        sendPacket(mRTCPSessionID, buffer->data(), buffer->size());
    }

    ++mNumSRsSent;
@@ -1011,7 +1116,7 @@ ssize_t WifiDisplaySource::PlaybackSession::appendTSData(
        mLastRTPTime = rtpTime;
        mLastNTPTime = GetNowNTP();

        if (mUseInterleavedTCP) {
        if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("what", kWhatBinaryData);

@@ -1022,8 +1127,7 @@ ssize_t WifiDisplaySource::PlaybackSession::appendTSData(
            notify->setBuffer("data", data);
            notify->post();
        } else {
            mNetSession->sendRequest(
                    mRTPSessionID, rtp, mTSQueue->size());
            sendPacket(mRTPSessionID, rtp, mTSQueue->size());

            mTotalBytesSent += mTSQueue->size();
            int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs;
@@ -1144,8 +1248,7 @@ status_t WifiDisplaySource::PlaybackSession::parseTSFB(
            uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;

            if (bufferSeqNo == seqNo) {
                mNetSession->sendRequest(
                        mRTPSessionID, buffer->data(), buffer->size());
                sendPacket(mRTPSessionID, buffer->data(), buffer->size());

                found = true;
                break;
@@ -1172,5 +1275,10 @@ void WifiDisplaySource::PlaybackSession::requestIDRFrame() {
    }
}

status_t WifiDisplaySource::PlaybackSession::sendPacket(
        int32_t sessionID, const void *data, size_t size) {
    return mNetSession->sendRequest(sessionID, data, size);
}

}  // namespace android
+22 −2
Original line number Diff line number Diff line
@@ -38,11 +38,17 @@ struct WifiDisplaySource::PlaybackSession : public AHandler {
    PlaybackSession(
            const sp<ANetworkSession> &netSession,
            const sp<AMessage> &notify,
            const struct in_addr &interfaceAddr,
            bool legacyMode);

    enum TransportMode {
        TRANSPORT_UDP,
        TRANSPORT_TCP_INTERLEAVED,
        TRANSPORT_TCP,
    };
    status_t init(
            const char *clientIP, int32_t clientRtp, int32_t clientRtcp,
            bool useInterleavedTCP);
            TransportMode transportMode);

    status_t destroy();

@@ -52,6 +58,7 @@ struct WifiDisplaySource::PlaybackSession : public AHandler {
    void updateLiveness();

    status_t play();
    status_t finishPlay();
    status_t pause();

    sp<ISurfaceTexture> getSurfaceTexture();
@@ -63,6 +70,7 @@ struct WifiDisplaySource::PlaybackSession : public AHandler {
    enum {
        kWhatSessionDead,
        kWhatBinaryData,
        kWhatSessionEstablished,
    };

protected:
@@ -79,6 +87,7 @@ private:
        kWhatSerializerNotify,
        kWhatConverterNotify,
        kWhatUpdateSurface,
        kWhatFinishPlay,
    };

    static const int64_t kSendSRIntervalUs = 10000000ll;
@@ -87,6 +96,7 @@ private:

    sp<ANetworkSession> mNetSession;
    sp<AMessage> mNotify;
    in_addr mInterfaceAddr;
    bool mLegacyMode;

    int64_t mLastLifesignUs;
@@ -102,7 +112,9 @@ private:
    sp<ABuffer> mTSQueue;
    int64_t mPrevTimeUs;

    bool mUseInterleavedTCP;
    TransportMode mTransportMode;

    AString mClientIP;

    // in TCP mode
    int32_t mRTPChannel;
@@ -113,6 +125,10 @@ private:
    int32_t mRTPSessionID;
    int32_t mRTCPSessionID;

    int32_t mClientRTPPort;
    int32_t mClientRTCPPort;
    bool mRTPConnected;
    bool mRTCPConnected;

    uint32_t mRTPSeqNo;

@@ -160,6 +176,10 @@ private:
    status_t parseRTCP(const sp<ABuffer> &buffer);
    status_t parseTSFB(const uint8_t *data, size_t size);

    status_t sendPacket(int32_t sessionID, const void *data, size_t size);
    status_t onFinishPlay();
    status_t onFinishPlay2();

    DISALLOW_EVIL_CONSTRUCTORS(PlaybackSession);
};

+1 −2
Original line number Diff line number Diff line
@@ -286,8 +286,7 @@ status_t Serializer::onStart() {
    }

    if (err == OK) {
        // XXX add a 5 second delay for the client to get ready.
        schedulePoll(5000000ll);
        schedulePoll();
    }

    return err;
Loading