Loading include/utils/Looper.h +59 −3 Original line number Original line Diff line number Diff line Loading @@ -20,9 +20,22 @@ #include <utils/threads.h> #include <utils/threads.h> #include <utils/RefBase.h> #include <utils/RefBase.h> #include <utils/KeyedVector.h> #include <utils/KeyedVector.h> #include <utils/Timers.h> #include <android/looper.h> #include <android/looper.h> // Currently using poll() instead of epoll_wait() since it does a better job of meeting a // timeout deadline. epoll_wait() typically causes additional delays of up to 10ms // beyond the requested timeout. //#define LOOPER_USES_EPOLL //#define LOOPER_STATISTICS #ifdef LOOPER_USES_EPOLL #include <sys/epoll.h> #else #include <sys/poll.h> #endif /* /* * Declare a concrete type for the NDK's looper forward declaration. * Declare a concrete type for the NDK's looper forward declaration. */ */ Loading Loading @@ -190,13 +203,54 @@ private: const bool mAllowNonCallbacks; // immutable const bool mAllowNonCallbacks; // immutable int mEpollFd; // immutable int mWakeReadPipeFd; // immutable int mWakeReadPipeFd; // immutable int mWakeWritePipeFd; // immutable int mWakeWritePipeFd; // immutable Mutex mLock; #ifdef LOOPER_USES_EPOLL int mEpollFd; // immutable // Locked list of file descriptor monitoring requests. // Locked list of file descriptor monitoring requests. Mutex mLock; KeyedVector<int, Request> mRequests; // guarded by mLock KeyedVector<int, Request> mRequests; #else // The lock guards state used to track whether there is a poll() in progress and whether // there are any other threads waiting in wakeAndLock(). The condition variables // are used to transfer control among these threads such that all waiters are // serviced before a new poll can begin. // The wakeAndLock() method increments mWaiters, wakes the poll, blocks on mAwake // until mPolling becomes false, then decrements mWaiters again. // The poll() method blocks on mResume until mWaiters becomes 0, then sets // mPolling to true, blocks until the poll completes, then resets mPolling to false // and signals mResume if there are waiters. bool mPolling; // guarded by mLock uint32_t mWaiters; // guarded by mLock Condition mAwake; // guarded by mLock Condition mResume; // guarded by mLock Vector<struct pollfd> mRequestedFds; // must hold mLock and mPolling must be false to modify Vector<Request> mRequests; // must hold mLock and mPolling must be false to modify ssize_t getRequestIndexLocked(int fd); void wakeAndLock(); #endif #ifdef LOOPER_STATISTICS static const int SAMPLED_WAKE_CYCLES_TO_AGGREGATE = 100; static const int SAMPLED_POLLS_TO_AGGREGATE = 1000; nsecs_t mPendingWakeTime; int mPendingWakeCount; int mSampledWakeCycles; int mSampledWakeCountSum; nsecs_t mSampledWakeLatencySum; int mSampledPolls; int mSampledZeroPollCount; int mSampledZeroPollLatencySum; int mSampledTimeoutPollCount; int mSampledTimeoutPollLatencySum; #endif // This state is only used privately by pollOnce and does not require a lock since // This state is only used privately by pollOnce and does not require a lock since // it runs on a single thread. // it runs on a single thread. Loading @@ -204,6 +258,8 @@ private: size_t mResponseIndex; size_t mResponseIndex; int pollInner(int timeoutMillis); int pollInner(int timeoutMillis); void awoken(); void pushResponse(int events, const Request& request); static void initTLSKey(); static void initTLSKey(); static void threadDestructor(void *st); static void threadDestructor(void *st); Loading libs/utils/Looper.cpp +254 −37 Original line number Original line Diff line number Diff line Loading @@ -19,16 +19,17 @@ #include <unistd.h> #include <unistd.h> #include <fcntl.h> #include <fcntl.h> #include <sys/epoll.h> namespace android { namespace android { #ifdef LOOPER_USES_EPOLL // Hint for number of file descriptors to be associated with the epoll instance. // Hint for number of file descriptors to be associated with the epoll instance. static const int EPOLL_SIZE_HINT = 8; static const int EPOLL_SIZE_HINT = 8; // Maximum number of file descriptors for which to retrieve poll events each iteration. // Maximum number of file descriptors for which to retrieve poll events each iteration. static const int EPOLL_MAX_EVENTS = 16; static const int EPOLL_MAX_EVENTS = 16; #endif static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; static pthread_key_t gTLSKey = 0; static pthread_key_t gTLSKey = 0; Loading @@ -36,9 +37,6 @@ static pthread_key_t gTLSKey = 0; Looper::Looper(bool allowNonCallbacks) : Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mAllowNonCallbacks(allowNonCallbacks), mResponseIndex(0) { mResponseIndex(0) { mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); int wakeFds[2]; int wakeFds[2]; int result = pipe(wakeFds); int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); Loading @@ -54,6 +52,11 @@ Looper::Looper(bool allowNonCallbacks) : LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); errno); #ifdef LOOPER_USES_EPOLL // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.events = EPOLLIN; Loading @@ -61,12 +64,45 @@ Looper::Looper(bool allowNonCallbacks) : result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); errno); #else // Add the wake pipe to the head of the request list with a null callback. struct pollfd requestedFd; requestedFd.fd = mWakeReadPipeFd; requestedFd.events = POLLIN; mRequestedFds.push(requestedFd); Request request; request.fd = mWakeReadPipeFd; request.callback = NULL; request.ident = 0; request.data = NULL; mRequests.push(request); mPolling = false; mWaiters = 0; #endif #ifdef LOOPER_STATISTICS mPendingWakeTime = -1; mPendingWakeCount = 0; mSampledWakeCycles = 0; mSampledWakeCountSum = 0; mSampledWakeLatencySum = 0; mSampledPolls = 0; mSampledZeroPollCount = 0; mSampledZeroPollLatencySum = 0; mSampledTimeoutPollCount = 0; mSampledTimeoutPollLatencySum = 0; #endif } } Looper::~Looper() { Looper::~Looper() { close(mWakeReadPipeFd); close(mWakeReadPipeFd); close(mWakeWritePipeFd); close(mWakeWritePipeFd); #ifdef LOOPER_USES_EPOLL close(mEpollFd); close(mEpollFd); #endif } } void Looper::initTLSKey() { void Looper::initTLSKey() { Loading Loading @@ -157,45 +193,61 @@ int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif #endif int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; #ifdef LOOPER_STATISTICS nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC); #endif #ifdef LOOPER_USES_EPOLL struct epoll_event eventItems[EPOLL_MAX_EVENTS]; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); bool acquiredLock = false; #else // Wait for wakeAndLock() waiters to run then set mPolling to true. mLock.lock(); while (mWaiters != 0) { mResume.wait(mLock); } mPolling = true; mLock.unlock(); size_t requestedCount = mRequestedFds.size(); int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis); #endif if (eventCount < 0) { if (eventCount < 0) { if (errno == EINTR) { if (errno == EINTR) { return ALOOPER_POLL_WAKE; goto Done; } } LOGW("Poll failed with an unexpected error, errno=%d", errno); LOGW("Poll failed with an unexpected error, errno=%d", errno); return ALOOPER_POLL_ERROR; result = ALOOPER_POLL_ERROR; goto Done; } } if (eventCount == 0) { if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - timeout", this); LOGD("%p ~ pollOnce - timeout", this); #endif #endif return ALOOPER_POLL_TIMEOUT; result = ALOOPER_POLL_TIMEOUT; goto Done; } } int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif #endif bool acquiredLock = false; #ifdef LOOPER_USES_EPOLL for (int i = 0; i < eventCount; i++) { for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { if (epollEvents & EPOLLIN) { #if DEBUG_POLL_AND_WAKE awoken(); LOGD("%p ~ pollOnce - awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); } else { } else { LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } Loading @@ -212,11 +264,7 @@ int Looper::pollInner(int timeoutMillis) { if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); Response response; response.events = events; response.request = mRequests.valueAt(requestIndex); mResponses.push(response); } else { } else { LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); "no longer registered.", epollEvents, fd); Loading @@ -226,6 +274,66 @@ int Looper::pollInner(int timeoutMillis) { if (acquiredLock) { if (acquiredLock) { mLock.unlock(); mLock.unlock(); } } Done: ; #else for (size_t i = 0; i < requestedCount; i++) { const struct pollfd& requestedFd = mRequestedFds.itemAt(i); short pollEvents = requestedFd.revents; if (pollEvents) { if (requestedFd.fd == mWakeReadPipeFd) { if (pollEvents & POLLIN) { awoken(); } else { LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents); } } else { int events = 0; if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT; if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR; if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP; if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID; pushResponse(events, mRequests.itemAt(i)); } if (--eventCount == 0) { break; } } } Done: // Set mPolling to false and wake up the wakeAndLock() waiters. mLock.lock(); mPolling = false; if (mWaiters != 0) { mAwake.broadcast(); } mLock.unlock(); #endif #ifdef LOOPER_STATISTICS nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC); mSampledPolls += 1; if (timeoutMillis == 0) { mSampledZeroPollCount += 1; mSampledZeroPollLatencySum += pollEndTime - pollStartTime; } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) { mSampledTimeoutPollCount += 1; mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime - milliseconds_to_nanoseconds(timeoutMillis); } if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) { LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this, 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount, 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount); mSampledPolls = 0; mSampledZeroPollCount = 0; mSampledZeroPollLatencySum = 0; mSampledTimeoutPollCount = 0; mSampledTimeoutPollLatencySum = 0; } #endif for (size_t i = 0; i < mResponses.size(); i++) { for (size_t i = 0; i < mResponses.size(); i++) { const Response& response = mResponses.itemAt(i); const Response& response = mResponses.itemAt(i); Loading Loading @@ -278,6 +386,13 @@ void Looper::wake() { LOGD("%p ~ wake", this); LOGD("%p ~ wake", this); #endif #endif #ifdef LOOPER_STATISTICS // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled. if (mPendingWakeCount++ == 0) { mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC); } #endif ssize_t nWrite; ssize_t nWrite; do { do { nWrite = write(mWakeWritePipeFd, "W", 1); nWrite = write(mWakeWritePipeFd, "W", 1); Loading @@ -290,23 +405,51 @@ void Looper::wake() { } } } } void Looper::awoken() { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ awoken", this); #endif #ifdef LOOPER_STATISTICS if (mPendingWakeCount == 0) { LOGD("%p ~ awoken: spurious!", this); } else { mSampledWakeCycles += 1; mSampledWakeCountSum += mPendingWakeCount; mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime; mPendingWakeCount = 0; mPendingWakeTime = -1; if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) { LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this, 0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles, float(mSampledWakeCountSum) / mSampledWakeCycles); mSampledWakeCycles = 0; mSampledWakeCountSum = 0; mSampledWakeLatencySum = 0; } } #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); } void Looper::pushResponse(int events, const Request& request) { Response response; response.events = events; response.request = request; mResponses.push(response); } int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { #if DEBUG_CALLBACKS #if DEBUG_CALLBACKS LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, events, callback, data); events, callback, data); #endif #endif int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR; if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP; if (epollEvents == 0) { LOGE("Invalid attempt to set a callback with no selected poll events."); return -1; } if (! callback) { if (! callback) { if (! mAllowNonCallbacks) { if (! mAllowNonCallbacks) { LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); Loading @@ -319,6 +462,11 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, } } } } #ifdef LOOPER_USES_EPOLL int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { // acquire lock { // acquire lock AutoMutex _l(mLock); AutoMutex _l(mLock); Loading Loading @@ -350,6 +498,33 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, mRequests.replaceValueAt(requestIndex, request); mRequests.replaceValueAt(requestIndex, request); } } } // release lock } // release lock #else int pollEvents = 0; if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN; if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT; wakeAndLock(); // acquire lock struct pollfd requestedFd; requestedFd.fd = fd; requestedFd.events = pollEvents; Request request; request.fd = fd; request.ident = ident; request.callback = callback; request.data = data; ssize_t index = getRequestIndexLocked(fd); if (index < 0) { mRequestedFds.push(requestedFd); mRequests.push(request); } else { mRequestedFds.replaceAt(requestedFd, size_t(index)); mRequests.replaceAt(request, size_t(index)); } mLock.unlock(); // release lock #endif return 1; return 1; } } Loading @@ -358,6 +533,7 @@ int Looper::removeFd(int fd) { LOGD("%p ~ removeFd - fd=%d", this, fd); LOGD("%p ~ removeFd - fd=%d", this, fd); #endif #endif #ifdef LOOPER_USES_EPOLL { // acquire lock { // acquire lock AutoMutex _l(mLock); AutoMutex _l(mLock); ssize_t requestIndex = mRequests.indexOfKey(fd); ssize_t requestIndex = mRequests.indexOfKey(fd); Loading @@ -372,8 +548,49 @@ int Looper::removeFd(int fd) { } } mRequests.removeItemsAt(requestIndex); mRequests.removeItemsAt(requestIndex); } // request lock } // release lock return 1; return 1; #else wakeAndLock(); // acquire lock ssize_t index = getRequestIndexLocked(fd); if (index >= 0) { mRequestedFds.removeAt(size_t(index)); mRequests.removeAt(size_t(index)); } mLock.unlock(); // release lock return index >= 0; #endif } #ifndef LOOPER_USES_EPOLL ssize_t Looper::getRequestIndexLocked(int fd) { size_t requestCount = mRequestedFds.size(); for (size_t i = 0; i < requestCount; i++) { if (mRequestedFds.itemAt(i).fd == fd) { return i; } } return -1; } } void Looper::wakeAndLock() { mLock.lock(); mWaiters += 1; while (mPolling) { wake(); mAwake.wait(mLock); } mWaiters -= 1; if (mWaiters == 0) { mResume.signal(); } } #endif } // namespace android } // namespace android libs/utils/tests/Looper_test.cpp +0 −8 Original line number Original line Diff line number Diff line Loading @@ -354,14 +354,6 @@ TEST_F(LooperTest, AddFd_WhenCallbackAdded_ReturnsOne) { << "addFd should return 1 because FD was added"; << "addFd should return 1 because FD was added"; } } TEST_F(LooperTest, AddFd_WhenEventsIsZero_ReturnsError) { Pipe pipe; int result = mLooper->addFd(pipe.receiveFd, 0, 0, NULL, NULL); EXPECT_EQ(-1, result) << "addFd should return -1 because arguments were invalid"; } TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) { TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) { Pipe pipe; Pipe pipe; int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL); int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL); Loading Loading
include/utils/Looper.h +59 −3 Original line number Original line Diff line number Diff line Loading @@ -20,9 +20,22 @@ #include <utils/threads.h> #include <utils/threads.h> #include <utils/RefBase.h> #include <utils/RefBase.h> #include <utils/KeyedVector.h> #include <utils/KeyedVector.h> #include <utils/Timers.h> #include <android/looper.h> #include <android/looper.h> // Currently using poll() instead of epoll_wait() since it does a better job of meeting a // timeout deadline. epoll_wait() typically causes additional delays of up to 10ms // beyond the requested timeout. //#define LOOPER_USES_EPOLL //#define LOOPER_STATISTICS #ifdef LOOPER_USES_EPOLL #include <sys/epoll.h> #else #include <sys/poll.h> #endif /* /* * Declare a concrete type for the NDK's looper forward declaration. * Declare a concrete type for the NDK's looper forward declaration. */ */ Loading Loading @@ -190,13 +203,54 @@ private: const bool mAllowNonCallbacks; // immutable const bool mAllowNonCallbacks; // immutable int mEpollFd; // immutable int mWakeReadPipeFd; // immutable int mWakeReadPipeFd; // immutable int mWakeWritePipeFd; // immutable int mWakeWritePipeFd; // immutable Mutex mLock; #ifdef LOOPER_USES_EPOLL int mEpollFd; // immutable // Locked list of file descriptor monitoring requests. // Locked list of file descriptor monitoring requests. Mutex mLock; KeyedVector<int, Request> mRequests; // guarded by mLock KeyedVector<int, Request> mRequests; #else // The lock guards state used to track whether there is a poll() in progress and whether // there are any other threads waiting in wakeAndLock(). The condition variables // are used to transfer control among these threads such that all waiters are // serviced before a new poll can begin. // The wakeAndLock() method increments mWaiters, wakes the poll, blocks on mAwake // until mPolling becomes false, then decrements mWaiters again. // The poll() method blocks on mResume until mWaiters becomes 0, then sets // mPolling to true, blocks until the poll completes, then resets mPolling to false // and signals mResume if there are waiters. bool mPolling; // guarded by mLock uint32_t mWaiters; // guarded by mLock Condition mAwake; // guarded by mLock Condition mResume; // guarded by mLock Vector<struct pollfd> mRequestedFds; // must hold mLock and mPolling must be false to modify Vector<Request> mRequests; // must hold mLock and mPolling must be false to modify ssize_t getRequestIndexLocked(int fd); void wakeAndLock(); #endif #ifdef LOOPER_STATISTICS static const int SAMPLED_WAKE_CYCLES_TO_AGGREGATE = 100; static const int SAMPLED_POLLS_TO_AGGREGATE = 1000; nsecs_t mPendingWakeTime; int mPendingWakeCount; int mSampledWakeCycles; int mSampledWakeCountSum; nsecs_t mSampledWakeLatencySum; int mSampledPolls; int mSampledZeroPollCount; int mSampledZeroPollLatencySum; int mSampledTimeoutPollCount; int mSampledTimeoutPollLatencySum; #endif // This state is only used privately by pollOnce and does not require a lock since // This state is only used privately by pollOnce and does not require a lock since // it runs on a single thread. // it runs on a single thread. Loading @@ -204,6 +258,8 @@ private: size_t mResponseIndex; size_t mResponseIndex; int pollInner(int timeoutMillis); int pollInner(int timeoutMillis); void awoken(); void pushResponse(int events, const Request& request); static void initTLSKey(); static void initTLSKey(); static void threadDestructor(void *st); static void threadDestructor(void *st); Loading
libs/utils/Looper.cpp +254 −37 Original line number Original line Diff line number Diff line Loading @@ -19,16 +19,17 @@ #include <unistd.h> #include <unistd.h> #include <fcntl.h> #include <fcntl.h> #include <sys/epoll.h> namespace android { namespace android { #ifdef LOOPER_USES_EPOLL // Hint for number of file descriptors to be associated with the epoll instance. // Hint for number of file descriptors to be associated with the epoll instance. static const int EPOLL_SIZE_HINT = 8; static const int EPOLL_SIZE_HINT = 8; // Maximum number of file descriptors for which to retrieve poll events each iteration. // Maximum number of file descriptors for which to retrieve poll events each iteration. static const int EPOLL_MAX_EVENTS = 16; static const int EPOLL_MAX_EVENTS = 16; #endif static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; static pthread_key_t gTLSKey = 0; static pthread_key_t gTLSKey = 0; Loading @@ -36,9 +37,6 @@ static pthread_key_t gTLSKey = 0; Looper::Looper(bool allowNonCallbacks) : Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mAllowNonCallbacks(allowNonCallbacks), mResponseIndex(0) { mResponseIndex(0) { mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); int wakeFds[2]; int wakeFds[2]; int result = pipe(wakeFds); int result = pipe(wakeFds); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); Loading @@ -54,6 +52,11 @@ Looper::Looper(bool allowNonCallbacks) : LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); errno); #ifdef LOOPER_USES_EPOLL // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.events = EPOLLIN; Loading @@ -61,12 +64,45 @@ Looper::Looper(bool allowNonCallbacks) : result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); errno); #else // Add the wake pipe to the head of the request list with a null callback. struct pollfd requestedFd; requestedFd.fd = mWakeReadPipeFd; requestedFd.events = POLLIN; mRequestedFds.push(requestedFd); Request request; request.fd = mWakeReadPipeFd; request.callback = NULL; request.ident = 0; request.data = NULL; mRequests.push(request); mPolling = false; mWaiters = 0; #endif #ifdef LOOPER_STATISTICS mPendingWakeTime = -1; mPendingWakeCount = 0; mSampledWakeCycles = 0; mSampledWakeCountSum = 0; mSampledWakeLatencySum = 0; mSampledPolls = 0; mSampledZeroPollCount = 0; mSampledZeroPollLatencySum = 0; mSampledTimeoutPollCount = 0; mSampledTimeoutPollLatencySum = 0; #endif } } Looper::~Looper() { Looper::~Looper() { close(mWakeReadPipeFd); close(mWakeReadPipeFd); close(mWakeWritePipeFd); close(mWakeWritePipeFd); #ifdef LOOPER_USES_EPOLL close(mEpollFd); close(mEpollFd); #endif } } void Looper::initTLSKey() { void Looper::initTLSKey() { Loading Loading @@ -157,45 +193,61 @@ int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif #endif int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; #ifdef LOOPER_STATISTICS nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC); #endif #ifdef LOOPER_USES_EPOLL struct epoll_event eventItems[EPOLL_MAX_EVENTS]; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); bool acquiredLock = false; #else // Wait for wakeAndLock() waiters to run then set mPolling to true. mLock.lock(); while (mWaiters != 0) { mResume.wait(mLock); } mPolling = true; mLock.unlock(); size_t requestedCount = mRequestedFds.size(); int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis); #endif if (eventCount < 0) { if (eventCount < 0) { if (errno == EINTR) { if (errno == EINTR) { return ALOOPER_POLL_WAKE; goto Done; } } LOGW("Poll failed with an unexpected error, errno=%d", errno); LOGW("Poll failed with an unexpected error, errno=%d", errno); return ALOOPER_POLL_ERROR; result = ALOOPER_POLL_ERROR; goto Done; } } if (eventCount == 0) { if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - timeout", this); LOGD("%p ~ pollOnce - timeout", this); #endif #endif return ALOOPER_POLL_TIMEOUT; result = ALOOPER_POLL_TIMEOUT; goto Done; } } int result = ALOOPER_POLL_WAKE; mResponses.clear(); mResponseIndex = 0; #if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif #endif bool acquiredLock = false; #ifdef LOOPER_USES_EPOLL for (int i = 0; i < eventCount; i++) { for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeReadPipeFd) { if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { if (epollEvents & EPOLLIN) { #if DEBUG_POLL_AND_WAKE awoken(); LOGD("%p ~ pollOnce - awoken", this); #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); } else { } else { LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } Loading @@ -212,11 +264,7 @@ int Looper::pollInner(int timeoutMillis) { if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); Response response; response.events = events; response.request = mRequests.valueAt(requestIndex); mResponses.push(response); } else { } else { LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); "no longer registered.", epollEvents, fd); Loading @@ -226,6 +274,66 @@ int Looper::pollInner(int timeoutMillis) { if (acquiredLock) { if (acquiredLock) { mLock.unlock(); mLock.unlock(); } } Done: ; #else for (size_t i = 0; i < requestedCount; i++) { const struct pollfd& requestedFd = mRequestedFds.itemAt(i); short pollEvents = requestedFd.revents; if (pollEvents) { if (requestedFd.fd == mWakeReadPipeFd) { if (pollEvents & POLLIN) { awoken(); } else { LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents); } } else { int events = 0; if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT; if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR; if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP; if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID; pushResponse(events, mRequests.itemAt(i)); } if (--eventCount == 0) { break; } } } Done: // Set mPolling to false and wake up the wakeAndLock() waiters. mLock.lock(); mPolling = false; if (mWaiters != 0) { mAwake.broadcast(); } mLock.unlock(); #endif #ifdef LOOPER_STATISTICS nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC); mSampledPolls += 1; if (timeoutMillis == 0) { mSampledZeroPollCount += 1; mSampledZeroPollLatencySum += pollEndTime - pollStartTime; } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) { mSampledTimeoutPollCount += 1; mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime - milliseconds_to_nanoseconds(timeoutMillis); } if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) { LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this, 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount, 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount); mSampledPolls = 0; mSampledZeroPollCount = 0; mSampledZeroPollLatencySum = 0; mSampledTimeoutPollCount = 0; mSampledTimeoutPollLatencySum = 0; } #endif for (size_t i = 0; i < mResponses.size(); i++) { for (size_t i = 0; i < mResponses.size(); i++) { const Response& response = mResponses.itemAt(i); const Response& response = mResponses.itemAt(i); Loading Loading @@ -278,6 +386,13 @@ void Looper::wake() { LOGD("%p ~ wake", this); LOGD("%p ~ wake", this); #endif #endif #ifdef LOOPER_STATISTICS // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled. if (mPendingWakeCount++ == 0) { mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC); } #endif ssize_t nWrite; ssize_t nWrite; do { do { nWrite = write(mWakeWritePipeFd, "W", 1); nWrite = write(mWakeWritePipeFd, "W", 1); Loading @@ -290,23 +405,51 @@ void Looper::wake() { } } } } void Looper::awoken() { #if DEBUG_POLL_AND_WAKE LOGD("%p ~ awoken", this); #endif #ifdef LOOPER_STATISTICS if (mPendingWakeCount == 0) { LOGD("%p ~ awoken: spurious!", this); } else { mSampledWakeCycles += 1; mSampledWakeCountSum += mPendingWakeCount; mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime; mPendingWakeCount = 0; mPendingWakeTime = -1; if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) { LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this, 0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles, float(mSampledWakeCountSum) / mSampledWakeCycles); mSampledWakeCycles = 0; mSampledWakeCountSum = 0; mSampledWakeLatencySum = 0; } } #endif char buffer[16]; ssize_t nRead; do { nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); } void Looper::pushResponse(int events, const Request& request) { Response response; response.events = events; response.request = request; mResponses.push(response); } int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { #if DEBUG_CALLBACKS #if DEBUG_CALLBACKS LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, events, callback, data); events, callback, data); #endif #endif int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR; if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP; if (epollEvents == 0) { LOGE("Invalid attempt to set a callback with no selected poll events."); return -1; } if (! callback) { if (! callback) { if (! mAllowNonCallbacks) { if (! mAllowNonCallbacks) { LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); Loading @@ -319,6 +462,11 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, } } } } #ifdef LOOPER_USES_EPOLL int epollEvents = 0; if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; { // acquire lock { // acquire lock AutoMutex _l(mLock); AutoMutex _l(mLock); Loading Loading @@ -350,6 +498,33 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, mRequests.replaceValueAt(requestIndex, request); mRequests.replaceValueAt(requestIndex, request); } } } // release lock } // release lock #else int pollEvents = 0; if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN; if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT; wakeAndLock(); // acquire lock struct pollfd requestedFd; requestedFd.fd = fd; requestedFd.events = pollEvents; Request request; request.fd = fd; request.ident = ident; request.callback = callback; request.data = data; ssize_t index = getRequestIndexLocked(fd); if (index < 0) { mRequestedFds.push(requestedFd); mRequests.push(request); } else { mRequestedFds.replaceAt(requestedFd, size_t(index)); mRequests.replaceAt(request, size_t(index)); } mLock.unlock(); // release lock #endif return 1; return 1; } } Loading @@ -358,6 +533,7 @@ int Looper::removeFd(int fd) { LOGD("%p ~ removeFd - fd=%d", this, fd); LOGD("%p ~ removeFd - fd=%d", this, fd); #endif #endif #ifdef LOOPER_USES_EPOLL { // acquire lock { // acquire lock AutoMutex _l(mLock); AutoMutex _l(mLock); ssize_t requestIndex = mRequests.indexOfKey(fd); ssize_t requestIndex = mRequests.indexOfKey(fd); Loading @@ -372,8 +548,49 @@ int Looper::removeFd(int fd) { } } mRequests.removeItemsAt(requestIndex); mRequests.removeItemsAt(requestIndex); } // request lock } // release lock return 1; return 1; #else wakeAndLock(); // acquire lock ssize_t index = getRequestIndexLocked(fd); if (index >= 0) { mRequestedFds.removeAt(size_t(index)); mRequests.removeAt(size_t(index)); } mLock.unlock(); // release lock return index >= 0; #endif } #ifndef LOOPER_USES_EPOLL ssize_t Looper::getRequestIndexLocked(int fd) { size_t requestCount = mRequestedFds.size(); for (size_t i = 0; i < requestCount; i++) { if (mRequestedFds.itemAt(i).fd == fd) { return i; } } return -1; } } void Looper::wakeAndLock() { mLock.lock(); mWaiters += 1; while (mPolling) { wake(); mAwake.wait(mLock); } mWaiters -= 1; if (mWaiters == 0) { mResume.signal(); } } #endif } // namespace android } // namespace android
libs/utils/tests/Looper_test.cpp +0 −8 Original line number Original line Diff line number Diff line Loading @@ -354,14 +354,6 @@ TEST_F(LooperTest, AddFd_WhenCallbackAdded_ReturnsOne) { << "addFd should return 1 because FD was added"; << "addFd should return 1 because FD was added"; } } TEST_F(LooperTest, AddFd_WhenEventsIsZero_ReturnsError) { Pipe pipe; int result = mLooper->addFd(pipe.receiveFd, 0, 0, NULL, NULL); EXPECT_EQ(-1, result) << "addFd should return -1 because arguments were invalid"; } TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) { TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) { Pipe pipe; Pipe pipe; int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL); int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL); Loading