Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 123c72a4 authored by Jeff Brown's avatar Jeff Brown Committed by Android Git Automerger
Browse files

am 072ec96a: Implement batching of input events on the consumer side.

* commit '072ec96a4900d4616574733646ee46311cb5d2cb':
  Implement batching of input events on the consumer side.
parents 8bd35629 f0490c94
Loading
Loading
Loading
Loading
+52 −7
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@ struct InputMessage {

    union Body {
        struct Key {
            uint32_t seq;
            nsecs_t eventTime;
            int32_t deviceId;
            int32_t source;
@@ -69,6 +70,7 @@ struct InputMessage {
        } key;

        struct Motion {
            uint32_t seq;
            nsecs_t eventTime;
            int32_t deviceId;
            int32_t source;
@@ -95,6 +97,7 @@ struct InputMessage {
        } motion;

        struct Finished {
            uint32_t seq;
            bool handled;

            inline size_t size() const {
@@ -181,9 +184,11 @@ public:
     * Returns OK on success.
     * Returns WOULD_BLOCK if the channel is full.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Returns BAD_VALUE if seq is 0.
     * Other errors probably indicate that the channel is broken.
     */
    status_t publishKeyEvent(
            uint32_t seq,
            int32_t deviceId,
            int32_t source,
            int32_t action,
@@ -200,10 +205,11 @@ public:
     * Returns OK on success.
     * Returns WOULD_BLOCK if the channel is full.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Returns BAD_VALUE if pointerCount is less than 1 or greater than MAX_POINTERS.
     * Returns BAD_VALUE if seq is 0 or if pointerCount is less than 1 or greater than MAX_POINTERS.
     * Other errors probably indicate that the channel is broken.
     */
    status_t publishMotionEvent(
            uint32_t seq,
            int32_t deviceId,
            int32_t source,
            int32_t action,
@@ -222,14 +228,17 @@ public:
            const PointerCoords* pointerCoords);

    /* Receives the finished signal from the consumer in reply to the original dispatch signal.
     * Returns whether the consumer handled the message.
     * If a signal was received, returns the message sequence number,
     * and whether the consumer handled the message.
     *
     * The returned sequence number is never 0 unless the operation failed.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no signal present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t receiveFinishedSignal(bool* outHandled);
    status_t receiveFinishedSignal(uint32_t* outSeq, bool* outHandled);

private:
    sp<InputChannel> mChannel;
@@ -252,24 +261,60 @@ public:
    /* Consumes an input event from the input channel and copies its contents into
     * an InputEvent object created using the specified factory.
     *
     * Tries to combine a series of move events into larger batches whenever possible.
     *
     * If consumeBatches is false, then defers consuming pending batched events if it
     * is possible for additional samples to be added to them later.  Call hasPendingBatch()
     * to determine whether a pending batch is available to be consumed.
     *
     * If consumeBatches is true, then events are still batched but they are consumed
     * immediately as soon as the input channel is exhausted.
     *
     * The returned sequence number is never 0 unless the operation failed.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no event present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Returns NO_MEMORY if the event could not be created.
     * Other errors probably indicate that the channel is broken.
     */
    status_t consume(InputEventFactoryInterface* factory, InputEvent** outEvent);
    status_t consume(InputEventFactoryInterface* factory, bool consumeBatches,
            uint32_t* outSeq, InputEvent** outEvent);

    /* Sends a finished signal to the publisher to inform it that the current message is
     * finished processing and specifies whether the message was handled by the consumer.
    /* Sends a finished signal to the publisher to inform it that the message
     * with the specified sequence number has finished being process and whether
     * the message was handled by the consumer.
     *
     * Returns OK on success.
     * Returns BAD_VALUE if seq is 0.
     * Other errors probably indicate that the channel is broken.
     */
    status_t sendFinishedSignal(bool handled);
    status_t sendFinishedSignal(uint32_t seq, bool handled);

    /* Returns true if there is a pending batch. */
    bool hasPendingBatch() const;

private:
    sp<InputChannel> mChannel;

    // State about an event that consume would have returned except that it had to
    // return a completed batch first.  Sequence number is non-zero if an event was deferred.
    uint32_t mDeferredEventSeq;
    MotionEvent mDeferredEvent;

    // Batched motion events per device and source.
    struct Batch {
        uint32_t seq;
        MotionEvent event;
    };
    Vector<Batch> mBatches;

    ssize_t findBatch(int32_t deviceId, int32_t source) const;

    static void initializeKeyEvent(KeyEvent* event, const InputMessage* msg);
    static void initializeMotionEvent(MotionEvent* event, const InputMessage* msg);
    static bool canAppendSamples(const MotionEvent* event, const InputMessage* msg);
    static void appendSamples(MotionEvent* event, const InputMessage* msg);
};

} // namespace android
+244 −70
Original line number Diff line number Diff line
@@ -13,8 +13,8 @@
// Log debug messages whenever InputChannel objects are created/destroyed
#define DEBUG_CHANNEL_LIFECYCLE 0

#define DEBUG_TRANSPORT_ACTIONS 0
// Log debug messages about transport actions
#define DEBUG_TRANSPORT_ACTIONS 0


#include <cutils/log.h>
@@ -203,6 +203,7 @@ InputPublisher::~InputPublisher() {
}

status_t InputPublisher::publishKeyEvent(
        uint32_t seq,
        int32_t deviceId,
        int32_t source,
        int32_t action,
@@ -214,16 +215,22 @@ status_t InputPublisher::publishKeyEvent(
        nsecs_t downTime,
        nsecs_t eventTime) {
#if DEBUG_TRANSPORT_ACTIONS
    ALOGD("channel '%s' publisher ~ publishKeyEvent: deviceId=%d, source=0x%x, "
    ALOGD("channel '%s' publisher ~ publishKeyEvent: seq=%u, deviceId=%d, source=0x%x, "
            "action=0x%x, flags=0x%x, keyCode=%d, scanCode=%d, metaState=0x%x, repeatCount=%d,"
            "downTime=%lld, eventTime=%lld",
            mChannel->getName().string(),
            mChannel->getName().string(), seq,
            deviceId, source, action, flags, keyCode, scanCode, metaState, repeatCount,
            downTime, eventTime);
#endif

    if (!seq) {
        ALOGE("Attempted to publish a key event with sequence number 0.");
        return BAD_VALUE;
    }

    InputMessage msg;
    msg.header.type = InputMessage::TYPE_KEY;
    msg.body.key.seq = seq;
    msg.body.key.deviceId = deviceId;
    msg.body.key.source = source;
    msg.body.key.action = action;
@@ -238,6 +245,7 @@ status_t InputPublisher::publishKeyEvent(
}

status_t InputPublisher::publishMotionEvent(
        uint32_t seq,
        int32_t deviceId,
        int32_t source,
        int32_t action,
@@ -255,16 +263,21 @@ status_t InputPublisher::publishMotionEvent(
        const PointerProperties* pointerProperties,
        const PointerCoords* pointerCoords) {
#if DEBUG_TRANSPORT_ACTIONS
    ALOGD("channel '%s' publisher ~ publishMotionEvent: deviceId=%d, source=0x%x, "
    ALOGD("channel '%s' publisher ~ publishMotionEvent: seq=%u, deviceId=%d, source=0x%x, "
            "action=0x%x, flags=0x%x, edgeFlags=0x%x, metaState=0x%x, buttonState=0x%x, "
            "xOffset=%f, yOffset=%f, "
            "xPrecision=%f, yPrecision=%f, downTime=%lld, eventTime=%lld, "
            "pointerCount=%d",
            mChannel->getName().string(),
            mChannel->getName().string(), seq,
            deviceId, source, action, flags, edgeFlags, metaState, buttonState,
            xOffset, yOffset, xPrecision, yPrecision, downTime, eventTime, pointerCount);
#endif

    if (!seq) {
        ALOGE("Attempted to publish a motion event with sequence number 0.");
        return BAD_VALUE;
    }

    if (pointerCount > MAX_POINTERS || pointerCount < 1) {
        ALOGE("channel '%s' publisher ~ Invalid number of pointers provided: %d.",
                mChannel->getName().string(), pointerCount);
@@ -273,6 +286,7 @@ status_t InputPublisher::publishMotionEvent(

    InputMessage msg;
    msg.header.type = InputMessage::TYPE_MOTION;
    msg.body.motion.seq = seq;
    msg.body.motion.deviceId = deviceId;
    msg.body.motion.source = source;
    msg.body.motion.action = action;
@@ -294,7 +308,7 @@ status_t InputPublisher::publishMotionEvent(
    return mChannel->sendMessage(&msg);
}

status_t InputPublisher::receiveFinishedSignal(bool* outHandled) {
status_t InputPublisher::receiveFinishedSignal(uint32_t* outSeq, bool* outHandled) {
#if DEBUG_TRANSPORT_ACTIONS
    ALOGD("channel '%s' publisher ~ receiveFinishedSignal",
            mChannel->getName().string());
@@ -303,6 +317,7 @@ status_t InputPublisher::receiveFinishedSignal(bool* outHandled) {
    InputMessage msg;
    status_t result = mChannel->receiveMessage(&msg);
    if (result) {
        *outSeq = 0;
        *outHandled = false;
        return result;
    }
@@ -311,6 +326,7 @@ status_t InputPublisher::receiveFinishedSignal(bool* outHandled) {
                mChannel->getName().string(), msg.header.type);
        return UNKNOWN_ERROR;
    }
    *outSeq = msg.body.finished.seq;
    *outHandled = msg.body.finished.handled;
    return OK;
}
@@ -318,23 +334,61 @@ status_t InputPublisher::receiveFinishedSignal(bool* outHandled) {
// --- InputConsumer ---

InputConsumer::InputConsumer(const sp<InputChannel>& channel) :
        mChannel(channel) {
        mChannel(channel), mDeferredEventSeq(0) {
}

InputConsumer::~InputConsumer() {
}

status_t InputConsumer::consume(InputEventFactoryInterface* factory, InputEvent** outEvent) {
status_t InputConsumer::consume(InputEventFactoryInterface* factory,
        bool consumeBatches, uint32_t* outSeq, InputEvent** outEvent) {
#if DEBUG_TRANSPORT_ACTIONS
    ALOGD("channel '%s' consumer ~ consume",
            mChannel->getName().string());
    ALOGD("channel '%s' consumer ~ consume: consumeBatches=%s",
            mChannel->getName().string(), consumeBatches ? "true" : "false");
#endif

    *outSeq = 0;
    *outEvent = NULL;

    // Report deferred event first, if we had to end a batch earlier than we expected
    // during the previous time consume was called.
    if (mDeferredEventSeq) {
        MotionEvent* motionEvent = factory->createMotionEvent();
        if (! motionEvent) return NO_MEMORY;

        motionEvent->copyFrom(&mDeferredEvent, true /*keepHistory*/);
        *outSeq = mDeferredEventSeq;
        *outEvent = motionEvent;
        mDeferredEventSeq = 0;
#if DEBUG_TRANSPORT_ACTIONS
        ALOGD("channel '%s' consumer ~ consumed deferred event, seq=%u",
                mChannel->getName().string(), *outSeq);
#endif
        return OK;
    }

    // Fetch the next input message.
    // Loop until an event can be returned or no additional events are received.
    while (!*outEvent) {
        InputMessage msg;
        status_t result = mChannel->receiveMessage(&msg);
        if (result) {
            // Consume the next batched event unless batches are being held for later.
            if (!mBatches.isEmpty() && (consumeBatches || result != WOULD_BLOCK)) {
                MotionEvent* motionEvent = factory->createMotionEvent();
                if (! motionEvent) return NO_MEMORY;

                const Batch& batch = mBatches.top();
                motionEvent->copyFrom(&batch.event, true /*keepHistory*/);
                *outSeq = batch.seq;
                *outEvent = motionEvent;
                mBatches.pop();
#if DEBUG_TRANSPORT_ACTIONS
                ALOGD("channel '%s' consumer ~ consumed batch event, seq=%u",
                        mChannel->getName().string(), *outSeq);
#endif
                break;
            }
            return result;
        }

@@ -343,51 +397,84 @@ status_t InputConsumer::consume(InputEventFactoryInterface* factory, InputEvent*
            KeyEvent* keyEvent = factory->createKeyEvent();
            if (!keyEvent) return NO_MEMORY;

        keyEvent->initialize(
                msg.body.key.deviceId,
                msg.body.key.source,
                msg.body.key.action,
                msg.body.key.flags,
                msg.body.key.keyCode,
                msg.body.key.scanCode,
                msg.body.key.metaState,
                msg.body.key.repeatCount,
                msg.body.key.downTime,
                msg.body.key.eventTime);
            initializeKeyEvent(keyEvent, &msg);
            *outSeq = msg.body.key.seq;
            *outEvent = keyEvent;
#if DEBUG_TRANSPORT_ACTIONS
            ALOGD("channel '%s' consumer ~ consumed key event, seq=%u",
                    mChannel->getName().string(), *outSeq);
#endif
            break;
        }

        case AINPUT_EVENT_TYPE_MOTION: {
            ssize_t batchIndex = findBatch(msg.body.motion.deviceId, msg.body.motion.source);
            if (batchIndex >= 0) {
                Batch& batch = mBatches.editItemAt(batchIndex);
                if (canAppendSamples(&batch.event, &msg)) {
                    // Send finished message for the earlier part of the batch.
                    // Claim that we handled the event.  (The dispatcher doesn't care either
                    // way at the moment.)
                    status_t status = sendFinishedSignal(batch.seq, true);
                    if (status) {
                        return status;
                    }

                    // Append to the batch and save the new sequence number for the tail end.
                    appendSamples(&batch.event, &msg);
                    batch.seq = msg.body.motion.seq;
#if DEBUG_TRANSPORT_ACTIONS
                    ALOGD("channel '%s' consumer ~ appended to batch event",
                            mChannel->getName().string());
#endif
                    break;
                } else {
                    MotionEvent* motionEvent = factory->createMotionEvent();
                    if (! motionEvent) return NO_MEMORY;

        size_t pointerCount = msg.body.motion.pointerCount;
        PointerProperties pointerProperties[pointerCount];
        PointerCoords pointerCoords[pointerCount];
        for (size_t i = 0; i < pointerCount; i++) {
            pointerProperties[i].copyFrom(msg.body.motion.pointers[i].properties);
            pointerCoords[i].copyFrom(msg.body.motion.pointers[i].coords);
        }

        motionEvent->initialize(
                msg.body.motion.deviceId,
                msg.body.motion.source,
                msg.body.motion.action,
                msg.body.motion.flags,
                msg.body.motion.edgeFlags,
                msg.body.motion.metaState,
                msg.body.motion.buttonState,
                msg.body.motion.xOffset,
                msg.body.motion.yOffset,
                msg.body.motion.xPrecision,
                msg.body.motion.yPrecision,
                msg.body.motion.downTime,
                msg.body.motion.eventTime,
                pointerCount,
                pointerProperties,
                pointerCoords);
                    // We cannot append to the batch in progress, so we need to consume
                    // the previous batch right now and defer the new event until later.
                    mDeferredEventSeq = msg.body.motion.seq;
                    initializeMotionEvent(&mDeferredEvent, &msg);

                    // Return the end of the previous batch.
                    motionEvent->copyFrom(&batch.event, true /*keepHistory*/);
                    *outSeq = batch.seq;
                    *outEvent = motionEvent;
                    mBatches.removeAt(batchIndex);
#if DEBUG_TRANSPORT_ACTIONS
                    ALOGD("channel '%s' consumer ~ consumed batch event and "
                            "deferred current event, seq=%u",
                            mChannel->getName().string(), *outSeq);
#endif
                    break;
                }
            }

            // Start a new batch if needed.
            if (msg.body.motion.action == AMOTION_EVENT_ACTION_MOVE
                    || msg.body.motion.action == AMOTION_EVENT_ACTION_HOVER_MOVE) {
                mBatches.push();
                Batch& batch = mBatches.editTop();
                batch.seq = msg.body.motion.seq;
                initializeMotionEvent(&batch.event, &msg);
#if DEBUG_TRANSPORT_ACTIONS
                ALOGD("channel '%s' consumer ~ started batch event",
                        mChannel->getName().string());
#endif
                break;
            }

            MotionEvent* motionEvent = factory->createMotionEvent();
            if (! motionEvent) return NO_MEMORY;

            initializeMotionEvent(motionEvent, &msg);
            *outSeq = msg.body.motion.seq;
            *outEvent = motionEvent;
#if DEBUG_TRANSPORT_ACTIONS
            ALOGD("channel '%s' consumer ~ consumed motion event, seq=%u",
                    mChannel->getName().string(), *outSeq);
#endif
            break;
        }

@@ -396,20 +483,107 @@ status_t InputConsumer::consume(InputEventFactoryInterface* factory, InputEvent*
                    mChannel->getName().string(), msg.header.type);
            return UNKNOWN_ERROR;
        }

    }
    return OK;
}

status_t InputConsumer::sendFinishedSignal(bool handled) {
status_t InputConsumer::sendFinishedSignal(uint32_t seq, bool handled) {
#if DEBUG_TRANSPORT_ACTIONS
    ALOGD("channel '%s' consumer ~ sendFinishedSignal: handled=%d",
            mChannel->getName().string(), handled);
    ALOGD("channel '%s' consumer ~ sendFinishedSignal: seq=%u, handled=%s",
            mChannel->getName().string(), seq, handled ? "true" : "false");
#endif

    if (!seq) {
        ALOGE("Attempted to send a finished signal with sequence number 0.");
        return BAD_VALUE;
    }

    InputMessage msg;
    msg.header.type = InputMessage::TYPE_FINISHED;
    msg.body.finished.seq = seq;
    msg.body.finished.handled = handled;
    return mChannel->sendMessage(&msg);
}

bool InputConsumer::hasPendingBatch() const {
    return !mBatches.isEmpty();
}

ssize_t InputConsumer::findBatch(int32_t deviceId, int32_t source) const {
    for (size_t i = 0; i < mBatches.size(); i++) {
        const Batch& batch = mBatches.itemAt(i);
        if (batch.event.getDeviceId() == deviceId && batch.event.getSource() == source) {
            return i;
        }
    }
    return -1;
}

void InputConsumer::initializeKeyEvent(KeyEvent* event, const InputMessage* msg) {
    event->initialize(
            msg->body.key.deviceId,
            msg->body.key.source,
            msg->body.key.action,
            msg->body.key.flags,
            msg->body.key.keyCode,
            msg->body.key.scanCode,
            msg->body.key.metaState,
            msg->body.key.repeatCount,
            msg->body.key.downTime,
            msg->body.key.eventTime);
}

void InputConsumer::initializeMotionEvent(MotionEvent* event, const InputMessage* msg) {
    size_t pointerCount = msg->body.motion.pointerCount;
    PointerProperties pointerProperties[pointerCount];
    PointerCoords pointerCoords[pointerCount];
    for (size_t i = 0; i < pointerCount; i++) {
        pointerProperties[i].copyFrom(msg->body.motion.pointers[i].properties);
        pointerCoords[i].copyFrom(msg->body.motion.pointers[i].coords);
    }

    event->initialize(
            msg->body.motion.deviceId,
            msg->body.motion.source,
            msg->body.motion.action,
            msg->body.motion.flags,
            msg->body.motion.edgeFlags,
            msg->body.motion.metaState,
            msg->body.motion.buttonState,
            msg->body.motion.xOffset,
            msg->body.motion.yOffset,
            msg->body.motion.xPrecision,
            msg->body.motion.yPrecision,
            msg->body.motion.downTime,
            msg->body.motion.eventTime,
            pointerCount,
            pointerProperties,
            pointerCoords);
}

bool InputConsumer::canAppendSamples(const MotionEvent* event, const InputMessage *msg) {
    size_t pointerCount = msg->body.motion.pointerCount;
    if (event->getPointerCount() != pointerCount
            || event->getAction() != msg->body.motion.action) {
        return false;
    }
    for (size_t i = 0; i < pointerCount; i++) {
        if (*event->getPointerProperties(i) != msg->body.motion.pointers[i].properties) {
            return false;
        }
    }
    return true;
}

void InputConsumer::appendSamples(MotionEvent* event, const InputMessage* msg) {
    size_t pointerCount = msg->body.motion.pointerCount;
    PointerCoords pointerCoords[pointerCount];
    for (size_t i = 0; i < pointerCount; i++) {
        pointerCoords[i].copyFrom(msg->body.motion.pointers[i].coords);
    }

    event->setMetaState(event->getMetaState() | msg->body.motion.metaState);
    event->addSample(msg->body.motion.eventTime, pointerCoords);
}

} // namespace android
+3 −0
Original line number Diff line number Diff line
@@ -90,6 +90,7 @@ TEST_F(InputChannelTest, OpenInputChannelPair_ReturnsAPairOfConnectedChannels) {
    InputMessage clientReply;
    memset(&clientReply, 0, sizeof(InputMessage));
    clientReply.header.type = InputMessage::TYPE_FINISHED;
    clientReply.body.finished.seq = 0x11223344;
    clientReply.body.finished.handled = true;
    EXPECT_EQ(OK, clientChannel->sendMessage(&clientReply))
            << "client channel should be able to send message to server channel";
@@ -99,6 +100,8 @@ TEST_F(InputChannelTest, OpenInputChannelPair_ReturnsAPairOfConnectedChannels) {
            << "server channel should be able to receive message from client channel";
    EXPECT_EQ(clientReply.header.type, serverReply.header.type)
            << "server channel should receive the correct message from client channel";
    EXPECT_EQ(clientReply.body.finished.seq, serverReply.body.finished.seq)
            << "server channel should receive the correct message from client channel";
    EXPECT_EQ(clientReply.body.finished.handled, serverReply.body.finished.handled)
            << "server channel should receive the correct message from client channel";
}
+22 −10

File changed.

Preview size limit exceeded, changes collapsed.