Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cbee6d6e authored by Jeff Brown's avatar Jeff Brown
Browse files

Rewrite input transport using sockets.

Since we will not longer be modifying events in place, we don't need
to use an ashmem region for input.  Simplified the code to instead
use a socket of type SOCK_SEQPACKET.

This is part of a series of changes to improve input system pipelining.

Bug: 5963420

Change-Id: I05909075ed8b61b93900913e44c6db84857340d8
parent 3241b6b7
Loading
Loading
Loading
Loading
+4 −16
Original line number Diff line number Diff line
@@ -130,21 +130,21 @@ AInputQueue::~AInputQueue() {
void AInputQueue::attachLooper(ALooper* looper, int ident,
        ALooper_callbackFunc callback, void* data) {
    mLooper = static_cast<android::Looper*>(looper);
    mLooper->addFd(mConsumer.getChannel()->getReceivePipeFd(),
    mLooper->addFd(mConsumer.getChannel()->getFd(),
            ident, ALOOPER_EVENT_INPUT, callback, data);
    mLooper->addFd(mDispatchKeyRead,
            ident, ALOOPER_EVENT_INPUT, callback, data);
}

void AInputQueue::detachLooper() {
    mLooper->removeFd(mConsumer.getChannel()->getReceivePipeFd());
    mLooper->removeFd(mConsumer.getChannel()->getFd());
    mLooper->removeFd(mDispatchKeyRead);
}

int32_t AInputQueue::hasEvents() {
    struct pollfd pfd[2];

    pfd[0].fd = mConsumer.getChannel()->getReceivePipeFd();
    pfd[0].fd = mConsumer.getChannel()->getFd();
    pfd[0].events = POLLIN;
    pfd[0].revents = 0;
    pfd[1].fd = mDispatchKeyRead;
@@ -201,15 +201,8 @@ int32_t AInputQueue::getEvent(AInputEvent** outEvent) {
        }
    }

    int32_t res = mConsumer.receiveDispatchSignal();
    if (res != android::OK) {
        ALOGE("channel '%s' ~ Failed to receive dispatch signal.  status=%d",
                mConsumer.getChannel()->getName().string(), res);
        return -1;
    }

    InputEvent* myEvent = NULL;
    res = mConsumer.consume(this, &myEvent);
    status_t res = mConsumer.consume(this, &myEvent);
    if (res != android::OK) {
        ALOGW("channel '%s' ~ Failed to consume input event.  status=%d",
                mConsumer.getChannel()->getName().string(), res);
@@ -481,11 +474,6 @@ struct NativeCode : public ANativeActivity {
                    android_view_InputChannel_getInputChannel(env, _channel);
            if (ic != NULL) {
                nativeInputQueue = new AInputQueue(ic, mainWorkWrite);
                if (nativeInputQueue->getConsumer().initialize() != android::OK) {
                    delete nativeInputQueue;
                    nativeInputQueue = NULL;
                    return UNKNOWN_ERROR;
                }
            } else {
                return UNKNOWN_ERROR;
            }
+6 −24
Original line number Diff line number Diff line
@@ -199,32 +199,16 @@ static void android_view_InputChannel_nativeReadFromParcel(JNIEnv* env, jobject
        bool isInitialized = parcel->readInt32();
        if (isInitialized) {
            String8 name = parcel->readString8();
            int32_t parcelAshmemFd = parcel->readFileDescriptor();
            int32_t ashmemFd = dup(parcelAshmemFd);
            if (ashmemFd < 0) {
                ALOGE("Error %d dup ashmem fd %d.", errno, parcelAshmemFd);
            }
            int32_t parcelReceivePipeFd = parcel->readFileDescriptor();
            int32_t receivePipeFd = dup(parcelReceivePipeFd);
            if (receivePipeFd < 0) {
                ALOGE("Error %d dup receive pipe fd %d.", errno, parcelReceivePipeFd);
            }
            int32_t parcelSendPipeFd = parcel->readFileDescriptor();
            int32_t sendPipeFd = dup(parcelSendPipeFd);
            if (sendPipeFd < 0) {
                ALOGE("Error %d dup send pipe fd %d.", errno, parcelSendPipeFd);
            }
            if (ashmemFd < 0 || receivePipeFd < 0 || sendPipeFd < 0) {
                if (ashmemFd >= 0) ::close(ashmemFd);
                if (receivePipeFd >= 0) ::close(receivePipeFd);
                if (sendPipeFd >= 0) ::close(sendPipeFd);
            int32_t rawFd = parcel->readFileDescriptor();
            int32_t dupFd = dup(rawFd);
            if (rawFd < 0) {
                ALOGE("Error %d dup channel fd %d.", errno, rawFd);
                jniThrowRuntimeException(env,
                        "Could not read input channel file descriptors from parcel.");
                return;
            }

            InputChannel* inputChannel = new InputChannel(name, ashmemFd,
                    receivePipeFd, sendPipeFd);
            InputChannel* inputChannel = new InputChannel(name, dupFd);
            NativeInputChannel* nativeInputChannel = new NativeInputChannel(inputChannel);

            android_view_InputChannel_setNativeInputChannel(env, obj, nativeInputChannel);
@@ -243,9 +227,7 @@ static void android_view_InputChannel_nativeWriteToParcel(JNIEnv* env, jobject o

            parcel->writeInt32(1);
            parcel->writeString8(inputChannel->getName());
            parcel->writeDupFileDescriptor(inputChannel->getAshmemFd());
            parcel->writeDupFileDescriptor(inputChannel->getReceivePipeFd());
            parcel->writeDupFileDescriptor(inputChannel->getSendPipeFd());
            parcel->writeDupFileDescriptor(inputChannel->getFd());
        } else {
            parcel->writeInt32(0);
        }
+3 −17
Original line number Diff line number Diff line
@@ -83,7 +83,7 @@ NativeInputEventReceiver::~NativeInputEventReceiver() {
    ALOGD("channel '%s' ~ Disposing input event receiver.", getInputChannelName());
#endif

    mLooper->removeFd(mInputConsumer.getChannel()->getReceivePipeFd());
    mLooper->removeFd(mInputConsumer.getChannel()->getFd());
    if (mEventInProgress) {
        mInputConsumer.sendFinishedSignal(false); // ignoring result
    }
@@ -93,14 +93,7 @@ NativeInputEventReceiver::~NativeInputEventReceiver() {
}

status_t NativeInputEventReceiver::initialize() {
    status_t result = mInputConsumer.initialize();
    if (result) {
        ALOGW("Failed to initialize input consumer for input channel '%s', status=%d",
                getInputChannelName(), result);
        return result;
    }

    int32_t receiveFd = mInputConsumer.getChannel()->getReceivePipeFd();
    int32_t receiveFd = mInputConsumer.getChannel()->getFd();
    mLooper->addFd(receiveFd, 0, ALOOPER_EVENT_INPUT, handleReceiveCallback, this);
    return OK;
}
@@ -139,13 +132,6 @@ int NativeInputEventReceiver::handleReceiveCallback(int receiveFd, int events, v
        return 1;
    }

    status_t status = r->mInputConsumer.receiveDispatchSignal();
    if (status) {
        ALOGE("channel '%s' ~ Failed to receive dispatch signal.  status=%d",
                r->getInputChannelName(), status);
        return 0; // remove the callback
    }

    if (r->mEventInProgress) {
        ALOGW("channel '%s' ~ Publisher sent spurious dispatch signal.",
                r->getInputChannelName());
@@ -153,7 +139,7 @@ int NativeInputEventReceiver::handleReceiveCallback(int receiveFd, int events, v
    }

    InputEvent* inputEvent;
    status = r->mInputConsumer.consume(&r->mInputEventFactory, &inputEvent);
    status_t status = r->mInputConsumer.consume(&r->mInputEventFactory, &inputEvent);
    if (status) {
        ALOGW("channel '%s' ~ Failed to consume input event.  status=%d",
                r->getInputChannelName(), status);
+112 −173
Original line number Diff line number Diff line
@@ -20,17 +20,13 @@
/**
 * Native input transport.
 *
 * Uses anonymous shared memory as a whiteboard for sending input events from an
 * InputPublisher to an InputConsumer and ensuring appropriate synchronization.
 * One interesting feature is that published events can be updated in place as long as they
 * have not yet been consumed.
 * The InputChannel provides a mechanism for exchanging InputMessage structures across processes.
 *
 * The InputPublisher and InputConsumer only take care of transferring event data
 * over an InputChannel and sending synchronization signals.  The InputDispatcher and InputQueue
 * build on these abstractions to add multiplexing and queueing.
 * The InputPublisher and InputConsumer each handle one end-point of an input channel.
 * The InputPublisher is used by the input dispatcher to send events to the application.
 * The InputConsumer is used by the application to receive events from the input dispatcher.
 */

#include <semaphore.h>
#include <ui/Input.h>
#include <utils/Errors.h>
#include <utils/Timers.h>
@@ -40,88 +36,25 @@
namespace android {

/*
 * An input channel consists of a shared memory buffer and a pair of pipes
 * used to send input messages from an InputPublisher to an InputConsumer
 * across processes.  Each channel has a descriptive name for debugging purposes.
 *
 * Each endpoint has its own InputChannel object that specifies its own file descriptors.
 *
 * The input channel is closed when all references to it are released.
 */
class InputChannel : public RefBase {
protected:
    virtual ~InputChannel();

public:
    InputChannel(const String8& name, int32_t ashmemFd, int32_t receivePipeFd,
            int32_t sendPipeFd);

    /* Creates a pair of input channels and their underlying shared memory buffers
     * and pipes.
     *
     * Returns OK on success.
     */
    static status_t openInputChannelPair(const String8& name,
            sp<InputChannel>& outServerChannel, sp<InputChannel>& outClientChannel);

    inline String8 getName() const { return mName; }
    inline int32_t getAshmemFd() const { return mAshmemFd; }
    inline int32_t getReceivePipeFd() const { return mReceivePipeFd; }
    inline int32_t getSendPipeFd() const { return mSendPipeFd; }

    /* Sends a signal to the other endpoint.
     *
     * Returns OK on success.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t sendSignal(char signal);

    /* Receives a signal send by the other endpoint.
     * (Should only call this after poll() indicates that the receivePipeFd has available input.)
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no signal present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t receiveSignal(char* outSignal);

private:
    String8 mName;
    int32_t mAshmemFd;
    int32_t mReceivePipeFd;
    int32_t mSendPipeFd;
};

/*
 * Private intermediate representation of input events as messages written into an
 * ashmem buffer.
 * Intermediate representation used to send input events and related signals.
 */
struct InputMessage {
    /* Semaphore count is set to 1 when the message is published.
     * It becomes 0 transiently while the publisher updates the message.
     * It becomes 0 permanently when the consumer consumes the message.
     */
    sem_t semaphore;

    /* Initialized to false by the publisher.
     * Set to true by the consumer when it consumes the message.
     */
    bool consumed;
    enum {
        TYPE_KEY = 1,
        TYPE_MOTION = 2,
        TYPE_FINISHED = 3,
    };

    int32_t type;
    struct Header {
        uint32_t type;
        uint32_t padding; // 8 byte alignment for the body that follows
    } header;

    struct SampleData {
    union Body {
        struct Key {
            nsecs_t eventTime;
        PointerCoords coords[0]; // variable length
    };

            int32_t deviceId;
            int32_t source;

    union {
        struct {
            int32_t action;
            int32_t flags;
            int32_t keyCode;
@@ -129,10 +62,16 @@ struct InputMessage {
            int32_t metaState;
            int32_t repeatCount;
            nsecs_t downTime;
            nsecs_t eventTime;

            inline size_t size() const {
                return sizeof(Key);
            }
        } key;

        struct {
        struct Motion {
            nsecs_t eventTime;
            int32_t deviceId;
            int32_t source;
            int32_t action;
            int32_t flags;
            int32_t metaState;
@@ -144,28 +83,87 @@ struct InputMessage {
            float xPrecision;
            float yPrecision;
            size_t pointerCount;
            PointerProperties pointerProperties[MAX_POINTERS];
            size_t sampleCount;
            SampleData sampleData[0]; // variable length
            struct Pointer {
                PointerProperties properties;
                PointerCoords coords;
            } pointers[MAX_POINTERS];

            inline size_t size() const {
                return sizeof(Motion) - sizeof(Pointer) * MAX_POINTERS
                        + sizeof(Pointer) * pointerCount;
            }
        } motion;

        struct Finished {
            bool handled;

            inline size_t size() const {
                return sizeof(Finished);
            }
        } finished;
    } body;

    bool isValid(size_t actualSize) const;
    size_t size() const;
};

    /* Gets the number of bytes to add to step to the next SampleData object in a motion
     * event message for a given number of pointers.
/*
 * An input channel consists of a local unix domain socket used to send and receive
 * input messages across processes.  Each channel has a descriptive name for debugging purposes.
 *
 * Each endpoint has its own InputChannel object that specifies its file descriptor.
 *
 * The input channel is closed when all references to it are released.
 */
    static inline size_t sampleDataStride(size_t pointerCount) {
        return sizeof(InputMessage::SampleData) + pointerCount * sizeof(PointerCoords);
    }
class InputChannel : public RefBase {
protected:
    virtual ~InputChannel();

    /* Adds the SampleData stride to the given pointer. */
    static inline SampleData* sampleDataPtrIncrement(SampleData* ptr, size_t stride) {
        return reinterpret_cast<InputMessage::SampleData*>(reinterpret_cast<char*>(ptr) + stride);
    }
public:
    InputChannel(const String8& name, int32_t fd);

    /* Creates a pair of input channels.
     *
     * Returns OK on success.
     */
    static status_t openInputChannelPair(const String8& name,
            sp<InputChannel>& outServerChannel, sp<InputChannel>& outClientChannel);

    inline String8 getName() const { return mName; }
    inline int32_t getFd() const { return mFd; }

    /* Sends a message to the other endpoint.
     *
     * If the channel is full then the message is guaranteed not to have been sent at all.
     * Try again after the consumer has sent a finished signal indicating that it has
     * consumed some of the pending messages from the channel.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if the channel is full.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t sendMessage(const InputMessage* msg);

    /* Receives a message sent by the other endpoint.
     *
     * If there is no message present, try again after poll() indicates that the fd
     * is readable.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no message present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t receiveMessage(InputMessage* msg);

private:
    String8 mName;
    int32_t mFd;
};

/*
 * Publishes input events to an anonymous shared memory buffer.
 * Uses atomic operations to coordinate shared access with a single concurrent consumer.
 * Publishes input events to an input channel.
 */
class InputPublisher {
public:
@@ -178,24 +176,12 @@ public:
    /* Gets the underlying input channel. */
    inline sp<InputChannel> getChannel() { return mChannel; }

    /* Prepares the publisher for use.  Must be called before it is used.
     * Returns OK on success.
     *
     * This method implicitly calls reset(). */
    status_t initialize();

    /* Resets the publisher to its initial state and unpins its ashmem buffer.
     * Returns OK on success.
     *
     * Should be called after an event has been consumed to release resources used by the
     * publisher until the next event is ready to be published.
     */
    status_t reset();

    /* Publishes a key event to the ashmem buffer.
    /* Publishes a key event to the input channel.
     *
     * Returns OK on success.
     * Returns INVALID_OPERATION if the publisher has not been reset.
     * Returns WOULD_BLOCK if the channel is full.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t publishKeyEvent(
            int32_t deviceId,
@@ -209,11 +195,13 @@ public:
            nsecs_t downTime,
            nsecs_t eventTime);

    /* Publishes a motion event to the ashmem buffer.
    /* Publishes a motion event to the input channel.
     *
     * Returns OK on success.
     * Returns INVALID_OPERATION if the publisher has not been reset.
     * Returns WOULD_BLOCK if the channel is full.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Returns BAD_VALUE if pointerCount is less than 1 or greater than MAX_POINTERS.
     * Other errors probably indicate that the channel is broken.
     */
    status_t publishMotionEvent(
            int32_t deviceId,
@@ -233,55 +221,22 @@ public:
            const PointerProperties* pointerProperties,
            const PointerCoords* pointerCoords);

    /* Appends a motion sample to a motion event unless already consumed.
     *
     * Returns OK on success.
     * Returns INVALID_OPERATION if the current event is not a AMOTION_EVENT_ACTION_MOVE event.
     * Returns FAILED_TRANSACTION if the current event has already been consumed.
     * Returns NO_MEMORY if the buffer is full and no additional samples can be added.
     */
    status_t appendMotionSample(
            nsecs_t eventTime,
            const PointerCoords* pointerCoords);

    /* Sends a dispatch signal to the consumer to inform it that a new message is available.
     *
     * Returns OK on success.
     * Errors probably indicate that the channel is broken.
     */
    status_t sendDispatchSignal();

    /* Receives the finished signal from the consumer in reply to the original dispatch signal.
     * Returns whether the consumer handled the message.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no signal present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Other errors probably indicate that the channel is broken.
     */
    status_t receiveFinishedSignal(bool* outHandled);

private:
    sp<InputChannel> mChannel;

    size_t mAshmemSize;
    InputMessage* mSharedMessage;
    bool mPinned;
    bool mSemaphoreInitialized;
    bool mWasDispatched;

    size_t mMotionEventPointerCount;
    InputMessage::SampleData* mMotionEventSampleDataTail;
    size_t mMotionEventSampleDataStride;

    status_t publishInputEvent(
            int32_t type,
            int32_t deviceId,
            int32_t source);
};

/*
 * Consumes input events from an anonymous shared memory buffer.
 * Uses atomic operations to coordinate shared access with a single concurrent publisher.
 * Consumes input events from an input channel.
 */
class InputConsumer {
public:
@@ -294,16 +249,14 @@ public:
    /* Gets the underlying input channel. */
    inline sp<InputChannel> getChannel() { return mChannel; }

    /* Prepares the consumer for use.  Must be called before it is used. */
    status_t initialize();

    /* Consumes the input event in the buffer and copies its contents into
    /* Consumes an input event from the input channel and copies its contents into
     * an InputEvent object created using the specified factory.
     * This operation will block if the publisher is updating the event.
     *
     * Returns OK on success.
     * Returns INVALID_OPERATION if there is no currently published event.
     * Returns WOULD_BLOCK if there is no event present.
     * Returns DEAD_OBJECT if the channel's peer has been closed.
     * Returns NO_MEMORY if the event could not be created.
     * Other errors probably indicate that the channel is broken.
     */
    status_t consume(InputEventFactoryInterface* factory, InputEvent** outEvent);

@@ -311,26 +264,12 @@ public:
     * finished processing and specifies whether the message was handled by the consumer.
     *
     * Returns OK on success.
     * Errors probably indicate that the channel is broken.
     */
    status_t sendFinishedSignal(bool handled);

    /* Receives the dispatched signal from the publisher.
     *
     * Returns OK on success.
     * Returns WOULD_BLOCK if there is no signal present.
     * Other errors probably indicate that the channel is broken.
     */
    status_t receiveDispatchSignal();
    status_t sendFinishedSignal(bool handled);

private:
    sp<InputChannel> mChannel;

    size_t mAshmemSize;
    InputMessage* mSharedMessage;

    void populateKeyEvent(KeyEvent* keyEvent) const;
    void populateMotionEvent(MotionEvent* motionEvent) const;
};

} // namespace android
+194 −519

File changed.

Preview size limit exceeded, changes collapsed.

Loading