Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 877f352b authored by Jeff Brown's avatar Jeff Brown Committed by Android Git Automerger
Browse files

am d1c48a05: Dispatch multiple touch events in parallel.

* commit 'd1c48a05':
  Dispatch multiple touch events in parallel.
parents 477f82dc d1c48a05
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -28,6 +28,13 @@

namespace android {

// Socket buffer size.  The default is typically about 128KB, which is much larger than
// we really need.  So we make it smaller.  It just needs to be big enough to hold
// a few dozen large multi-finger motion events in the case where an application gets
// behind processing touches.
static const size_t SOCKET_BUFFER_SIZE = 32 * 1024;


// --- InputMessage ---

bool InputMessage::isValid(size_t actualSize) const {
@@ -93,6 +100,12 @@ status_t InputChannel::openInputChannelPair(const String8& name,
        return result;
    }

    int bufferSize = SOCKET_BUFFER_SIZE;
    setsockopt(sockets[0], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[0], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[1], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
    setsockopt(sockets[1], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));

    String8 serverChannelName = name;
    serverChannelName.append(" (server)");
    outServerChannel = new InputChannel(serverChannelName, sockets[0]);
+170 −148
Original line number Diff line number Diff line
@@ -70,6 +70,12 @@ const nsecs_t APP_SWITCH_TIMEOUT = 500 * 1000000LL; // 0.5sec
// before considering it stale and dropping it.
const nsecs_t STALE_EVENT_TIMEOUT = 10000 * 1000000LL; // 10sec

// Amount of time to allow touch events to be streamed out to a connection before requiring
// that the first event be finished.  This value extends the ANR timeout by the specified
// amount.  For example, if streaming is allowed to get ahead by one second relative to the
// queue of waiting unfinished events, then ANRs will similarly be delayed by one second.
const nsecs_t STREAM_AHEAD_EVENT_TIMEOUT = 500 * 1000000LL; // 0.5sec


static inline nsecs_t now() {
    return systemTime(SYSTEM_TIME_MONOTONIC);
@@ -1035,7 +1041,8 @@ int32_t InputDispatcher::findFocusedWindowTargetsLocked(nsecs_t currentTime,
    }

    // If the currently focused window is still working on previous events then keep waiting.
    if (! isWindowFinishedWithPreviousInputLocked(mFocusedWindowHandle)) {
    if (!isWindowReadyForMoreInputLocked(currentTime,
            mFocusedWindowHandle, true /*focusedEvent*/)) {
#if DEBUG_FOCUS
        ALOGD("Waiting because focused window still processing previous input.");
#endif
@@ -1398,7 +1405,8 @@ int32_t InputDispatcher::findTouchedWindowTargetsLocked(nsecs_t currentTime,
            }

            // If the touched window is still working on previous events then keep waiting.
            if (! isWindowFinishedWithPreviousInputLocked(touchedWindow.windowHandle)) {
            if (!isWindowReadyForMoreInputLocked(currentTime,
                    touchedWindow.windowHandle, false /*focusedEvent*/)) {
#if DEBUG_FOCUS
                ALOGD("Waiting because touched window still processing previous input.");
#endif
@@ -1609,15 +1617,33 @@ bool InputDispatcher::isWindowObscuredAtPointLocked(
    return false;
}

bool InputDispatcher::isWindowFinishedWithPreviousInputLocked(
        const sp<InputWindowHandle>& windowHandle) {
bool InputDispatcher::isWindowReadyForMoreInputLocked(nsecs_t currentTime,
        const sp<InputWindowHandle>& windowHandle, bool focusedEvent) {
    ssize_t connectionIndex = getConnectionIndexLocked(windowHandle->getInputChannel());
    if (connectionIndex >= 0) {
        sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
        return connection->outboundQueue.isEmpty();
    } else {
        return true;
        if (connection->inputPublisherBlocked) {
            return false;
        }
        if (focusedEvent) {
            // If the event relies on input focus (such as a key event), then we must
            // wait for all previous events to complete before delivering it because they
            // may move focus elsewhere.
            return connection->outboundQueue.isEmpty()
                    && connection->waitQueue.isEmpty();
        }
        // Touch events can always be sent to a window because the user intended to touch
        // whatever was visible immediately.  Even if focus changes or a new window appears,
        // the touch event was meant for whatever happened to be on screen at the time.
        // However, if the wait queue is piling up with lots of events, then hold up
        // new events for awhile.  This condition ensures that ANRs still work.
        if (!connection->waitQueue.isEmpty()
                && currentTime >= connection->waitQueue.head->eventEntry->eventTime
                        + STREAM_AHEAD_EVENT_TIMEOUT) {
            return false;
        }
    }
    return true;
}

String8 InputDispatcher::getApplicationWindowLabelLocked(
@@ -1834,14 +1860,9 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName());
#endif

    ALOG_ASSERT(connection->status == Connection::STATUS_NORMAL);
    ALOG_ASSERT(! connection->outboundQueue.isEmpty());

    while (connection->status == Connection::STATUS_NORMAL
            && !connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
    ALOG_ASSERT(! dispatchEntry->inProgress);

    // Mark the dispatch entry as in progress.
    dispatchEntry->inProgress = true;

        // Publish the event.
        status_t status;
@@ -1857,13 +1878,6 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
                    keyEntry->keyCode, keyEntry->scanCode,
                    keyEntry->metaState, keyEntry->repeatCount, keyEntry->downTime,
                    keyEntry->eventTime);

        if (status) {
            ALOGE("channel '%s' ~ Could not publish key event, "
                    "status=%d", connection->getInputChannelName(), status);
            abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
            return;
        }
            break;
        }

@@ -1875,7 +1889,7 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,

            // Set the X and Y offset depending on the input source.
            float xOffset, yOffset, scaleFactor;
        if (motionEntry->source & AINPUT_SOURCE_CLASS_POINTER
            if ((motionEntry->source & AINPUT_SOURCE_CLASS_POINTER)
                    && !(dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS)) {
                scaleFactor = dispatchEntry->scaleFactor;
                xOffset = dispatchEntry->xOffset * scaleFactor;
@@ -1911,25 +1925,47 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
                    motionEntry->downTime, motionEntry->eventTime,
                    motionEntry->pointerCount, motionEntry->pointerProperties,
                    usingCoords);
            break;
        }

        default:
            ALOG_ASSERT(false);
            return;
        }

        // Check the result.
        if (status) {
            ALOGE("channel '%s' ~ Could not publish motion event, "
            if (status == WOULD_BLOCK) {
                if (connection->waitQueue.isEmpty()) {
                    ALOGE("channel '%s' ~ Could not publish event because the pipe is full. "
                            "This is unexpected because the wait queue is empty, so the pipe "
                            "should be empty and we shouldn't have any problems writing an "
                            "event to it, status=%d", connection->getInputChannelName(), status);
                    abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
                } else {
                    // Pipe is full and we are waiting for the app to finish process some events
                    // before sending more events to it.
#if DEBUG_DISPATCH_CYCLE
                    ALOGD("channel '%s' ~ Could not publish event because the pipe is full, "
                            "waiting for the application to catch up",
                            connection->getInputChannelName());
#endif
                    connection->inputPublisherBlocked = true;
                }
            } else {
                ALOGE("channel '%s' ~ Could not publish event due to an unexpected error, "
                        "status=%d", connection->getInputChannelName(), status);
                abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
            return;
            }
        break;
            return;
        }

    default: {
        ALOG_ASSERT(false);
        // Re-enqueue the event on the wait queue.
        connection->outboundQueue.dequeue(dispatchEntry);
        connection->waitQueue.enqueueAtTail(dispatchEntry);
    }
}

    // Notify other system components.
    onDispatchCycleStartedLocked(currentTime, connection);
}

void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection, bool handled) {
#if DEBUG_DISPATCH_CYCLE
@@ -1937,6 +1973,8 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName(), toString(handled));
#endif

    connection->inputPublisherBlocked = false;

    if (connection->status == Connection::STATUS_BROKEN
            || connection->status == Connection::STATUS_ZOMBIE) {
        return;
@@ -1946,28 +1984,6 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
    onDispatchCycleFinishedLocked(currentTime, connection, handled);
}

void InputDispatcher::startNextDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection) {
    // Start the next dispatch cycle for this connection.
    while (! connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
        if (dispatchEntry->inProgress) {
            // Finished.
            connection->outboundQueue.dequeueAtHead();
            if (dispatchEntry->hasForegroundTarget()) {
                decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
            }
            delete dispatchEntry;
        } else {
            // If the head is not in progress, then we must have already dequeued the in
            // progress event, which means we actually aborted it.
            // So just start the next event for this connection.
            startDispatchCycleLocked(currentTime, connection);
            return;
        }
    }
}

void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
        const sp<Connection>& connection, bool notify) {
#if DEBUG_DISPATCH_CYCLE
@@ -1975,8 +1991,9 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
            connection->getInputChannelName(), toString(notify));
#endif

    // Clear the outbound queue.
    drainOutboundQueueLocked(connection.get());
    // Clear the dispatch queues.
    drainDispatchQueueLocked(&connection->outboundQueue);
    drainDispatchQueueLocked(&connection->waitQueue);

    // The connection appears to be unrecoverably broken.
    // Ignore already broken or zombie connections.
@@ -1990,15 +2007,19 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
    }
}

void InputDispatcher::drainOutboundQueueLocked(Connection* connection) {
    while (! connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.dequeueAtHead();
void InputDispatcher::drainDispatchQueueLocked(Queue<DispatchEntry>* queue) {
    while (!queue->isEmpty()) {
        DispatchEntry* dispatchEntry = queue->dequeueAtHead();
        releaseDispatchEntryLocked(dispatchEntry);
    }
}

void InputDispatcher::releaseDispatchEntryLocked(DispatchEntry* dispatchEntry) {
    if (dispatchEntry->hasForegroundTarget()) {
        decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
    }
    delete dispatchEntry;
}
}

int InputDispatcher::handleReceiveCallback(int fd, int events, void* data) {
    InputDispatcher* d = static_cast<InputDispatcher*>(data);
@@ -2121,11 +2142,9 @@ void InputDispatcher::synthesizeCancelationEventsForConnectionLocked(
            cancelationEventEntry->release();
        }

        if (!connection->outboundQueue.head->inProgress) {
        startDispatchCycleLocked(currentTime, connection);
    }
}
}

InputDispatcher::MotionEntry*
InputDispatcher::splitMotionEvent(const MotionEntry* originalMotionEntry, BitSet32 pointerIds) {
@@ -3139,10 +3158,6 @@ ssize_t InputDispatcher::getConnectionIndexLocked(const sp<InputChannel>& inputC
    return -1;
}

void InputDispatcher::onDispatchCycleStartedLocked(
        nsecs_t currentTime, const sp<Connection>& connection) {
}

void InputDispatcher::onDispatchCycleFinishedLocked(
        nsecs_t currentTime, const sp<Connection>& connection, bool handled) {
    CommandEntry* commandEntry = postCommandLocked(
@@ -3243,24 +3258,37 @@ void InputDispatcher::doDispatchCycleFinishedLockedInterruptible(
    sp<Connection> connection = commandEntry->connection;
    bool handled = commandEntry->handled;

    bool skipNext = false;
    if (!connection->outboundQueue.isEmpty()) {
        DispatchEntry* dispatchEntry = connection->outboundQueue.head;
        if (dispatchEntry->inProgress) {
    if (!connection->waitQueue.isEmpty()) {
        // Handle post-event policy actions.
        bool restartEvent;
        DispatchEntry* dispatchEntry = connection->waitQueue.head;
        if (dispatchEntry->eventEntry->type == EventEntry::TYPE_KEY) {
            KeyEntry* keyEntry = static_cast<KeyEntry*>(dispatchEntry->eventEntry);
                skipNext = afterKeyEventLockedInterruptible(connection,
            restartEvent = afterKeyEventLockedInterruptible(connection,
                    dispatchEntry, keyEntry, handled);
        } else if (dispatchEntry->eventEntry->type == EventEntry::TYPE_MOTION) {
            MotionEntry* motionEntry = static_cast<MotionEntry*>(dispatchEntry->eventEntry);
                skipNext = afterMotionEventLockedInterruptible(connection,
            restartEvent = afterMotionEventLockedInterruptible(connection,
                    dispatchEntry, motionEntry, handled);
        } else {
            restartEvent = false;
        }

        // Dequeue the event and start the next cycle.
        // Note that because the lock might have been released, it is possible that the
        // contents of the wait queue to have been drained, so we need to double-check
        // a few things.
        if (connection->waitQueue.head == dispatchEntry) {
            connection->waitQueue.dequeueAtHead();
            if (restartEvent && connection->status == Connection::STATUS_NORMAL) {
                connection->outboundQueue.enqueueAtHead(dispatchEntry);
            } else {
                releaseDispatchEntryLocked(dispatchEntry);
            }
        }

    if (!skipNext) {
        startNextDispatchCycleLocked(now(), connection);
        // Start the next dispatch cycle for this connection.
        startDispatchCycleLocked(now(), connection);
    }
}

@@ -3324,11 +3352,9 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con

            if (connection->status != Connection::STATUS_NORMAL) {
                connection->inputState.removeFallbackKey(originalKeyCode);
                return true; // skip next cycle
                return false;
            }

            ALOG_ASSERT(connection->outboundQueue.head == dispatchEntry);

            // Latch the fallback keycode for this key on an initial down.
            // The fallback keycode cannot change at any other point in the lifecycle.
            if (initialDown) {
@@ -3406,10 +3432,7 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con
                        "originalKeyCode=%d, fallbackKeyCode=%d, fallbackMetaState=%08x",
                        originalKeyCode, fallbackKeyCode, keyEntry->metaState);
#endif

                dispatchEntry->inProgress = false;
                startDispatchCycleLocked(now(), connection);
                return true; // already started next cycle
                return true; // restart the event
            } else {
#if DEBUG_OUTBOUND_EVENT_DETAILS
                ALOGD("Unhandled key event: No fallback key.");
@@ -3604,7 +3627,6 @@ InputDispatcher::DispatchEntry::DispatchEntry(EventEntry* eventEntry,
        int32_t targetFlags, float xOffset, float yOffset, float scaleFactor) :
        eventEntry(eventEntry), targetFlags(targetFlags),
        xOffset(xOffset), yOffset(yOffset), scaleFactor(scaleFactor),
        inProgress(false),
        resolvedAction(0), resolvedFlags(0) {
    eventEntry->refCount += 1;
}
@@ -3943,7 +3965,7 @@ InputDispatcher::Connection::Connection(const sp<InputChannel>& inputChannel,
        const sp<InputWindowHandle>& inputWindowHandle, bool monitor) :
        status(STATUS_NORMAL), inputChannel(inputChannel), inputWindowHandle(inputWindowHandle),
        monitor(monitor),
        inputPublisher(inputChannel) {
        inputPublisher(inputChannel), inputPublisherBlocked(false) {
}

InputDispatcher::Connection::~Connection() {
+14 −8
Original line number Diff line number Diff line
@@ -528,9 +528,6 @@ private:
        float yOffset;
        float scaleFactor;

        // True if dispatch has started.
        bool inProgress;

        // Set to the resolved action and flags when the event is enqueued.
        int32_t resolvedAction;
        int32_t resolvedFlags;
@@ -782,8 +779,18 @@ private:
        bool monitor;
        InputPublisher inputPublisher;
        InputState inputState;

        // True if the socket is full and no further events can be published until
        // the application consumes some of the input.
        bool inputPublisherBlocked;

        // Queue of events that need to be published to the connection.
        Queue<DispatchEntry> outboundQueue;

        // Queue of events that have been published to the connection but that have not
        // yet received a "finished" response from the application.
        Queue<DispatchEntry> waitQueue;

        explicit Connection(const sp<InputChannel>& inputChannel,
                const sp<InputWindowHandle>& inputWindowHandle, bool monitor);

@@ -976,7 +983,8 @@ private:
            const InjectionState* injectionState);
    bool isWindowObscuredAtPointLocked(const sp<InputWindowHandle>& windowHandle,
            int32_t x, int32_t y) const;
    bool isWindowFinishedWithPreviousInputLocked(const sp<InputWindowHandle>& windowHandle);
    bool isWindowReadyForMoreInputLocked(nsecs_t currentTime,
            const sp<InputWindowHandle>& windowHandle, bool focusedEvent);
    String8 getApplicationWindowLabelLocked(const sp<InputApplicationHandle>& applicationHandle,
            const sp<InputWindowHandle>& windowHandle);

@@ -993,10 +1001,10 @@ private:
    void startDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
    void finishDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
            bool handled);
    void startNextDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
    void abortBrokenDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
            bool notify);
    void drainOutboundQueueLocked(Connection* connection);
    void drainDispatchQueueLocked(Queue<DispatchEntry>* queue);
    void releaseDispatchEntryLocked(DispatchEntry* dispatchEntry);
    static int handleReceiveCallback(int fd, int events, void* data);

    void synthesizeCancelationEventsForAllConnectionsLocked(
@@ -1025,8 +1033,6 @@ private:
    void deactivateConnectionLocked(Connection* connection);

    // Interesting events that we might like to log or tell the framework about.
    void onDispatchCycleStartedLocked(
            nsecs_t currentTime, const sp<Connection>& connection);
    void onDispatchCycleFinishedLocked(
            nsecs_t currentTime, const sp<Connection>& connection, bool handled);
    void onDispatchCycleBrokenLocked(