Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fe32f2ff authored by Andreas Huber's avatar Andreas Huber Committed by Android Git Automerger
Browse files

am 96954c00: Merge "Change ANetworkSession implementation to optionally attach...

am 96954c00: Merge "Change ANetworkSession implementation to optionally attach timestamps" into jb-mr2-dev

* commit '96954c00':
  Change ANetworkSession implementation to optionally attach timestamps
parents 6969cb95 42404e89
Loading
Loading
Loading
Loading
+108 −42
Original line number Diff line number Diff line
@@ -81,7 +81,8 @@ struct ANetworkSession::Session : public RefBase {
    status_t readMore();
    status_t writeMore();

    status_t sendRequest(const void *data, ssize_t size);
    status_t sendRequest(
            const void *data, ssize_t size, bool timeValid, int64_t timeUs);

    void setIsRTSPConnection(bool yesno);

@@ -89,6 +90,15 @@ protected:
    virtual ~Session();

private:
    enum {
        FRAGMENT_FLAG_TIME_VALID = 1,
    };
    struct Fragment {
        uint32_t mFlags;
        int64_t mTimeUs;
        sp<ABuffer> mBuffer;
    };

    int32_t mSessionID;
    State mState;
    bool mIsRTSPConnection;
@@ -96,11 +106,7 @@ private:
    sp<AMessage> mNotify;
    bool mSawReceiveFailure, mSawSendFailure;

    // for TCP / stream data
    AString mOutBuffer;

    // for UDP / datagrams
    List<sp<ABuffer> > mOutDatagrams;
    List<Fragment> mOutFragments;

    AString mInBuffer;

@@ -109,6 +115,8 @@ private:
    void notifyError(bool send, status_t err, const char *detail);
    void notify(NotificationReason reason);

    void dumpFragmentStats(const Fragment &frag);

    DISALLOW_EVIL_CONSTRUCTORS(Session);
};
////////////////////////////////////////////////////////////////////////////////
@@ -221,8 +229,8 @@ bool ANetworkSession::Session::wantsToRead() {
bool ANetworkSession::Session::wantsToWrite() {
    return !mSawSendFailure
        && (mState == CONNECTING
            || (mState == CONNECTED && !mOutBuffer.empty())
            || (mState == DATAGRAM && !mOutDatagrams.empty()));
            || (mState == CONNECTED && !mOutFragments.empty())
            || (mState == DATAGRAM && !mOutFragments.empty()));
}

status_t ANetworkSession::Session::readMore() {
@@ -407,13 +415,41 @@ status_t ANetworkSession::Session::readMore() {
    return err;
}

void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) {
#if 0
    int64_t nowUs = ALooper::GetNowUs();
    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;

    static const int64_t kMinDelayMs = 0;
    static const int64_t kMaxDelayMs = 300;

    const char *kPattern = "########################################";
    size_t kPatternSize = strlen(kPattern);

    int n = (kPatternSize * (delayMs - kMinDelayMs))
                / (kMaxDelayMs - kMinDelayMs);

    if (n < 0) {
        n = 0;
    } else if ((size_t)n > kPatternSize) {
        n = kPatternSize;
    }

    ALOGI("[%lld]: (%4lld ms) %s\n",
          frag.mTimeUs / 1000,
          delayMs,
          kPattern + kPatternSize - n);
#endif
}

status_t ANetworkSession::Session::writeMore() {
    if (mState == DATAGRAM) {
        CHECK(!mOutDatagrams.empty());
        CHECK(!mOutFragments.empty());

        status_t err;
        do {
            const sp<ABuffer> &datagram = *mOutDatagrams.begin();
            const Fragment &frag = *mOutFragments.begin();
            const sp<ABuffer> &datagram = frag.mBuffer;

            uint8_t *data = datagram->data();
            if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
@@ -441,17 +477,21 @@ status_t ANetworkSession::Session::writeMore() {
            err = OK;

            if (n > 0) {
                mOutDatagrams.erase(mOutDatagrams.begin());
                if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
                    dumpFragmentStats(frag);
                }

                mOutFragments.erase(mOutFragments.begin());
            } else if (n < 0) {
                err = -errno;
            } else if (n == 0) {
                err = -ECONNRESET;
            }
        } while (err == OK && !mOutDatagrams.empty());
        } while (err == OK && !mOutFragments.empty());

        if (err == -EAGAIN) {
            if (!mOutDatagrams.empty()) {
                ALOGI("%d datagrams remain queued.", mOutDatagrams.size());
            if (!mOutFragments.empty()) {
                ALOGI("%d datagrams remain queued.", mOutFragments.size());
            }
            err = OK;
        }
@@ -484,23 +524,37 @@ status_t ANetworkSession::Session::writeMore() {
    }

    CHECK_EQ(mState, CONNECTED);
    CHECK(!mOutBuffer.empty());
    CHECK(!mOutFragments.empty());

    ssize_t n;
    while (!mOutFragments.empty()) {
        const Fragment &frag = *mOutFragments.begin();

        do {
        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
            n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
        } while (n < 0 && errno == EINTR);

    status_t err = OK;
        if (n <= 0) {
            break;
        }

    if (n > 0) {
#if 0
        ALOGI("out:");
        hexdump(mOutBuffer.c_str(), n);
#endif
        frag.mBuffer->setRange(
                frag.mBuffer->offset() + n, frag.mBuffer->size() - n);

        mOutBuffer.erase(0, n);
    } else if (n < 0) {
        if (frag.mBuffer->size() > 0) {
            break;
        }

        if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
            dumpFragmentStats(frag);
        }

        mOutFragments.erase(mOutFragments.begin());
    }

    status_t err = OK;

    if (n < 0) {
        err = -errno;
    } else if (n == 0) {
        err = -ECONNRESET;
@@ -537,32 +591,43 @@ status_t ANetworkSession::Session::writeMore() {
    return err;
}

status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
status_t ANetworkSession::Session::sendRequest(
        const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
    CHECK(mState == CONNECTED || mState == DATAGRAM);

    if (mState == DATAGRAM) {
        CHECK_GE(size, 0);

        sp<ABuffer> datagram = new ABuffer(size);
        memcpy(datagram->data(), data, size);
    if (size < 0) {
        size = strlen((const char *)data);
    }

        mOutDatagrams.push_back(datagram);
    if (size == 0) {
        return OK;
    }

    sp<ABuffer> buffer;

    if (mState == CONNECTED && !mIsRTSPConnection) {
        CHECK_LE(size, 65535);

        uint8_t prefix[2];
        prefix[0] = size >> 8;
        prefix[1] = size & 0xff;
        buffer = new ABuffer(size + 2);
        buffer->data()[0] = size >> 8;
        buffer->data()[1] = size & 0xff;
        memcpy(buffer->data() + 2, data, size);
    } else {
        buffer = new ABuffer(size);
        memcpy(buffer->data(), data, size);
    }

        mOutBuffer.append((const char *)prefix, sizeof(prefix));
    Fragment frag;

    frag.mFlags = 0;
    if (timeValid) {
        frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
        frag.mTimeUs = timeUs;
    }

    mOutBuffer.append(
            (const char *)data,
            (size >= 0) ? size : strlen((const char *)data));
    frag.mBuffer = buffer;

    mOutFragments.push_back(frag);

    return OK;
}
@@ -985,7 +1050,8 @@ status_t ANetworkSession::connectUDPSession(
}

status_t ANetworkSession::sendRequest(
        int32_t sessionID, const void *data, ssize_t size) {
        int32_t sessionID, const void *data, ssize_t size,
        bool timeValid, int64_t timeUs) {
    Mutex::Autolock autoLock(mLock);

    ssize_t index = mSessions.indexOfKey(sessionID);
@@ -996,7 +1062,7 @@ status_t ANetworkSession::sendRequest(

    const sp<Session> session = mSessions.valueAt(index);

    status_t err = session->sendRequest(data, size);
    status_t err = session->sendRequest(data, size, timeValid, timeUs);

    interrupt();

+2 −1
Original line number Diff line number Diff line
@@ -74,7 +74,8 @@ struct ANetworkSession : public RefBase {
    status_t destroySession(int32_t sessionID);

    status_t sendRequest(
            int32_t sessionID, const void *data, ssize_t size = -1);
            int32_t sessionID, const void *data, ssize_t size = -1,
            bool timeValid = false, int64_t timeUs = -1ll);

    enum NotificationReason {
        kWhatError,
+4 −0
Original line number Diff line number Diff line
@@ -252,6 +252,10 @@ status_t MediaSender::queueAccessUnit(
                    fwrite(tsPackets->data(), 1, tsPackets->size(), mLogFile);
                }

                int64_t timeUs;
                CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
                tsPackets->meta()->setInt64("timeUs", timeUs);

                err = mTSSender->queueBuffer(
                        tsPackets,
                        33 /* packetType */,
+16 −5
Original line number Diff line number Diff line
@@ -194,6 +194,9 @@ status_t RTPSender::queueTSPackets(
        const sp<ABuffer> &tsPackets, uint8_t packetType) {
    CHECK_EQ(0, tsPackets->size() % 188);

    int64_t timeUs;
    CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));

    const size_t numTSPackets = tsPackets->size() / 188;

    size_t srcOffset = 0;
@@ -232,13 +235,19 @@ status_t RTPSender::queueTSPackets(
        memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);

        udpPacket->setRange(0, 12 + numTSPackets * 188);
        status_t err = sendRTPPacket(udpPacket, true /* storeInHistory */);

        srcOffset += numTSPackets * 188;
        bool isLastPacket = (srcOffset == tsPackets->size());

        status_t err = sendRTPPacket(
                udpPacket,
                true /* storeInHistory */,
                isLastPacket /* timeValid */,
                timeUs);

        if (err != OK) {
            return err;
        }

        srcOffset += numTSPackets * 188;
    }

    return OK;
@@ -395,11 +404,13 @@ status_t RTPSender::queueAVCBuffer(
}

status_t RTPSender::sendRTPPacket(
        const sp<ABuffer> &buffer, bool storeInHistory) {
        const sp<ABuffer> &buffer, bool storeInHistory,
        bool timeValid, int64_t timeUs) {
    CHECK(mRTPConnected);

    status_t err = mNetSession->sendRequest(
            mRTPSessionID, buffer->data(), buffer->size());
            mRTPSessionID, buffer->data(), buffer->size(),
            timeValid, timeUs);

    if (err != OK) {
        return err;
+3 −1
Original line number Diff line number Diff line
@@ -94,7 +94,9 @@ private:
    status_t queueTSPackets(const sp<ABuffer> &tsPackets, uint8_t packetType);
    status_t queueAVCBuffer(const sp<ABuffer> &accessUnit, uint8_t packetType);

    status_t sendRTPPacket(const sp<ABuffer> &packet, bool storeInHistory);
    status_t sendRTPPacket(
            const sp<ABuffer> &packet, bool storeInHistory,
            bool timeValid = false, int64_t timeUs = -1ll);

    void onNetNotify(bool isRTP, const sp<AMessage> &msg);

Loading