Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 352fec52 authored by Brian Lindahl's avatar Brian Lindahl
Browse files

Allow for reschedule-able messages

Re-scheduleable messages allows for delayed messages to be re-scheduled
to a later point in time to avoid unnecessary overhead of handling stale messages.

Test: atest AMessage_tests
Bug: 234833109
Change-Id: Id5e76060f1d021f5ea30690cca7dd108dcf8c51d
parent 26760315
Loading
Loading
Loading
Loading
+58 −3
Original line number Diff line number Diff line
@@ -69,6 +69,10 @@ int64_t ALooper::GetNowUs() {
    return systemTime(SYSTEM_TIME_MONOTONIC) / 1000LL;
}

int64_t ALooper::getNowUs() {
    return GetNowUs();
}

ALooper::ALooper()
    : mRunningLocally(false) {
    // clean up stale AHandlers. Doing it here instead of in the destructor avoids
@@ -170,11 +174,11 @@ void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {

    int64_t whenUs;
    if (delayUs > 0) {
        int64_t nowUs = GetNowUs();
        int64_t nowUs = getNowUs();
        whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);

    } else {
        whenUs = GetNowUs();
        whenUs = getNowUs();
    }

    List<Event>::iterator it = mEventQueue.begin();
@@ -185,6 +189,7 @@ void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
    Event event;
    event.mWhenUs = whenUs;
    event.mMessage = msg;
    event.mToken = nullptr;

    if (it == mEventQueue.begin()) {
        mQueueChangedCondition.signal();
@@ -193,7 +198,57 @@ void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
    mEventQueue.insert(it, event);
}

status_t ALooper::postUnique(const sp<AMessage> &msg, const sp<RefBase> &token, int64_t delayUs) {
    if (token == nullptr) {
        return -EINVAL;
    }
    Mutex::Autolock autoLock(mLock);

    int64_t whenUs;
    if (delayUs > 0) {
        int64_t nowUs = getNowUs();
        whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
    } else {
        whenUs = getNowUs();
    }

    // We only need to wake the loop up if we're rescheduling to the earliest event in the queue.
    // This needs to be checked now, before we reschedule the message, in case this message is
    // already at the beginning of the queue.
    bool shouldAwakeLoop = mEventQueue.empty() || whenUs < mEventQueue.begin()->mWhenUs;

    // Erase any previously-posted event with this token.
    for (auto i = mEventQueue.begin(); i != mEventQueue.end();) {
        if (i->mToken == token) {
            i = mEventQueue.erase(i);
        } else {
            ++i;
        }
    }

    // Find the insertion point for the rescheduled message.
    List<Event>::iterator i = mEventQueue.begin();
    while (i != mEventQueue.end() && i->mWhenUs <= whenUs) {
        ++i;
    }

    Event event;
    event.mWhenUs = whenUs;
    event.mMessage = msg;
    event.mToken = token;
    mEventQueue.insert(i, event);

    // If we rescheduled the event to be earlier than the first event, then we need to wake up the
    // looper earlier than it was previously scheduled to be woken up. Otherwise, it can sleep until
    // the previous wake-up time and then go to sleep again if needed.
    if (shouldAwakeLoop){
        mQueueChangedCondition.signal();
    }
    return OK;
}

bool ALooper::loop() {

    Event event;

    {
@@ -206,7 +261,7 @@ bool ALooper::loop() {
            return true;
        }
        int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
        int64_t nowUs = GetNowUs();
        int64_t nowUs = getNowUs();

        if (whenUs > nowUs) {
            int64_t delayUs = whenUs - nowUs;
+11 −0
Original line number Diff line number Diff line
@@ -430,6 +430,17 @@ status_t AMessage::post(int64_t delayUs) {
    return OK;
}

status_t AMessage::postUnique(const sp<RefBase> &token, int64_t delayUs) {
    sp<ALooper> looper = mLooper.promote();
    if (looper == NULL) {
        ALOGW("failed to post message as target looper for handler %d is gone.",
              mTarget);
        return -ENOENT;
    }

    return looper->postUnique(this, token, delayUs);
}

status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {
    sp<ALooper> looper = mLooper.promote();
    if (looper == NULL) {
+10 −1
Original line number Diff line number Diff line
@@ -59,6 +59,9 @@ struct ALooper : public RefBase {
    }

protected:
    // overridable by test harness
    virtual int64_t getNowUs();

    virtual ~ALooper();

private:
@@ -67,6 +70,7 @@ private:
    struct Event {
        int64_t mWhenUs;
        sp<AMessage> mMessage;
        sp<RefBase> mToken;
    };

    Mutex mLock;
@@ -87,9 +91,14 @@ private:

    // START --- methods used only by AMessage

    // posts a message on this looper with the given timeout
    // Posts a message on this looper with the given timeout.
    void post(const sp<AMessage> &msg, int64_t delayUs);

    // Post a message uniquely on this looper with the given timeout.
    // This method ensures that there is exactly one message with the same token pending posted on
    // this looper after the call returns. A null token will result in an EINVAL error status.
    status_t postUnique(const sp<AMessage> &msg, const sp<RefBase> &token, int64_t delayUs);

    // creates a reply token to be used with this looper
    sp<AReplyToken> createReplyToken();
    // waits for a response for the reply token.  If status is OK, the response
+5 −0
Original line number Diff line number Diff line
@@ -141,6 +141,11 @@ struct AMessage : public RefBase {

    status_t post(int64_t delayUs = 0);

    // Post a message uniquely to its target with the given timeout.
    // This method ensures that there is exactly one message with the same token posted to its
    // target after the call returns. A null token will result in an EINVAL error status.
    status_t postUnique(const sp<RefBase> &token, int64_t delayUs = 0);

    // Posts the message to its target and waits for a response (or error)
    // before returning.
    status_t postAndAwaitResponse(sp<AMessage> *response);
+192 −2
Original line number Diff line number Diff line
@@ -17,18 +17,43 @@
//#define LOG_NDEBUG 0
#define LOG_TAG "AData_test"

#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <utils/RefBase.h>

#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/AHandler.h>
#include <media/stagefright/foundation/ALooper.h>

using namespace android;

class AMessageTest : public ::testing::Test {
using ::testing::InSequence;
using ::testing::NiceMock;

class LooperWithSettableClock : public ALooper {
public:
  LooperWithSettableClock() : mClockUs(0) {}

  void setClockUs(int64_t nowUs) {
    mClockUs = nowUs;
  }

  int64_t getNowUs() override {
    return mClockUs;
  }

private:
  int64_t mClockUs;
};

timespec millis100 = {0, 100L*1000*1000};

TEST(AMessage_tests, item_manipulation) {
class MockHandler : public AHandler {
public:
    MOCK_METHOD(void, onMessageReceived, (const sp<AMessage>&), (override));
};

TEST(AMessage_tests, settersAndGetters) {
  sp<AMessage> m1 = new AMessage();

  m1->setInt32("value", 2);
@@ -120,6 +145,171 @@ TEST(AMessage_tests, item_manipulation) {
  EXPECT_TRUE(m1->findInt32("alittlelonger", &i32));

  EXPECT_NE(OK, m1->removeEntryByName("notpresent"));
}

TEST(AMessage_tests, deliversMultipleMessagesInOrderImmediately) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msgNow1 = new AMessage(0, mockHandler);
  msgNow1->post();
  sp<AMessage> msgNow2 = new AMessage(0, mockHandler);
  msgNow2->post();

  {
    InSequence inSequence;
    EXPECT_CALL(*mockHandler, onMessageReceived(msgNow1)).Times(1);
    EXPECT_CALL(*mockHandler, onMessageReceived(msgNow2)).Times(1);
  }
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, doesNotDeliverDelayedMessageImmediately) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msgNow = new AMessage(0, mockHandler);
  msgNow->post();
  sp<AMessage> msgDelayed = new AMessage(0, mockHandler);
  msgDelayed->post(100);

  EXPECT_CALL(*mockHandler, onMessageReceived(msgNow)).Times(1);
  // note: never called
  EXPECT_CALL(*mockHandler, onMessageReceived(msgDelayed)).Times(0);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, deliversDelayedMessagesInSequence) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msgIn500 = new AMessage(0, mockHandler);
  msgIn500->post(500);
  sp<AMessage> msgNow = new AMessage(0, mockHandler);
  msgNow->post();
  sp<AMessage> msgIn100 = new AMessage(0, mockHandler);
  msgIn100->post(100);
  // not expected to be received
  sp<AMessage> msgIn1000 = new AMessage(0, mockHandler);
  msgIn1000->post(1000);

  looper->setClockUs(500);
  {
    InSequence inSequence;

    EXPECT_CALL(*mockHandler, onMessageReceived(msgNow)).Times(1);
    EXPECT_CALL(*mockHandler, onMessageReceived(msgIn100)).Times(1);
    EXPECT_CALL(*mockHandler, onMessageReceived(msgIn500)).Times(1);
  }
  // note: never called
  EXPECT_CALL(*mockHandler, onMessageReceived(msgIn1000)).Times(0);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, deliversDelayedUniqueMessage) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  msg->postUnique(msg, 50);

  looper->setClockUs(50);
  EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, deliversImmediateUniqueMessage) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  // note: we don't need to set the clock, but we do want a stable clock that doesn't advance
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  msg->postUnique(msg, 0);

  EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, doesNotDeliverUniqueMessageAfterRescheduleLater) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  msg->postUnique(msg, 50);
  msg->postUnique(msg, 100); // reschedule for later

  looper->setClockUs(50); // if the message is correctly rescheduled, it should not be delivered
  // Never called because the message was rescheduled to a later point in time
  EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(0);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, deliversUniqueMessageAfterRescheduleEarlier) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  msg->postUnique(msg, 100);
  msg->postUnique(msg, 50); // reschedule to fire earlier

  looper->setClockUs(50); // if the message is rescheduled correctly, it should be delivered
  EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(1);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, deliversSameMessageTwice) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  msg->post(50);
  msg->post(100);

  looper->setClockUs(100);
  EXPECT_CALL(*mockHandler, onMessageReceived(msg)).Times(2);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

// When messages are posted twice with the same token, it will only be delivered once after being
// rescheduled.
TEST(AMessage_tests, deliversUniqueMessageOnce) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<LooperWithSettableClock> looper = new LooperWithSettableClock();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg1 = new AMessage(0, mockHandler);
  msg1->postUnique(msg1, 50);
  sp<AMessage> msg2 = new AMessage(0, mockHandler);
  msg2->postUnique(msg1, 75); // note, using the same token as msg1

  looper->setClockUs(100);
  EXPECT_CALL(*mockHandler, onMessageReceived(msg1)).Times(0);
  EXPECT_CALL(*mockHandler, onMessageReceived(msg2)).Times(1);
  looper->start();
  nanosleep(&millis100, nullptr); // just enough time for the looper thread to run
}

TEST(AMessage_tests, postUnique_withNullToken_returnsInvalidArgument) {
  sp<NiceMock<MockHandler>> mockHandler = new NiceMock<MockHandler>;
  sp<ALooper> looper = new ALooper();
  looper->registerHandler(mockHandler);

  sp<AMessage> msg = new AMessage(0, mockHandler);
  EXPECT_EQ(msg->postUnique(nullptr, 0), -EINVAL);
}
Loading