Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 96954c00 authored by Andreas Huber's avatar Andreas Huber Committed by Android (Google) Code Review
Browse files

Merge "Change ANetworkSession implementation to optionally attach timestamps" into jb-mr2-dev

parents 106f1628 a239dd72
Loading
Loading
Loading
Loading
+108 −42
Original line number Original line Diff line number Diff line
@@ -81,7 +81,8 @@ struct ANetworkSession::Session : public RefBase {
    status_t readMore();
    status_t readMore();
    status_t writeMore();
    status_t writeMore();


    status_t sendRequest(const void *data, ssize_t size);
    status_t sendRequest(
            const void *data, ssize_t size, bool timeValid, int64_t timeUs);


    void setIsRTSPConnection(bool yesno);
    void setIsRTSPConnection(bool yesno);


@@ -89,6 +90,15 @@ protected:
    virtual ~Session();
    virtual ~Session();


private:
private:
    enum {
        FRAGMENT_FLAG_TIME_VALID = 1,
    };
    struct Fragment {
        uint32_t mFlags;
        int64_t mTimeUs;
        sp<ABuffer> mBuffer;
    };

    int32_t mSessionID;
    int32_t mSessionID;
    State mState;
    State mState;
    bool mIsRTSPConnection;
    bool mIsRTSPConnection;
@@ -96,11 +106,7 @@ private:
    sp<AMessage> mNotify;
    sp<AMessage> mNotify;
    bool mSawReceiveFailure, mSawSendFailure;
    bool mSawReceiveFailure, mSawSendFailure;


    // for TCP / stream data
    List<Fragment> mOutFragments;
    AString mOutBuffer;

    // for UDP / datagrams
    List<sp<ABuffer> > mOutDatagrams;


    AString mInBuffer;
    AString mInBuffer;


@@ -109,6 +115,8 @@ private:
    void notifyError(bool send, status_t err, const char *detail);
    void notifyError(bool send, status_t err, const char *detail);
    void notify(NotificationReason reason);
    void notify(NotificationReason reason);


    void dumpFragmentStats(const Fragment &frag);

    DISALLOW_EVIL_CONSTRUCTORS(Session);
    DISALLOW_EVIL_CONSTRUCTORS(Session);
};
};
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
@@ -221,8 +229,8 @@ bool ANetworkSession::Session::wantsToRead() {
bool ANetworkSession::Session::wantsToWrite() {
bool ANetworkSession::Session::wantsToWrite() {
    return !mSawSendFailure
    return !mSawSendFailure
        && (mState == CONNECTING
        && (mState == CONNECTING
            || (mState == CONNECTED && !mOutBuffer.empty())
            || (mState == CONNECTED && !mOutFragments.empty())
            || (mState == DATAGRAM && !mOutDatagrams.empty()));
            || (mState == DATAGRAM && !mOutFragments.empty()));
}
}


status_t ANetworkSession::Session::readMore() {
status_t ANetworkSession::Session::readMore() {
@@ -407,13 +415,41 @@ status_t ANetworkSession::Session::readMore() {
    return err;
    return err;
}
}


void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) {
#if 0
    int64_t nowUs = ALooper::GetNowUs();
    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;

    static const int64_t kMinDelayMs = 0;
    static const int64_t kMaxDelayMs = 300;

    const char *kPattern = "########################################";
    size_t kPatternSize = strlen(kPattern);

    int n = (kPatternSize * (delayMs - kMinDelayMs))
                / (kMaxDelayMs - kMinDelayMs);

    if (n < 0) {
        n = 0;
    } else if ((size_t)n > kPatternSize) {
        n = kPatternSize;
    }

    ALOGI("[%lld]: (%4lld ms) %s\n",
          frag.mTimeUs / 1000,
          delayMs,
          kPattern + kPatternSize - n);
#endif
}

status_t ANetworkSession::Session::writeMore() {
status_t ANetworkSession::Session::writeMore() {
    if (mState == DATAGRAM) {
    if (mState == DATAGRAM) {
        CHECK(!mOutDatagrams.empty());
        CHECK(!mOutFragments.empty());


        status_t err;
        status_t err;
        do {
        do {
            const sp<ABuffer> &datagram = *mOutDatagrams.begin();
            const Fragment &frag = *mOutFragments.begin();
            const sp<ABuffer> &datagram = frag.mBuffer;


            uint8_t *data = datagram->data();
            uint8_t *data = datagram->data();
            if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
            if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
@@ -441,17 +477,21 @@ status_t ANetworkSession::Session::writeMore() {
            err = OK;
            err = OK;


            if (n > 0) {
            if (n > 0) {
                mOutDatagrams.erase(mOutDatagrams.begin());
                if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
                    dumpFragmentStats(frag);
                }

                mOutFragments.erase(mOutFragments.begin());
            } else if (n < 0) {
            } else if (n < 0) {
                err = -errno;
                err = -errno;
            } else if (n == 0) {
            } else if (n == 0) {
                err = -ECONNRESET;
                err = -ECONNRESET;
            }
            }
        } while (err == OK && !mOutDatagrams.empty());
        } while (err == OK && !mOutFragments.empty());


        if (err == -EAGAIN) {
        if (err == -EAGAIN) {
            if (!mOutDatagrams.empty()) {
            if (!mOutFragments.empty()) {
                ALOGI("%d datagrams remain queued.", mOutDatagrams.size());
                ALOGI("%d datagrams remain queued.", mOutFragments.size());
            }
            }
            err = OK;
            err = OK;
        }
        }
@@ -484,23 +524,37 @@ status_t ANetworkSession::Session::writeMore() {
    }
    }


    CHECK_EQ(mState, CONNECTED);
    CHECK_EQ(mState, CONNECTED);
    CHECK(!mOutBuffer.empty());
    CHECK(!mOutFragments.empty());


    ssize_t n;
    ssize_t n;
    while (!mOutFragments.empty()) {
        const Fragment &frag = *mOutFragments.begin();

        do {
        do {
        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
            n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
        } while (n < 0 && errno == EINTR);
        } while (n < 0 && errno == EINTR);


    status_t err = OK;
        if (n <= 0) {
            break;
        }


    if (n > 0) {
        frag.mBuffer->setRange(
#if 0
                frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
        ALOGI("out:");
        hexdump(mOutBuffer.c_str(), n);
#endif


        mOutBuffer.erase(0, n);
        if (frag.mBuffer->size() > 0) {
    } else if (n < 0) {
            break;
        }

        if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
            dumpFragmentStats(frag);
        }

        mOutFragments.erase(mOutFragments.begin());
    }

    status_t err = OK;

    if (n < 0) {
        err = -errno;
        err = -errno;
    } else if (n == 0) {
    } else if (n == 0) {
        err = -ECONNRESET;
        err = -ECONNRESET;
@@ -537,32 +591,43 @@ status_t ANetworkSession::Session::writeMore() {
    return err;
    return err;
}
}


status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
status_t ANetworkSession::Session::sendRequest(
        const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
    CHECK(mState == CONNECTED || mState == DATAGRAM);
    CHECK(mState == CONNECTED || mState == DATAGRAM);


    if (mState == DATAGRAM) {
    if (size < 0) {
        CHECK_GE(size, 0);
        size = strlen((const char *)data);

    }
        sp<ABuffer> datagram = new ABuffer(size);
        memcpy(datagram->data(), data, size);


        mOutDatagrams.push_back(datagram);
    if (size == 0) {
        return OK;
        return OK;
    }
    }


    sp<ABuffer> buffer;

    if (mState == CONNECTED && !mIsRTSPConnection) {
    if (mState == CONNECTED && !mIsRTSPConnection) {
        CHECK_LE(size, 65535);
        CHECK_LE(size, 65535);


        uint8_t prefix[2];
        buffer = new ABuffer(size + 2);
        prefix[0] = size >> 8;
        buffer->data()[0] = size >> 8;
        prefix[1] = size & 0xff;
        buffer->data()[1] = size & 0xff;
        memcpy(buffer->data() + 2, data, size);
    } else {
        buffer = new ABuffer(size);
        memcpy(buffer->data(), data, size);
    }


        mOutBuffer.append((const char *)prefix, sizeof(prefix));
    Fragment frag;

    frag.mFlags = 0;
    if (timeValid) {
        frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
        frag.mTimeUs = timeUs;
    }
    }


    mOutBuffer.append(
    frag.mBuffer = buffer;
            (const char *)data,

            (size >= 0) ? size : strlen((const char *)data));
    mOutFragments.push_back(frag);


    return OK;
    return OK;
}
}
@@ -985,7 +1050,8 @@ status_t ANetworkSession::connectUDPSession(
}
}


status_t ANetworkSession::sendRequest(
status_t ANetworkSession::sendRequest(
        int32_t sessionID, const void *data, ssize_t size) {
        int32_t sessionID, const void *data, ssize_t size,
        bool timeValid, int64_t timeUs) {
    Mutex::Autolock autoLock(mLock);
    Mutex::Autolock autoLock(mLock);


    ssize_t index = mSessions.indexOfKey(sessionID);
    ssize_t index = mSessions.indexOfKey(sessionID);
@@ -996,7 +1062,7 @@ status_t ANetworkSession::sendRequest(


    const sp<Session> session = mSessions.valueAt(index);
    const sp<Session> session = mSessions.valueAt(index);


    status_t err = session->sendRequest(data, size);
    status_t err = session->sendRequest(data, size, timeValid, timeUs);


    interrupt();
    interrupt();


+2 −1
Original line number Original line Diff line number Diff line
@@ -74,7 +74,8 @@ struct ANetworkSession : public RefBase {
    status_t destroySession(int32_t sessionID);
    status_t destroySession(int32_t sessionID);


    status_t sendRequest(
    status_t sendRequest(
            int32_t sessionID, const void *data, ssize_t size = -1);
            int32_t sessionID, const void *data, ssize_t size = -1,
            bool timeValid = false, int64_t timeUs = -1ll);


    enum NotificationReason {
    enum NotificationReason {
        kWhatError,
        kWhatError,
+4 −0
Original line number Original line Diff line number Diff line
@@ -252,6 +252,10 @@ status_t MediaSender::queueAccessUnit(
                    fwrite(tsPackets->data(), 1, tsPackets->size(), mLogFile);
                    fwrite(tsPackets->data(), 1, tsPackets->size(), mLogFile);
                }
                }


                int64_t timeUs;
                CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
                tsPackets->meta()->setInt64("timeUs", timeUs);

                err = mTSSender->queueBuffer(
                err = mTSSender->queueBuffer(
                        tsPackets,
                        tsPackets,
                        33 /* packetType */,
                        33 /* packetType */,
+16 −5
Original line number Original line Diff line number Diff line
@@ -194,6 +194,9 @@ status_t RTPSender::queueTSPackets(
        const sp<ABuffer> &tsPackets, uint8_t packetType) {
        const sp<ABuffer> &tsPackets, uint8_t packetType) {
    CHECK_EQ(0, tsPackets->size() % 188);
    CHECK_EQ(0, tsPackets->size() % 188);


    int64_t timeUs;
    CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));

    const size_t numTSPackets = tsPackets->size() / 188;
    const size_t numTSPackets = tsPackets->size() / 188;


    size_t srcOffset = 0;
    size_t srcOffset = 0;
@@ -232,13 +235,19 @@ status_t RTPSender::queueTSPackets(
        memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
        memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);


        udpPacket->setRange(0, 12 + numTSPackets * 188);
        udpPacket->setRange(0, 12 + numTSPackets * 188);
        status_t err = sendRTPPacket(udpPacket, true /* storeInHistory */);

        srcOffset += numTSPackets * 188;
        bool isLastPacket = (srcOffset == tsPackets->size());

        status_t err = sendRTPPacket(
                udpPacket,
                true /* storeInHistory */,
                isLastPacket /* timeValid */,
                timeUs);


        if (err != OK) {
        if (err != OK) {
            return err;
            return err;
        }
        }

        srcOffset += numTSPackets * 188;
    }
    }


    return OK;
    return OK;
@@ -395,11 +404,13 @@ status_t RTPSender::queueAVCBuffer(
}
}


status_t RTPSender::sendRTPPacket(
status_t RTPSender::sendRTPPacket(
        const sp<ABuffer> &buffer, bool storeInHistory) {
        const sp<ABuffer> &buffer, bool storeInHistory,
        bool timeValid, int64_t timeUs) {
    CHECK(mRTPConnected);
    CHECK(mRTPConnected);


    status_t err = mNetSession->sendRequest(
    status_t err = mNetSession->sendRequest(
            mRTPSessionID, buffer->data(), buffer->size());
            mRTPSessionID, buffer->data(), buffer->size(),
            timeValid, timeUs);


    if (err != OK) {
    if (err != OK) {
        return err;
        return err;
+3 −1
Original line number Original line Diff line number Diff line
@@ -94,7 +94,9 @@ private:
    status_t queueTSPackets(const sp<ABuffer> &tsPackets, uint8_t packetType);
    status_t queueTSPackets(const sp<ABuffer> &tsPackets, uint8_t packetType);
    status_t queueAVCBuffer(const sp<ABuffer> &accessUnit, uint8_t packetType);
    status_t queueAVCBuffer(const sp<ABuffer> &accessUnit, uint8_t packetType);


    status_t sendRTPPacket(const sp<ABuffer> &packet, bool storeInHistory);
    status_t sendRTPPacket(
            const sp<ABuffer> &packet, bool storeInHistory,
            bool timeValid = false, int64_t timeUs = -1ll);


    void onNetNotify(bool isRTP, const sp<AMessage> &msg);
    void onNetNotify(bool isRTP, const sp<AMessage> &msg);


Loading