Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2eb1995a authored by Jeff Brown's avatar Jeff Brown Committed by Android Git Automerger
Browse files

am 22cb4ef8: am d577cfd7: Merge "Switch Looper back to using poll() instead of...

am 22cb4ef8: am d577cfd7: Merge "Switch Looper back to using poll() instead of epoll()." into gingerbread

Merge commit '22cb4ef8ce9c4d5536ac5cee5c40b82bfa56ccc5'

* commit '22cb4ef8ce9c4d5536ac5cee5c40b82bfa56ccc5':
  Switch Looper back to using poll() instead of epoll().
parents 5a0a45ff f4f8284f
Loading
Loading
Loading
Loading
+59 −3
Original line number Diff line number Diff line
@@ -20,9 +20,22 @@
#include <utils/threads.h>
#include <utils/RefBase.h>
#include <utils/KeyedVector.h>
#include <utils/Timers.h>

#include <android/looper.h>

// Currently using poll() instead of epoll_wait() since it does a better job of meeting a
// timeout deadline.  epoll_wait() typically causes additional delays of up to 10ms
// beyond the requested timeout.
//#define LOOPER_USES_EPOLL
//#define LOOPER_STATISTICS

#ifdef LOOPER_USES_EPOLL
#include <sys/epoll.h>
#else
#include <sys/poll.h>
#endif

/*
 * Declare a concrete type for the NDK's looper forward declaration.
 */
@@ -190,13 +203,54 @@ private:

    const bool mAllowNonCallbacks; // immutable

    int mEpollFd; // immutable
    int mWakeReadPipeFd;  // immutable
    int mWakeWritePipeFd; // immutable
    Mutex mLock;

#ifdef LOOPER_USES_EPOLL
    int mEpollFd; // immutable

    // Locked list of file descriptor monitoring requests.
    Mutex mLock;
    KeyedVector<int, Request> mRequests;
    KeyedVector<int, Request> mRequests;  // guarded by mLock
#else
    // The lock guards state used to track whether there is a poll() in progress and whether
    // there are any other threads waiting in wakeAndLock().  The condition variables
    // are used to transfer control among these threads such that all waiters are
    // serviced before a new poll can begin.
    // The wakeAndLock() method increments mWaiters, wakes the poll, blocks on mAwake
    // until mPolling becomes false, then decrements mWaiters again.
    // The poll() method blocks on mResume until mWaiters becomes 0, then sets
    // mPolling to true, blocks until the poll completes, then resets mPolling to false
    // and signals mResume if there are waiters.
    bool mPolling;      // guarded by mLock
    uint32_t mWaiters;  // guarded by mLock
    Condition mAwake;   // guarded by mLock
    Condition mResume;  // guarded by mLock

    Vector<struct pollfd> mRequestedFds;  // must hold mLock and mPolling must be false to modify
    Vector<Request> mRequests;            // must hold mLock and mPolling must be false to modify

    ssize_t getRequestIndexLocked(int fd);
    void wakeAndLock();
#endif

#ifdef LOOPER_STATISTICS
    static const int SAMPLED_WAKE_CYCLES_TO_AGGREGATE = 100;
    static const int SAMPLED_POLLS_TO_AGGREGATE = 1000;

    nsecs_t mPendingWakeTime;
    int mPendingWakeCount;

    int mSampledWakeCycles;
    int mSampledWakeCountSum;
    nsecs_t mSampledWakeLatencySum;

    int mSampledPolls;
    int mSampledZeroPollCount;
    int mSampledZeroPollLatencySum;
    int mSampledTimeoutPollCount;
    int mSampledTimeoutPollLatencySum;
#endif

    // This state is only used privately by pollOnce and does not require a lock since
    // it runs on a single thread.
@@ -204,6 +258,8 @@ private:
    size_t mResponseIndex;

    int pollInner(int timeoutMillis);
    void awoken();
    void pushResponse(int events, const Request& request);

    static void initTLSKey();
    static void threadDestructor(void *st);
+254 −37
Original line number Diff line number Diff line
@@ -19,16 +19,17 @@

#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>


namespace android {

#ifdef LOOPER_USES_EPOLL
// Hint for number of file descriptors to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;

// Maximum number of file descriptors for which to retrieve poll events each iteration.
static const int EPOLL_MAX_EVENTS = 16;
#endif

static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
@@ -36,9 +37,6 @@ static pthread_key_t gTLSKey = 0;
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks),
        mResponseIndex(0) {
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);

    int wakeFds[2];
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
@@ -54,6 +52,11 @@ Looper::Looper(bool allowNonCallbacks) :
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
            errno);

#ifdef LOOPER_USES_EPOLL
    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
@@ -61,12 +64,45 @@ Looper::Looper(bool allowNonCallbacks) :
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
            errno);
#else
    // Add the wake pipe to the head of the request list with a null callback.
    struct pollfd requestedFd;
    requestedFd.fd = mWakeReadPipeFd;
    requestedFd.events = POLLIN;
    mRequestedFds.push(requestedFd);

    Request request;
    request.fd = mWakeReadPipeFd;
    request.callback = NULL;
    request.ident = 0;
    request.data = NULL;
    mRequests.push(request);

    mPolling = false;
    mWaiters = 0;
#endif

#ifdef LOOPER_STATISTICS
    mPendingWakeTime = -1;
    mPendingWakeCount = 0;
    mSampledWakeCycles = 0;
    mSampledWakeCountSum = 0;
    mSampledWakeLatencySum = 0;

    mSampledPolls = 0;
    mSampledZeroPollCount = 0;
    mSampledZeroPollLatencySum = 0;
    mSampledTimeoutPollCount = 0;
    mSampledTimeoutPollLatencySum = 0;
#endif
}

Looper::~Looper() {
    close(mWakeReadPipeFd);
    close(mWakeWritePipeFd);
#ifdef LOOPER_USES_EPOLL
    close(mEpollFd);
#endif
}

void Looper::initTLSKey() {
@@ -157,45 +193,61 @@ int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#ifdef LOOPER_STATISTICS
    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif

#ifdef LOOPER_USES_EPOLL
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
#else
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif

    if (eventCount < 0) {
        if (errno == EINTR) {
            return ALOOPER_POLL_WAKE;
            goto Done;
        }

        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        return ALOOPER_POLL_ERROR;
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        LOGD("%p ~ pollOnce - timeout", this);
#endif
        return ALOOPER_POLL_TIMEOUT;
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
    bool acquiredLock = false;

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
#if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - awoken", this);
#endif
                char buffer[16];
                ssize_t nRead;
                do {
                    nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
                } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
@@ -212,11 +264,7 @@ int Looper::pollInner(int timeoutMillis) {
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;

                Response response;
                response.events = events;
                response.request = mRequests.valueAt(requestIndex);
                mResponses.push(response);
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
@@ -226,6 +274,66 @@ int Looper::pollInner(int timeoutMillis) {
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;
#else
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    awoken();
                } else {
                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
                }
            } else {
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            }
            if (--eventCount == 0) {
                break;
            }
        }
    }

Done:
    // Set mPolling to false and wake up the wakeAndLock() waiters.
    mLock.lock();
    mPolling = false;
    if (mWaiters != 0) {
        mAwake.broadcast();
    }
    mLock.unlock();
#endif

#ifdef LOOPER_STATISTICS
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    }
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;
    }
#endif

    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
@@ -278,6 +386,13 @@ void Looper::wake() {
    LOGD("%p ~ wake", this);
#endif

#ifdef LOOPER_STATISTICS
    // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
    if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
@@ -290,23 +405,51 @@ void Looper::wake() {
    }
}

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ awoken", this);
#endif

#ifdef LOOPER_STATISTICS
    if (mPendingWakeCount == 0) {
        LOGD("%p ~ awoken: spurious!", this);
    } else {
        mSampledWakeCycles += 1;
        mSampledWakeCountSum += mPendingWakeCount;
        mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
        mPendingWakeCount = 0;
        mPendingWakeTime = -1;
        if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
            LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
                    0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
                    float(mSampledWakeCountSum) / mSampledWakeCycles);
            mSampledWakeCycles = 0;
            mSampledWakeCountSum = 0;
            mSampledWakeLatencySum = 0;
        }
    }
#endif

    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
#if DEBUG_CALLBACKS
    LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
            events, callback, data);
#endif

    int epollEvents = 0;
    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
    if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR;
    if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP;

    if (epollEvents == 0) {
        LOGE("Invalid attempt to set a callback with no selected poll events.");
        return -1;
    }

    if (! callback) {
        if (! mAllowNonCallbacks) {
            LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
@@ -319,6 +462,11 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
        }
    }

#ifdef LOOPER_USES_EPOLL
    int epollEvents = 0;
    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    { // acquire lock
        AutoMutex _l(mLock);

@@ -350,6 +498,33 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
#else
    int pollEvents = 0;
    if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
    if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;

    wakeAndLock(); // acquire lock

    struct pollfd requestedFd;
    requestedFd.fd = fd;
    requestedFd.events = pollEvents;

    Request request;
    request.fd = fd;
    request.ident = ident;
    request.callback = callback;
    request.data = data;
    ssize_t index = getRequestIndexLocked(fd);
    if (index < 0) {
        mRequestedFds.push(requestedFd);
        mRequests.push(request);
    } else {
        mRequestedFds.replaceAt(requestedFd, size_t(index));
        mRequests.replaceAt(request, size_t(index));
    }

    mLock.unlock(); // release lock
#endif
    return 1;
}

@@ -358,6 +533,7 @@ int Looper::removeFd(int fd) {
    LOGD("%p ~ removeFd - fd=%d", this, fd);
#endif

#ifdef LOOPER_USES_EPOLL
    { // acquire lock
        AutoMutex _l(mLock);
        ssize_t requestIndex = mRequests.indexOfKey(fd);
@@ -372,8 +548,49 @@ int Looper::removeFd(int fd) {
        }

        mRequests.removeItemsAt(requestIndex);
    } // request lock
    } // release lock
    return 1;
#else
    wakeAndLock(); // acquire lock

    ssize_t index = getRequestIndexLocked(fd);
    if (index >= 0) {
        mRequestedFds.removeAt(size_t(index));
        mRequests.removeAt(size_t(index));
    }

    mLock.unlock(); // release lock
    return index >= 0;
#endif
}

#ifndef LOOPER_USES_EPOLL
ssize_t Looper::getRequestIndexLocked(int fd) {
    size_t requestCount = mRequestedFds.size();

    for (size_t i = 0; i < requestCount; i++) {
        if (mRequestedFds.itemAt(i).fd == fd) {
            return i;
        }
    }

    return -1;
}

void Looper::wakeAndLock() {
    mLock.lock();

    mWaiters += 1;
    while (mPolling) {
        wake();
        mAwake.wait(mLock);
    }

    mWaiters -= 1;
    if (mWaiters == 0) {
        mResume.signal();
    }
}
#endif

} // namespace android
+0 −8
Original line number Diff line number Diff line
@@ -354,14 +354,6 @@ TEST_F(LooperTest, AddFd_WhenCallbackAdded_ReturnsOne) {
            << "addFd should return 1 because FD was added";
}

TEST_F(LooperTest, AddFd_WhenEventsIsZero_ReturnsError) {
    Pipe pipe;
    int result = mLooper->addFd(pipe.receiveFd, 0, 0, NULL, NULL);

    EXPECT_EQ(-1, result)
            << "addFd should return -1 because arguments were invalid";
}

TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) {
    Pipe pipe;
    int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL);