Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 663c692a authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Looper: Use sequence numbers in epoll_event to track requests"

parents 237ec60d 729057ab
Loading
Loading
Loading
Loading
+88 −92
Original line number Diff line number Diff line
@@ -20,6 +20,16 @@

namespace android {

namespace {

constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;

epoll_event createEpollEvent(uint32_t events, uint64_t seq) {
    return {.events = events, .data = {.u64 = seq}};
}

}  // namespace

// --- WeakMessageHandler ---

WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
@@ -64,7 +74,7 @@ Looper::Looper(bool allowNonCallbacks)
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(0),
      mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
@@ -137,22 +147,17 @@ void Looper::rebuildEpollLocked() {
        mEpollFd.reset();
    }

    // Allocate the new epoll instance and register the wake pipe.
    // Allocate the new epoll instance and register the WakeEventFd.
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd.get();
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
    epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
    for (const auto& [seq, request] : mRequests) {
        epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);

        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
@@ -276,26 +281,28 @@ int Looper::pollInner(int timeoutMillis) {
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        const SequenceNumber seq = eventItems[i].data.u64;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
        if (seq == WAKE_EVENT_FD_SEQ) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
            const auto& request_it = mRequests.find(seq);
            if (request_it != mRequests.end()) {
                const auto& request = request_it->second;
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
                mResponses.push({.seq = seq, .events = events, .request = request});
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
                ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
                      " that is no longer registered.",
                      epollEvents, seq);
            }
        }
    }
@@ -354,7 +361,8 @@ Done: ;
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
                AutoMutex _l(mLock);
                removeSequenceNumberLocked(response.seq);
            }

            // Clear the callback reference in the response structure promptly because we
@@ -416,13 +424,6 @@ void Looper::awoken() {
    TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data);
}
@@ -449,27 +450,27 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb

    { // acquire lock
        AutoMutex _l(mLock);
        // There is a sequence number reserved for the WakeEventFd.
        if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
        const SequenceNumber seq = mNextRequestSeq++;

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
        epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
        auto seq_it = mSequenceNumberByFd.find(fd);
        if (seq_it == mSequenceNumberByFd.end()) {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            mRequests.add(fd, request);
            mRequests.emplace(seq, request);
            mSequenceNumberByFd.emplace(fd, seq);
        } else {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
@@ -486,7 +487,7 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
                    // set from scratch because it may contain an old file handle that we are
                    // now unable to remove since its file descriptor is no longer valid.
                    // No such problem would have occurred if we were using the poll system
                    // call instead, but that approach carries others disadvantages.
                    // call instead, but that approach carries other disadvantages.
#if DEBUG_CALLBACKS
                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                            "being recycled, falling back on EPOLL_CTL_ADD: %s",
@@ -504,58 +505,57 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
            const SequenceNumber oldSeq = seq_it->second;
            mRequests.erase(oldSeq);
            mRequests.emplace(seq, request);
            seq_it->second = seq;
        }
    } // release lock
    return 1;
}

int Looper::removeFd(int fd) {
    return removeFd(fd, -1);
}

int Looper::removeFd(int fd, int seq) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq);
#endif

    { // acquire lock
    AutoMutex _l(mLock);
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
    const auto& it = mSequenceNumberByFd.find(fd);
    if (it == mSequenceNumberByFd.end()) {
        return 0;
    }
    return removeSequenceNumberLocked(it->second);
}

        // Check the sequence number if one was given.
        if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
int Looper::removeSequenceNumberLocked(SequenceNumber seq) {
#if DEBUG_CALLBACKS
            ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d",
                    this, mRequests.valueAt(requestIndex).seq);
    ALOGD("%p ~ removeFd - fd=%d, seq=%u", this, fd, seq);
#endif

    const auto& request_it = mRequests.find(seq);
    if (request_it == mRequests.end()) {
        return 0;
    }
    const int fd = request_it->second.fd;

    // Always remove the FD from the request map even if an error occurs while
    // updating the epoll set so that we avoid accidentally leaking callbacks.
        mRequests.removeItemsAt(requestIndex);
    mRequests.erase(request_it);
    mSequenceNumberByFd.erase(fd);

    int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
    if (epollResult < 0) {
            if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
                // Tolerate EBADF or ENOENT when the sequence number is known because it
                // means that the file descriptor was closed before its callback was
                // unregistered.  This error may occur naturally when a callback has the
                // side-effect of closing the file descriptor before returning and
        if (errno == EBADF || errno == ENOENT) {
            // Tolerate EBADF or ENOENT because it means that the file descriptor was closed
            // before its callback was unregistered. This error may occur naturally when a
            // callback has the side-effect of closing the file descriptor before returning and
            // unregistering itself.
            //
            // Unfortunately due to kernel limitations we need to rebuild the epoll
            // set from scratch because it may contain an old file handle that we are
            // now unable to remove since its file descriptor is no longer valid.
            // No such problem would have occurred if we were using the poll system
                // call instead, but that approach carries others disadvantages.
            // call instead, but that approach carries other disadvantages.
#if DEBUG_CALLBACKS
            ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
                        "being closed: %s", this, strerror(errno));
                  "being closed: %s",
                  this, strerror(errno));
#endif
            scheduleEpollRebuildLocked();
        } else {
@@ -568,7 +568,6 @@ int Looper::removeFd(int fd, int seq) {
            return -1;
        }
    }
    } // release lock
    return 1;
}

@@ -656,14 +655,11 @@ bool Looper::isPolling() const {
    return mPolling;
}

void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
    int epollEvents = 0;
uint32_t Looper::Request::getEpollEvents() const {
    uint32_t epollEvents = 0;
    if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem->events = epollEvents;
    eventItem->data.fd = fd;
    return epollEvents;
}

MessageHandler::~MessageHandler() { }
+122 −0
Original line number Diff line number Diff line
@@ -8,6 +8,9 @@
#include <utils/Looper.h>
#include <utils/StopWatch.h>
#include <utils/Timers.h>
#include <thread>
#include <unordered_map>
#include <utility>
#include "Looper_test_pipe.h"

#include <utils/threads.h>
@@ -710,4 +713,123 @@ TEST_F(LooperTest, RemoveMessage_WhenRemovingSomeMessagesForHandler_ShouldRemove
            << "no more messages to handle";
}

class LooperEventCallback : public LooperCallback {
  public:
    using Callback = std::function<int(int fd, int events)>;
    explicit LooperEventCallback(Callback callback) : mCallback(std::move(callback)) {}
    int handleEvent(int fd, int events, void* /*data*/) override { return mCallback(fd, events); }

  private:
    Callback mCallback;
};

// A utility class that allows for pipes to be added and removed from the looper, and polls the
// looper from a different thread.
class ThreadedLooperUtil {
  public:
    explicit ThreadedLooperUtil(const sp<Looper>& looper) : mLooper(looper), mRunning(true) {
        mThread = std::thread([this]() {
            while (mRunning) {
                static constexpr std::chrono::milliseconds POLL_TIMEOUT(500);
                mLooper->pollOnce(POLL_TIMEOUT.count());
            }
        });
    }

    ~ThreadedLooperUtil() {
        mRunning = false;
        mThread.join();
    }

    // Create a new pipe, and return the write end of the pipe and the id used to track the pipe.
    // The read end of the pipe is added to the looper.
    std::pair<int /*id*/, base::unique_fd> createPipe() {
        int pipeFd[2];
        if (pipe(pipeFd)) {
            ADD_FAILURE() << "pipe() failed.";
            return {};
        }
        const int readFd = pipeFd[0];
        const int writeFd = pipeFd[1];

        int id;
        {  // acquire lock
            std::scoped_lock l(mLock);

            id = mNextId++;
            mFds.emplace(id, readFd);

            auto removeCallback = [this, id, readFd](int fd, int events) {
                EXPECT_EQ(readFd, fd) << "Received callback for incorrect fd.";
                if ((events & Looper::EVENT_HANGUP) == 0) {
                    return 1;  // Not a hangup, keep the callback.
                }
                removePipe(id);
                return 0;  // Remove the callback.
            };

            mLooper->addFd(readFd, 0, Looper::EVENT_INPUT,
                           new LooperEventCallback(std::move(removeCallback)), nullptr);
        }  // release lock

        return {id, base::unique_fd(writeFd)};
    }

    // Remove the pipe with the given id.
    void removePipe(int id) {
        std::scoped_lock l(mLock);
        if (mFds.find(id) == mFds.end()) {
            return;
        }
        mLooper->removeFd(mFds[id].get());
        mFds.erase(id);
    }

    // Check if the pipe with the given id exists and has not been removed.
    bool hasPipe(int id) {
        std::scoped_lock l(mLock);
        return mFds.find(id) != mFds.end();
    }

  private:
    sp<Looper> mLooper;
    std::atomic<bool> mRunning;
    std::thread mThread;

    std::mutex mLock;
    std::unordered_map<int, base::unique_fd> mFds GUARDED_BY(mLock);
    int mNextId GUARDED_BY(mLock) = 0;
};

TEST_F(LooperTest, MultiThreaded_NoUnexpectedFdRemoval) {
    ThreadedLooperUtil util(mLooper);

    // Iterate repeatedly to try to recreate a flaky instance.
    for (int i = 0; i < 1000; i++) {
        auto [firstPipeId, firstPipeFd] = util.createPipe();
        const int firstFdNumber = firstPipeFd.get();

        // Close the first pipe's fd, causing a fd hangup.
        firstPipeFd.reset();

        // Request to remove the pipe from this test thread. This causes a race for pipe removal
        // between the hangup in the looper's thread and this remove request from the test thread.
        util.removePipe(firstPipeId);

        // Create the second pipe. Since the fds for the first pipe are closed, this pipe should
        // have the same fd numbers as the first pipe because the lowest unused fd number is used.
        const auto [secondPipeId, fd] = util.createPipe();
        EXPECT_EQ(firstFdNumber, fd.get())
                << "The first and second fds must match for the purposes of this test.";

        // Wait for unexpected hangup to occur.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));

        ASSERT_TRUE(util.hasPipe(secondPipeId)) << "The second pipe was removed unexpectedly.";

        util.removePipe(secondPipeId);
    }
    SUCCEED() << "No unexpectedly removed fds.";
}

} // namespace android
+23 −16
Original line number Diff line number Diff line
@@ -17,15 +17,16 @@
#ifndef UTILS_LOOPER_H
#define UTILS_LOOPER_H

#include <utils/threads.h>
#include <utils/RefBase.h>
#include <utils/KeyedVector.h>
#include <utils/Timers.h>
#include <utils/Vector.h>
#include <utils/threads.h>

#include <sys/epoll.h>

#include <android-base/unique_fd.h>

#include <unordered_map>
#include <utility>

namespace android {
@@ -421,18 +422,20 @@ public:
    static sp<Looper> getForThread();

private:
  using SequenceNumber = uint64_t;

  struct Request {
      int fd;
      int ident;
      int events;
        int seq;
      sp<LooperCallback> callback;
      void* data;

        void initEventItem(struct epoll_event* eventItem) const;
      uint32_t getEpollEvents() const;
  };

    struct Response {
        SequenceNumber seq;
        int events;
        Request request;
    };
@@ -463,9 +466,14 @@ private:
    android::base::unique_fd mEpollFd;  // guarded by mLock but only modified on the looper thread
    bool mEpollRebuildRequired; // guarded by mLock

    // Locked list of file descriptor monitoring requests.
    KeyedVector<int, Request> mRequests;  // guarded by mLock
    int mNextRequestSeq;
    // Locked maps of fds and sequence numbers monitoring requests.
    // Both maps must be kept in sync at all times.
    std::unordered_map<SequenceNumber, Request> mRequests;               // guarded by mLock
    std::unordered_map<int /*fd*/, SequenceNumber> mSequenceNumberByFd;  // guarded by mLock

    // The sequence number to use for the next fd that is added to the looper.
    // The sequence number 0 is reserved for the WakeEventFd.
    SequenceNumber mNextRequestSeq;  // guarded by mLock

    // This state is only used privately by pollOnce and does not require a lock since
    // it runs on a single thread.
@@ -474,9 +482,8 @@ private:
    nsecs_t mNextMessageUptime; // set to LLONG_MAX when none

    int pollInner(int timeoutMillis);
    int removeFd(int fd, int seq);
    int removeSequenceNumberLocked(SequenceNumber seq);  // requires mLock
    void awoken();
    void pushResponse(int events, const Request& request);
    void rebuildEpollLocked();
    void scheduleEpollRebuildLocked();