Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d1c48a05 authored by Jeff Brown's avatar Jeff Brown
Browse files

Dispatch multiple touch events in parallel.

This change enables the input dispatcher to send multiple touch
events to an application without waiting for them to be acknowledged.
Essentially this is a variation on the old streaming optimization
but it is much more comprehensive.  Event dispatch will stall as
soon as 0.5sec of unacknowledged events are accumulated or a
focused event (such as a key event) needs to be delivered.

Streaming input events makes a tremendous difference in application
performance.  The next step will be to enable batching of input
events on the client side once again.

This is part of a series of changes to improve input system pipelining.

Bug: 5963420
Change-Id: I025df90c06165d719fcca7f63eed322a5cce4a78
parent 8b4be560
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -28,6 +28,13 @@

namespace android {

// Socket buffer size.  The default is typically about 128KB, which is much larger than
// we really need.  So we make it smaller.  It just needs to be big enough to hold
// a few dozen large multi-finger motion events in the case where an application gets
// behind processing touches.
static const size_t SOCKET_BUFFER_SIZE = 32 * 1024;


// --- InputMessage ---

bool InputMessage::isValid(size_t actualSize) const {
@@ -93,6 +100,12 @@ status_t InputChannel::openInputChannelPair(const String8& name,
        return result;
    }

    int bufferSize = SOCKET_BUFFER_SIZE;
    setsockopt(sockets[0], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[0], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[1], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[1], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));

    String8 serverChannelName = name;
    serverChannelName.append(" (server)");
    outServerChannel = new InputChannel(serverChannelName, sockets[0]);
+170 −148
Original line number Diff line number Diff line
@@ -70,6 +70,12 @@ const nsecs_t APP_SWITCH_TIMEOUT = 500 * 1000000LL; // 0.5sec
// before considering it stale and dropping it.
const nsecs_t STALE_EVENT_TIMEOUT = 10000 * 1000000LL; // 10sec

// Amount of time to allow touch events to be streamed out to a connection before requiring
// that the first event be finished.  This value extends the ANR timeout by the specified
// amount.  For example, if streaming is allowed to get ahead by one second relative to the
// queue of waiting unfinished events, then ANRs will similarly be delayed by one second.
const nsecs_t STREAM_AHEAD_EVENT_TIMEOUT = 500 * 1000000LL; // 0.5sec


static inline nsecs_t now() {
    return systemTime(SYSTEM_TIME_MONOTONIC);
@@ -1035,7 +1041,8 @@ int32_t InputDispatcher::findFocusedWindowTargetsLocked(nsecs_t currentTime,
    }

    // If the currently focused window is still working on previous events then keep waiting.
    if (! isWindowFinishedWithPreviousInputLocked(mFocusedWindowHandle)) {
    if (!isWindowReadyForMoreInputLocked(currentTime,
            mFocusedWindowHandle, true /*focusedEvent*/)) {
#if DEBUG_FOCUS
        ALOGD("Waiting because focused window still processing previous input.");
#endif
@@ -1398,7 +1405,8 @@ int32_t InputDispatcher::findTouchedWindowTargetsLocked(nsecs_t currentTime,
            }

            // If the touched window is still working on previous events then keep waiting.
            if (! isWindowFinishedWithPreviousInputLocked(touchedWindow.windowHandle)) {
            if (!isWindowReadyForMoreInputLocked(currentTime,
                    touchedWindow.windowHandle, false /*focusedEvent*/)) {
#if DEBUG_FOCUS
                ALOGD("Waiting because touched window still processing previous input.");
#endif
@@ -1609,15 +1617,33 @@ bool InputDispatcher::isWindowObscuredAtPointLocked(
    return false;
}

bool InputDispatcher::isWindowFinishedWithPreviousInputLocked(
        const sp<InputWindowHandle>& windowHandle) {
bool InputDispatcher::isWindowReadyForMoreInputLocked(nsecs_t currentTime,
        const sp<InputWindowHandle>& windowHandle, bool focusedEvent) {
    ssize_t connectionIndex = getConnectionIndexLocked(windowHandle->getInputChannel());
    if (connectionIndex >= 0) {
        sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
        return connection->outboundQueue.isEmpty();
    } else {
        return true;
        if (connection->inputPublisherBlocked) {
            return false;
        }
        if (focusedEvent) {
            // If the event relies on input focus (such as a key event), then we must
            // wait for all previous events to complete before delivering it because they
            // may move focus elsewhere.
            return connection->outboundQueue.isEmpty()
                    && connection->waitQueue.isEmpty();
        }
        // Touch events can always be sent to a window because the user intended to touch
        // whatever was visible immediately.  Even if focus changes or a new window appears,
        // the touch event was meant for whatever happened to be on screen at the time.
        // However, if the wait queue is piling up with lots of events, then hold up
        // new events for awhile.  This condition ensures that ANRs still work.
        if (!connection->waitQueue.isEmpty()
                && currentTime >= connection->waitQueue.head->eventEntry->eventTime
                        + STREAM_AHEAD_EVENT_TIMEOUT) {
            return false;
        }
    }
    return true;
}

String8 InputDispatcher::getApplicationWindowLabelLocked(
@@ -1834,14 +1860,9 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName());
#endif

    ALOG_ASSERT(connection->status == Connection::STATUS_NORMAL);
    ALOG_ASSERT(! connection->outboundQueue.isEmpty());

    while (connection->status == Connection::STATUS_NORMAL
            && !connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
    ALOG_ASSERT(! dispatchEntry->inProgress);

    // Mark the dispatch entry as in progress.
    dispatchEntry->inProgress = true;

        // Publish the event.
        status_t status;
@@ -1857,13 +1878,6 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
                    keyEntry->keyCode, keyEntry->scanCode,
                    keyEntry->metaState, keyEntry->repeatCount, keyEntry->downTime,
                    keyEntry->eventTime);

        if (status) {
            ALOGE("channel '%s' ~ Could not publish key event, "
                    "status=%d", connection->getInputChannelName(), status);
            abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
            return;
        }
            break;
        }

@@ -1875,7 +1889,7 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,

            // Set the X and Y offset depending on the input source.
            float xOffset, yOffset, scaleFactor;
        if (motionEntry->source & AINPUT_SOURCE_CLASS_POINTER
            if ((motionEntry->source & AINPUT_SOURCE_CLASS_POINTER)
                    && !(dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS)) {
                scaleFactor = dispatchEntry->scaleFactor;
                xOffset = dispatchEntry->xOffset * scaleFactor;
@@ -1911,25 +1925,47 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
                    motionEntry->downTime, motionEntry->eventTime,
                    motionEntry->pointerCount, motionEntry->pointerProperties,
                    usingCoords);
            break;
        }

        default:
            ALOG_ASSERT(false);
            return;
        }

        // Check the result.
        if (status) {
            ALOGE("channel '%s' ~ Could not publish motion event, "
            if (status == WOULD_BLOCK) {
                if (connection->waitQueue.isEmpty()) {
                    ALOGE("channel '%s' ~ Could not publish event because the pipe is full. "
                            "This is unexpected because the wait queue is empty, so the pipe "
                            "should be empty and we shouldn't have any problems writing an "
                            "event to it, status=%d", connection->getInputChannelName(), status);
                    abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
                } else {
                    // Pipe is full and we are waiting for the app to finish process some events
                    // before sending more events to it.
#if DEBUG_DISPATCH_CYCLE
                    ALOGD("channel '%s' ~ Could not publish event because the pipe is full, "
                            "waiting for the application to catch up",
                            connection->getInputChannelName());
#endif
                    connection->inputPublisherBlocked = true;
                }
            } else {
                ALOGE("channel '%s' ~ Could not publish event due to an unexpected error, "
                        "status=%d", connection->getInputChannelName(), status);
                abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
            return;
            }
        break;
            return;
        }

    default: {
        ALOG_ASSERT(false);
        // Re-enqueue the event on the wait queue.
        connection->outboundQueue.dequeue(dispatchEntry);
        connection->waitQueue.enqueueAtTail(dispatchEntry);
    }
}

    // Notify other system components.
    onDispatchCycleStartedLocked(currentTime, connection);
}

void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection, bool handled) {
#if DEBUG_DISPATCH_CYCLE
@@ -1937,6 +1973,8 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName(), toString(handled));
#endif

    connection->inputPublisherBlocked = false;

    if (connection->status == Connection::STATUS_BROKEN
            || connection->status == Connection::STATUS_ZOMBIE) {
        return;
@@ -1946,28 +1984,6 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
    onDispatchCycleFinishedLocked(currentTime, connection, handled);
}

void InputDispatcher::startNextDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection) {
    // Start the next dispatch cycle for this connection.
    while (! connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
        if (dispatchEntry->inProgress) {
            // Finished.
            connection->outboundQueue.dequeueAtHead();
            if (dispatchEntry->hasForegroundTarget()) {
                decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
            }
            delete dispatchEntry;
        } else {
            // If the head is not in progress, then we must have already dequeued the in
            // progress event, which means we actually aborted it.
            // So just start the next event for this connection.
            startDispatchCycleLocked(currentTime, connection);
            return;
        }
    }
}

void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection, bool notify) {
#if DEBUG_DISPATCH_CYCLE
@@ -1975,8 +1991,9 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName(), toString(notify));
#endif

    // Clear the outbound queue.
    drainOutboundQueueLocked(connection.get());
    // Clear the dispatch queues.
    drainDispatchQueueLocked(&connection->outboundQueue);
    drainDispatchQueueLocked(&connection->waitQueue);

    // The connection appears to be unrecoverably broken.
    // Ignore already broken or zombie connections.
@@ -1990,15 +2007,19 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
    }
}

void InputDispatcher::drainOutboundQueueLocked(Connection* connection) {
    while (! connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.dequeueAtHead();
void InputDispatcher::drainDispatchQueueLocked(Queue<DispatchEntry>* queue) {
    while (!queue->isEmpty()) {
        DispatchEntry* dispatchEntry = queue->dequeueAtHead();
        releaseDispatchEntryLocked(dispatchEntry);
    }
}

void InputDispatcher::releaseDispatchEntryLocked(DispatchEntry* dispatchEntry) {
    if (dispatchEntry->hasForegroundTarget()) {
        decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
    }
    delete dispatchEntry;
}
}

int InputDispatcher::handleReceiveCallback(int fd, int events, void* data) {
    InputDispatcher* d = static_cast<InputDispatcher*>(data);
@@ -2121,11 +2142,9 @@ void InputDispatcher::synthesizeCancelationEventsForConnectionLocked(
            cancelationEventEntry->release();
        }

        if (!connection->outboundQueue.head->inProgress) {
        startDispatchCycleLocked(currentTime, connection);
    }
}
}

InputDispatcher::MotionEntry*
InputDispatcher::splitMotionEvent(const MotionEntry* originalMotionEntry, BitSet32 pointerIds) {
@@ -3139,10 +3158,6 @@ ssize_t InputDispatcher::getConnectionIndexLocked(const sp<InputChannel>& inputC
    return -1;
}

void InputDispatcher::onDispatchCycleStartedLocked(
        nsecs_t currentTime, const sp<Connection>& connection) {
}

void InputDispatcher::onDispatchCycleFinishedLocked(
        nsecs_t currentTime, const sp<Connection>& connection, bool handled) {
    CommandEntry* commandEntry = postCommandLocked(
@@ -3243,24 +3258,37 @@ void InputDispatcher::doDispatchCycleFinishedLockedInterruptible(
    sp<Connection> connection = commandEntry->connection;
    bool handled = commandEntry->handled;

    bool skipNext = false;
    if (!connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
        if (dispatchEntry->inProgress) {
    if (!connection->waitQueue.isEmpty()) {
        // Handle post-event policy actions.
        bool restartEvent;
        DispatchEntry* dispatchEntry = connection->waitQueue.head;
        if (dispatchEntry->eventEntry->type == EventEntry::TYPE_KEY) {
            KeyEntry* keyEntry = static_cast<KeyEntry*>(dispatchEntry->eventEntry);
                skipNext = afterKeyEventLockedInterruptible(connection,
            restartEvent = afterKeyEventLockedInterruptible(connection,
                    dispatchEntry, keyEntry, handled);
        } else if (dispatchEntry->eventEntry->type == EventEntry::TYPE_MOTION) {
            MotionEntry* motionEntry = static_cast<MotionEntry*>(dispatchEntry->eventEntry);
                skipNext = afterMotionEventLockedInterruptible(connection,
            restartEvent = afterMotionEventLockedInterruptible(connection,
                    dispatchEntry, motionEntry, handled);
        } else {
            restartEvent = false;
        }

        // Dequeue the event and start the next cycle.
        // Note that because the lock might have been released, it is possible that the
        // contents of the wait queue to have been drained, so we need to double-check
        // a few things.
        if (connection->waitQueue.head == dispatchEntry) {
            connection->waitQueue.dequeueAtHead();
            if (restartEvent && connection->status == Connection::STATUS_NORMAL) {
                connection->outboundQueue.enqueueAtHead(dispatchEntry);
            } else {
                releaseDispatchEntryLocked(dispatchEntry);
            }
        }

    if (!skipNext) {
        startNextDispatchCycleLocked(now(), connection);
        // Start the next dispatch cycle for this connection.
        startDispatchCycleLocked(now(), connection);
    }
}

@@ -3324,11 +3352,9 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con

            if (connection->status != Connection::STATUS_NORMAL) {
                connection->inputState.removeFallbackKey(originalKeyCode);
                return true; // skip next cycle
                return false;
            }

            ALOG_ASSERT(connection->outboundQueue.head == dispatchEntry);

            // Latch the fallback keycode for this key on an initial down.
            // The fallback keycode cannot change at any other point in the lifecycle.
            if (initialDown) {
@@ -3406,10 +3432,7 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con
                        "originalKeyCode=%d, fallbackKeyCode=%d, fallbackMetaState=%08x",
                        originalKeyCode, fallbackKeyCode, keyEntry->metaState);
#endif

                dispatchEntry->inProgress = false;
                startDispatchCycleLocked(now(), connection);
                return true; // already started next cycle
                return true; // restart the event
            } else {
#if DEBUG_OUTBOUND_EVENT_DETAILS
                ALOGD("Unhandled key event: No fallback key.");
@@ -3604,7 +3627,6 @@ InputDispatcher::DispatchEntry::DispatchEntry(EventEntry* eventEntry,
        int32_t targetFlags, float xOffset, float yOffset, float scaleFactor) :
        eventEntry(eventEntry), targetFlags(targetFlags),
        xOffset(xOffset), yOffset(yOffset), scaleFactor(scaleFactor),
        inProgress(false),
        resolvedAction(0), resolvedFlags(0) {
    eventEntry->refCount += 1;
}
@@ -3943,7 +3965,7 @@ InputDispatcher::Connection::Connection(const sp<InputChannel>& inputChannel,
        const sp<InputWindowHandle>& inputWindowHandle, bool monitor) :
        status(STATUS_NORMAL), inputChannel(inputChannel), inputWindowHandle(inputWindowHandle),
        monitor(monitor),
        inputPublisher(inputChannel) {
        inputPublisher(inputChannel), inputPublisherBlocked(false) {
}

InputDispatcher::Connection::~Connection() {
+14 −8
Original line number Diff line number Diff line
@@ -528,9 +528,6 @@ private:
        float yOffset;
        float scaleFactor;

        // True if dispatch has started.
        bool inProgress;

        // Set to the resolved action and flags when the event is enqueued.
        int32_t resolvedAction;
        int32_t resolvedFlags;
@@ -782,8 +779,18 @@ private:
        bool monitor;
        InputPublisher inputPublisher;
        InputState inputState;

        // True if the socket is full and no further events can be published until
        // the application consumes some of the input.
        bool inputPublisherBlocked;

        // Queue of events that need to be published to the connection.
        Queue<DispatchEntry> outboundQueue;

        // Queue of events that have been published to the connection but that have not
        // yet received a "finished" response from the application.
        Queue<DispatchEntry> waitQueue;

        explicit Connection(const sp<InputChannel>& inputChannel,
                const sp<InputWindowHandle>& inputWindowHandle, bool monitor);

@@ -976,7 +983,8 @@ private:
            const InjectionState* injectionState);
    bool isWindowObscuredAtPointLocked(const sp<InputWindowHandle>& windowHandle,
            int32_t x, int32_t y) const;
    bool isWindowFinishedWithPreviousInputLocked(const sp<InputWindowHandle>& windowHandle);
    bool isWindowReadyForMoreInputLocked(nsecs_t currentTime,
            const sp<InputWindowHandle>& windowHandle, bool focusedEvent);
    String8 getApplicationWindowLabelLocked(const sp<InputApplicationHandle>& applicationHandle,
            const sp<InputWindowHandle>& windowHandle);

@@ -993,10 +1001,10 @@ private:
    void startDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
    void finishDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
            bool handled);
    void startNextDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
    void abortBrokenDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
            bool notify);
    void drainOutboundQueueLocked(Connection* connection);
    void drainDispatchQueueLocked(Queue<DispatchEntry>* queue);
    void releaseDispatchEntryLocked(DispatchEntry* dispatchEntry);
    static int handleReceiveCallback(int fd, int events, void* data);

    void synthesizeCancelationEventsForAllConnectionsLocked(
@@ -1025,8 +1033,6 @@ private:
    void deactivateConnectionLocked(Connection* connection);

    // Interesting events that we might like to log or tell the framework about.
    void onDispatchCycleStartedLocked(
            nsecs_t currentTime, const sp<Connection>& connection);
    void onDispatchCycleFinishedLocked(
            nsecs_t currentTime, const sp<Connection>& connection, bool handled);
    void onDispatchCycleBrokenLocked(