Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 13bda6c5 authored by Siarhei Vishniakou's avatar Siarhei Vishniakou
Browse files

Use std::deque for DispatchEntry'ies.

Currently, there is a custom Queue class being used in InputDispatcher.
But that class makes a lot of assumptions about the memory management in
the queue. This also eliminates the possibility of using std::unique_ptr
inside dispatcher. First refactor to remove the custom Queue from
dispatcher.

Bug: 70668286
Test: SANITIZE_TARGET=hwaddress atest libinput_tests inputflinger_tests
Change-Id: Ie5919e2f2da11424e0cb48e9f960d73abaf59f46
parent 1d6fc8ff
Loading
Loading
Loading
Loading
+61 −50
Original line number Diff line number Diff line
@@ -1873,8 +1873,9 @@ std::string InputDispatcher::checkWindowReadyForMoreInputLocked(nsecs_t currentT
    // If the connection is backed up then keep waiting.
    if (connection->inputPublisherBlocked) {
        return StringPrintf("Waiting because the %s window's input channel is full.  "
                "Outbound queue length: %d.  Wait queue length: %d.",
                targetType, connection->outboundQueue.count(), connection->waitQueue.count());
                            "Outbound queue length: %zu.  Wait queue length: %zu.",
                            targetType, connection->outboundQueue.size(),
                            connection->waitQueue.size());
    }

    // Ensure that the dispatch queues aren't too far backed up for this event.
@@ -1890,11 +1891,13 @@ std::string InputDispatcher::checkWindowReadyForMoreInputLocked(nsecs_t currentT
        // often anticipate pending UI changes when typing on a keyboard.
        // To obtain this behavior, we must serialize key events with respect to all
        // prior input events.
        if (!connection->outboundQueue.isEmpty() || !connection->waitQueue.isEmpty()) {
        if (!connection->outboundQueue.empty() || !connection->waitQueue.empty()) {
            return StringPrintf("Waiting to send key event because the %s window has not "
                                "finished processing all of the input events that were previously "
                    "delivered to it.  Outbound queue length: %d.  Wait queue length: %d.",
                    targetType, connection->outboundQueue.count(), connection->waitQueue.count());
                                "delivered to it.  Outbound queue length: %zu.  Wait queue length: "
                                "%zu.",
                                targetType, connection->outboundQueue.size(),
                                connection->waitQueue.size());
        }
    } else {
        // Touch events can always be sent to a window immediately because the user intended
@@ -1912,15 +1915,18 @@ std::string InputDispatcher::checkWindowReadyForMoreInputLocked(nsecs_t currentT
        // The one case where we pause input event delivery is when the wait queue is piling
        // up with lots of events because the application is not responding.
        // This condition ensures that ANRs are detected reliably.
        if (!connection->waitQueue.isEmpty()
                && currentTime >= connection->waitQueue.head->deliveryTime
                        + STREAM_AHEAD_EVENT_TIMEOUT) {
        if (!connection->waitQueue.empty() &&
            currentTime >=
                    connection->waitQueue.front()->deliveryTime + STREAM_AHEAD_EVENT_TIMEOUT) {
            return StringPrintf("Waiting to send non-key event because the %s window has not "
                    "finished processing certain input events that were delivered to it over "
                    "%0.1fms ago.  Wait queue length: %d.  Wait queue head age: %0.1fms.",
                                "finished processing certain input events that were delivered to "
                                "it over "
                                "%0.1fms ago.  Wait queue length: %zu.  Wait queue head age: "
                                "%0.1fms.",
                                targetType, STREAM_AHEAD_EVENT_TIMEOUT * 0.000001f,
                    connection->waitQueue.count(),
                    (currentTime - connection->waitQueue.head->deliveryTime) * 0.000001f);
                                connection->waitQueue.size(),
                                (currentTime - connection->waitQueue.front()->deliveryTime) *
                                        0.000001f);
        }
    }
    return "";
@@ -2053,7 +2059,7 @@ void InputDispatcher::enqueueDispatchEntriesLocked(nsecs_t currentTime,
        ATRACE_NAME(message.c_str());
    }

    bool wasEmpty = connection->outboundQueue.isEmpty();
    bool wasEmpty = connection->outboundQueue.empty();

    // Enqueue dispatch entries for the requested modes.
    enqueueDispatchEntryLocked(connection, eventEntry, inputTarget,
@@ -2070,7 +2076,7 @@ void InputDispatcher::enqueueDispatchEntriesLocked(nsecs_t currentTime,
            InputTarget::FLAG_DISPATCH_AS_SLIPPERY_ENTER);

    // If the outbound queue was previously empty, start the dispatch cycle going.
    if (wasEmpty && !connection->outboundQueue.isEmpty()) {
    if (wasEmpty && !connection->outboundQueue.empty()) {
        startDispatchCycleLocked(currentTime, connection);
    }
}
@@ -2173,7 +2179,7 @@ void InputDispatcher::enqueueDispatchEntryLocked(
    }

    // Enqueue the dispatch entry.
    connection->outboundQueue.enqueueAtTail(dispatchEntry);
    connection->outboundQueue.push_back(dispatchEntry);
    traceOutboundQueueLength(connection);

}
@@ -2217,9 +2223,8 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName().c_str());
#endif

    while (connection->status == Connection::STATUS_NORMAL
            && !connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
    while (connection->status == Connection::STATUS_NORMAL && !connection->outboundQueue.empty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.front();
        dispatchEntry->deliveryTime = currentTime;

        // Publish the event.
@@ -2301,7 +2306,7 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
        // Check the result.
        if (status) {
            if (status == WOULD_BLOCK) {
                if (connection->waitQueue.isEmpty()) {
                if (connection->waitQueue.empty()) {
                    ALOGE("channel '%s' ~ Could not publish event because the pipe is full. "
                            "This is unexpected because the wait queue is empty, so the pipe "
                            "should be empty and we shouldn't have any problems writing an "
@@ -2327,9 +2332,11 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
        }

        // Re-enqueue the event on the wait queue.
        connection->outboundQueue.dequeue(dispatchEntry);
        connection->outboundQueue.erase(std::remove(connection->outboundQueue.begin(),
                                                    connection->outboundQueue.end(),
                                                    dispatchEntry));
        traceOutboundQueueLength(connection);
        connection->waitQueue.enqueueAtTail(dispatchEntry);
        connection->waitQueue.push_back(dispatchEntry);
        traceWaitQueueLength(connection);
    }
}
@@ -2360,9 +2367,9 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
#endif

    // Clear the dispatch queues.
    drainDispatchQueue(&connection->outboundQueue);
    drainDispatchQueue(connection->outboundQueue);
    traceOutboundQueueLength(connection);
    drainDispatchQueue(&connection->waitQueue);
    drainDispatchQueue(connection->waitQueue);
    traceWaitQueueLength(connection);

    // The connection appears to be unrecoverably broken.
@@ -2377,9 +2384,10 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
    }
}

void InputDispatcher::drainDispatchQueue(Queue<DispatchEntry>* queue) {
    while (!queue->isEmpty()) {
        DispatchEntry* dispatchEntry = queue->dequeueAtHead();
void InputDispatcher::drainDispatchQueue(std::deque<DispatchEntry*>& queue) {
    while (!queue.empty()) {
        DispatchEntry* dispatchEntry = queue.front();
        queue.pop_front();
        releaseDispatchEntry(dispatchEntry);
    }
}
@@ -3815,11 +3823,10 @@ void InputDispatcher::dumpDispatchStateLocked(std::string& dump) {
                    connection->getStatusLabel(), toString(connection->monitor),
                    toString(connection->inputPublisherBlocked));

            if (!connection->outboundQueue.isEmpty()) {
                dump += StringPrintf(INDENT3 "OutboundQueue: length=%u\n",
                        connection->outboundQueue.count());
                for (DispatchEntry* entry = connection->outboundQueue.head; entry;
                        entry = entry->next) {
            if (!connection->outboundQueue.empty()) {
                dump += StringPrintf(INDENT3 "OutboundQueue: length=%zu\n",
                                     connection->outboundQueue.size());
                for (DispatchEntry* entry : connection->outboundQueue) {
                    dump.append(INDENT4);
                    entry->eventEntry->appendDescription(dump);
                    dump += StringPrintf(", targetFlags=0x%08x, resolvedAction=%d, age=%0.1fms\n",
@@ -3830,11 +3837,10 @@ void InputDispatcher::dumpDispatchStateLocked(std::string& dump) {
                dump += INDENT3 "OutboundQueue: <empty>\n";
            }

            if (!connection->waitQueue.isEmpty()) {
                dump += StringPrintf(INDENT3 "WaitQueue: length=%u\n",
                        connection->waitQueue.count());
                for (DispatchEntry* entry = connection->waitQueue.head; entry;
                        entry = entry->next) {
            if (!connection->waitQueue.empty()) {
                dump += StringPrintf(INDENT3 "WaitQueue: length=%zu\n",
                                     connection->waitQueue.size());
                for (DispatchEntry* entry : connection->waitQueue) {
                    dump += INDENT4;
                    entry->eventEntry->appendDescription(dump);
                    dump += StringPrintf(", targetFlags=0x%08x, resolvedAction=%d, "
@@ -4242,10 +4248,12 @@ void InputDispatcher::doDispatchCycleFinishedLockedInterruptible(
    const bool handled = commandEntry->handled;

    // Handle post-event policy actions.
    DispatchEntry* dispatchEntry = connection->findWaitQueueEntry(seq);
    if (!dispatchEntry) {
    std::deque<InputDispatcher::DispatchEntry*>::iterator dispatchEntryIt =
            connection->findWaitQueueEntry(seq);
    if (dispatchEntryIt == connection->waitQueue.end()) {
        return;
    }
    DispatchEntry* dispatchEntry = *dispatchEntryIt;

    nsecs_t eventDuration = finishTime - dispatchEntry->deliveryTime;
    if (eventDuration > SLOW_EVENT_PROCESSING_WARNING_TIMEOUT) {
@@ -4273,11 +4281,13 @@ void InputDispatcher::doDispatchCycleFinishedLockedInterruptible(
    // Note that because the lock might have been released, it is possible that the
    // contents of the wait queue to have been drained, so we need to double-check
    // a few things.
    if (dispatchEntry == connection->findWaitQueueEntry(seq)) {
        connection->waitQueue.dequeue(dispatchEntry);
    dispatchEntryIt = connection->findWaitQueueEntry(seq);
    if (dispatchEntryIt != connection->waitQueue.end()) {
        dispatchEntry = *dispatchEntryIt;
        connection->waitQueue.erase(dispatchEntryIt);
        traceWaitQueueLength(connection);
        if (restartEvent && connection->status == Connection::STATUS_NORMAL) {
            connection->outboundQueue.enqueueAtHead(dispatchEntry);
            connection->outboundQueue.push_front(dispatchEntry);
            traceOutboundQueueLength(connection);
        } else {
            releaseDispatchEntry(dispatchEntry);
@@ -4504,7 +4514,7 @@ void InputDispatcher::traceOutboundQueueLength(const sp<Connection>& connection)
    if (ATRACE_ENABLED()) {
        char counterName[40];
        snprintf(counterName, sizeof(counterName), "oq:%s", connection->getWindowName().c_str());
        ATRACE_INT(counterName, connection->outboundQueue.count());
        ATRACE_INT(counterName, connection->outboundQueue.size());
    }
}

@@ -4512,7 +4522,7 @@ void InputDispatcher::traceWaitQueueLength(const sp<Connection>& connection) {
    if (ATRACE_ENABLED()) {
        char counterName[40];
        snprintf(counterName, sizeof(counterName), "wq:%s", connection->getWindowName().c_str());
        ATRACE_INT(counterName, connection->waitQueue.count());
        ATRACE_INT(counterName, connection->waitQueue.size());
    }
}

@@ -5148,13 +5158,14 @@ const char* InputDispatcher::Connection::getStatusLabel() const {
    }
}

InputDispatcher::DispatchEntry* InputDispatcher::Connection::findWaitQueueEntry(uint32_t seq) {
    for (DispatchEntry* entry = waitQueue.head; entry != nullptr; entry = entry->next) {
        if (entry->seq == seq) {
            return entry;
std::deque<InputDispatcher::DispatchEntry*>::iterator
InputDispatcher::Connection::findWaitQueueEntry(uint32_t seq) {
    for (std::deque<DispatchEntry*>::iterator it = waitQueue.begin(); it != waitQueue.end(); it++) {
        if ((*it)->seq == seq) {
            return it;
        }
    }
    return nullptr;
    return waitQueue.end();
}

// --- InputDispatcher::Monitor
+7 −6
Original line number Diff line number Diff line
@@ -33,9 +33,10 @@
#include <cutils/atomic.h>
#include <unordered_map>

#include <limits.h>
#include <stddef.h>
#include <unistd.h>
#include <limits.h>
#include <deque>
#include <unordered_map>

#include "InputListener.h"
@@ -592,7 +593,7 @@ private:
    };

    // Tracks the progress of dispatching a particular event to a particular connection.
    struct DispatchEntry : Link<DispatchEntry> {
    struct DispatchEntry {
        const uint32_t seq; // unique sequence number, never 0

        EventEntry* eventEntry; // the event to dispatch
@@ -886,11 +887,11 @@ private:
        bool inputPublisherBlocked;

        // Queue of events that need to be published to the connection.
        Queue<DispatchEntry> outboundQueue;
        std::deque<DispatchEntry*> outboundQueue;

        // Queue of events that have been published to the connection but that have not
        // yet received a "finished" response from the application.
        Queue<DispatchEntry> waitQueue;
        std::deque<DispatchEntry*> waitQueue;

        explicit Connection(const sp<InputChannel>& inputChannel, bool monitor);

@@ -899,7 +900,7 @@ private:
        const std::string getWindowName() const;
        const char* getStatusLabel() const;

        DispatchEntry* findWaitQueueEntry(uint32_t seq);
        std::deque<DispatchEntry*>::iterator findWaitQueueEntry(uint32_t seq);
    };

    struct Monitor {
@@ -1221,7 +1222,7 @@ private:
            uint32_t seq, bool handled) REQUIRES(mLock);
    void abortBrokenDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
            bool notify) REQUIRES(mLock);
    void drainDispatchQueue(Queue<DispatchEntry>* queue);
    void drainDispatchQueue(std::deque<DispatchEntry*>& queue);
    void releaseDispatchEntry(DispatchEntry* dispatchEntry);
    static int handleReceiveCallback(int fd, int events, void* data);
    // The action sent should only be of type AMOTION_EVENT_*