Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 79314de0 authored by John Grossman's avatar John Grossman
Browse files

LibAAH_RTP: Change names to prepare for refactor.



Rename AAH_TXSender to AAH_TXGroup in preparation for refactoring to
support unicast retransmission.

Signed-off-by: default avatarJohn Grossman <johngro@google.com>
Change-Id: I3984db27d1c61c6155d5d7cb9c38eead421b9249
parent 5490256b
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ LOCAL_SRC_FILES := \
    aah_rx_player_substream.cpp \
    aah_tx_packet.cpp \
    aah_tx_player.cpp \
    aah_tx_sender.cpp \
    aah_tx_group.cpp \
    pipe_event.cpp

LOCAL_C_INCLUDES := \
+40 −40
Original line number Diff line number Diff line
@@ -27,35 +27,35 @@
#include <utils/misc.h>

#include "aah_tx_player.h"
#include "aah_tx_sender.h"
#include "aah_tx_group.h"

namespace android {

const char* AAH_TXSender::kSendPacketIPAddr = "ipaddr";
const char* AAH_TXSender::kSendPacketPort = "port";
const char* AAH_TXSender::kSendPacketTRTPPacket = "trtp";
const char* AAH_TXGroup::kSendPacketIPAddr = "ipaddr";
const char* AAH_TXGroup::kSendPacketPort = "port";
const char* AAH_TXGroup::kSendPacketTRTPPacket = "trtp";

const int AAH_TXSender::kRetryTrimIntervalUs = 100000;
const int AAH_TXSender::kHeartbeatIntervalUs = 1000000;
const int AAH_TXSender::kRetryBufferCapacity = 100;
const nsecs_t AAH_TXSender::kHeartbeatTimeout = 600ull * 1000000000ull;
const int AAH_TXGroup::kRetryTrimIntervalUs = 100000;
const int AAH_TXGroup::kHeartbeatIntervalUs = 1000000;
const int AAH_TXGroup::kRetryBufferCapacity = 100;
const nsecs_t AAH_TXGroup::kHeartbeatTimeout = 600ull * 1000000000ull;

Mutex AAH_TXSender::sLock;
wp<AAH_TXSender> AAH_TXSender::sInstance;
uint32_t AAH_TXSender::sNextEpoch;
bool AAH_TXSender::sNextEpochValid = false;
Mutex AAH_TXGroup::sLock;
wp<AAH_TXGroup> AAH_TXGroup::sInstance;
uint32_t AAH_TXGroup::sNextEpoch;
bool AAH_TXGroup::sNextEpochValid = false;

AAH_TXSender::AAH_TXSender() : mSocket(-1) {
AAH_TXGroup::AAH_TXGroup() : mSocket(-1) {
    mLastSentPacketTime = systemTime();
}

sp<AAH_TXSender> AAH_TXSender::GetInstance() {
sp<AAH_TXGroup> AAH_TXGroup::GetInstance() {
    Mutex::Autolock autoLock(sLock);

    sp<AAH_TXSender> sender = sInstance.promote();
    sp<AAH_TXGroup> sender = sInstance.promote();

    if (sender == NULL) {
        sender = new AAH_TXSender();
        sender = new AAH_TXGroup();
        if (sender == NULL) {
            return NULL;
        }
@@ -65,7 +65,7 @@ sp<AAH_TXSender> AAH_TXSender::GetInstance() {
            return NULL;
        }

        sender->mReflector = new AHandlerReflector<AAH_TXSender>(sender.get());
        sender->mReflector = new AHandlerReflector<AAH_TXGroup>(sender.get());
        if (sender->mReflector == NULL) {
            return NULL;
        }
@@ -92,11 +92,11 @@ sp<AAH_TXSender> AAH_TXSender::GetInstance() {
            return NULL;
        }

        sender->mLooper->setName("AAH_TXSender");
        sender->mLooper->setName("AAH_TXGroup");
        sender->mLooper->registerHandler(sender->mReflector);
        sender->mLooper->start(false, false, PRIORITY_AUDIO);

        if (sender->mRetryReceiver->run("AAH_TXSenderRetry", PRIORITY_AUDIO)
        if (sender->mRetryReceiver->run("AAH_TXGroupRetry", PRIORITY_AUDIO)
                != OK) {
            LOGW("%s unable to start retry thread", __PRETTY_FUNCTION__);
            return NULL;
@@ -108,7 +108,7 @@ sp<AAH_TXSender> AAH_TXSender::GetInstance() {
    return sender;
}

AAH_TXSender::~AAH_TXSender() {
AAH_TXGroup::~AAH_TXGroup() {
    mLooper->stop();
    mLooper->unregisterHandler(mReflector->id());

@@ -128,7 +128,7 @@ AAH_TXSender::~AAH_TXSender() {
}

// Return the next epoch number usable for a newly instantiated endpoint.
uint32_t AAH_TXSender::getNextEpoch() {
uint32_t AAH_TXGroup::getNextEpoch() {
    Mutex::Autolock autoLock(sLock);

    if (sNextEpochValid) {
@@ -143,7 +143,7 @@ uint32_t AAH_TXSender::getNextEpoch() {

// Notify the sender that a player has started sending to this endpoint.
// Returns a program ID for use by the calling player.
uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
uint16_t AAH_TXGroup::registerEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
@@ -173,7 +173,7 @@ uint16_t AAH_TXSender::registerEndpoint(const Endpoint& endpoint) {
// Notify the sender that a player has ceased sending to this endpoint.
// An endpoint's state can not be deleted until all of the endpoint's
// registered players have called unregisterEndpoint.
void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
void AAH_TXGroup::unregisterEndpoint(const Endpoint& endpoint) {
    Mutex::Autolock lock(mEndpointLock);

    EndpointState* eps = mEndpointMap.valueFor(endpoint);
@@ -183,7 +183,7 @@ void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
    }
}

void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
void AAH_TXGroup::onMessageReceived(const sp<AMessage>& msg) {
    switch (msg->what()) {
        case kWhatSendPacket:
            onSendPacket(msg);
@@ -203,7 +203,7 @@ void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
    }
}

void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
void AAH_TXGroup::onSendPacket(const sp<AMessage>& msg) {
    sp<RefBase> obj;
    CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
    sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
@@ -221,7 +221,7 @@ void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
    mLastSentPacketTime = systemTime();
}

void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
void AAH_TXGroup::doSendPacket_l(const sp<TRTPPacket>& packet,
                                  const Endpoint& endpoint) {
    EndpointState* eps = mEndpointMap.valueFor(endpoint);
    if (!eps) {
@@ -256,7 +256,7 @@ void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
    }
}

void AAH_TXSender::trimRetryBuffers() {
void AAH_TXGroup::trimRetryBuffers() {
    Mutex::Autolock lock(mEndpointLock);

    nsecs_t localTimeNow = systemTime();
@@ -297,7 +297,7 @@ void AAH_TXSender::trimRetryBuffers() {
    }
}

void AAH_TXSender::sendHeartbeats() {
void AAH_TXGroup::sendHeartbeats() {
    Mutex::Autolock lock(mEndpointLock);

    if (shouldSendHeartbeats_l()) {
@@ -324,7 +324,7 @@ void AAH_TXSender::sendHeartbeats() {
    }
}

bool AAH_TXSender::shouldSendHeartbeats_l() {
bool AAH_TXGroup::shouldSendHeartbeats_l() {
    // assert(holding endpoint lock)
    return (systemTime() < (mLastSentPacketTime + kHeartbeatTimeout));
}
@@ -332,19 +332,19 @@ bool AAH_TXSender::shouldSendHeartbeats_l() {
// Receiver

// initial 4-byte ID of a retry request packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryRequestID = 'Treq';
const uint32_t AAH_TXGroup::RetryReceiver::kRetryRequestID = 'Treq';

// initial 4-byte ID of a retry NAK packet
const uint32_t AAH_TXSender::RetryReceiver::kRetryNakID = 'Tnak';
const uint32_t AAH_TXGroup::RetryReceiver::kRetryNakID = 'Tnak';

// initial 4-byte ID of a fast start request packet
const uint32_t AAH_TXSender::RetryReceiver::kFastStartRequestID = 'Tfst';
const uint32_t AAH_TXGroup::RetryReceiver::kFastStartRequestID = 'Tfst';

AAH_TXSender::RetryReceiver::RetryReceiver(AAH_TXSender* sender)
AAH_TXGroup::RetryReceiver::RetryReceiver(AAH_TXGroup* sender)
        : Thread(false),
    mSender(sender) {}

    AAH_TXSender::RetryReceiver::~RetryReceiver() {
    AAH_TXGroup::RetryReceiver::~RetryReceiver() {
        mWakeupEvent.clearPendingEvents();
    }

@@ -357,7 +357,7 @@ static inline bool withinIntervalWithRollover(T val, T start, T end) {
            (start > end && (val >= start || val <= end)));
}

bool AAH_TXSender::RetryReceiver::threadLoop() {
bool AAH_TXGroup::RetryReceiver::threadLoop() {
    struct pollfd pollFds[2];
    pollFds[0].fd = mSender->mSocket;
    pollFds[0].events = POLLIN;
@@ -384,7 +384,7 @@ bool AAH_TXSender::RetryReceiver::threadLoop() {
    return true;
}

void AAH_TXSender::RetryReceiver::handleRetryRequest() {
void AAH_TXGroup::RetryReceiver::handleRetryRequest() {
    LOGV("*** RX %s start", __PRETTY_FUNCTION__);

    RetryPacket request;
@@ -517,22 +517,22 @@ void AAH_TXSender::RetryReceiver::handleRetryRequest() {

// Endpoint

AAH_TXSender::Endpoint::Endpoint()
AAH_TXGroup::Endpoint::Endpoint()
        : addr(0)
        , port(0) { }

AAH_TXSender::Endpoint::Endpoint(uint32_t a, uint16_t p)
AAH_TXGroup::Endpoint::Endpoint(uint32_t a, uint16_t p)
        : addr(a)
        , port(p) {}

bool AAH_TXSender::Endpoint::operator<(const Endpoint& other) const {
bool AAH_TXGroup::Endpoint::operator<(const Endpoint& other) const {
    return ((addr < other.addr) ||
            (addr == other.addr && port < other.port));
}

// EndpointState

AAH_TXSender::EndpointState::EndpointState(uint32_t _epoch)
AAH_TXGroup::EndpointState::EndpointState(uint32_t _epoch)
    : retry(kRetryBufferCapacity)
    , playerRefCount(1)
    , trtpSeqNumber(0)
+10 −10
Original line number Diff line number Diff line
@@ -47,11 +47,11 @@ template <typename T> class CircularBuffer {
    size_t mFillCount;
};

class AAH_TXSender : public virtual RefBase {
class AAH_TXGroup : public virtual RefBase {
  public:
    ~AAH_TXSender();
    ~AAH_TXGroup();

    static sp<AAH_TXSender> GetInstance();
    static sp<AAH_TXGroup> GetInstance();

    ALooper::handler_id handlerID() { return mReflector->id(); }

@@ -80,10 +80,10 @@ class AAH_TXSender : public virtual RefBase {
    static const char* kSendPacketTRTPPacket;

  private:
    AAH_TXSender();
    AAH_TXGroup();

    static Mutex sLock;
    static wp<AAH_TXSender> sInstance;
    static wp<AAH_TXGroup> sInstance;
    static uint32_t sNextEpoch;
    static bool sNextEpochValid;

@@ -101,7 +101,7 @@ class AAH_TXSender : public virtual RefBase {
        uint32_t epoch;
    };

    friend class AHandlerReflector<AAH_TXSender>;
    friend class AHandlerReflector<AAH_TXGroup>;
    void onMessageReceived(const sp<AMessage>& msg);
    void onSendPacket(const sp<AMessage>& msg);
    void doSendPacket_l(const sp<TRTPPacket>& packet,
@@ -111,7 +111,7 @@ class AAH_TXSender : public virtual RefBase {
    bool shouldSendHeartbeats_l();

    sp<ALooper> mLooper;
    sp<AHandlerReflector<AAH_TXSender> > mReflector;
    sp<AHandlerReflector<AAH_TXGroup> > mReflector;

    int mSocket;
    nsecs_t mLastSentPacketTime;
@@ -126,9 +126,9 @@ class AAH_TXSender : public virtual RefBase {

    class RetryReceiver : public Thread {
      private:
        friend class AAH_TXSender;
        friend class AAH_TXGroup;

        RetryReceiver(AAH_TXSender* sender);
        RetryReceiver(AAH_TXGroup* sender);
        virtual ~RetryReceiver();
        virtual bool threadLoop();
        void handleRetryRequest();
@@ -138,7 +138,7 @@ class AAH_TXSender : public virtual RefBase {
        static const uint32_t kFastStartRequestID;
        static const uint32_t kRetryNakID;

        AAH_TXSender* mSender;
        AAH_TXGroup* mSender;
        PipeEvent mWakeupEvent;
    };

+17 −17
Original line number Diff line number Diff line
@@ -228,8 +228,8 @@ status_t AAH_TXPlayer::prepareAsync_l() {
        return UNKNOWN_ERROR;  // async prepare already pending
    }

    mAAH_Sender = AAH_TXSender::GetInstance();
    if (mAAH_Sender == NULL) {
    mAAH_TXGroup = AAH_TXGroup::GetInstance();
    if (mAAH_TXGroup == NULL) {
        return NO_MEMORY;
    }

@@ -513,7 +513,7 @@ status_t AAH_TXPlayer::play_l() {
            return INVALID_OPERATION;
        }
        if (!mEndpointRegistered) {
            mProgramID = mAAH_Sender->registerEndpoint(mEndpoint);
            mProgramID = mAAH_TXGroup->registerEndpoint(mEndpoint);
            mEndpointRegistered = true;
        }
    }
@@ -597,13 +597,13 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) {
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setClockTransform(mCurrentClockTransform);
    packet->setCommandID(TRTPControlPacket::kCommandNop);
    queuePacketToSender_l(packet);
    queuePacket_l(packet);
}

void AAH_TXPlayer::sendEOS_l() {
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setCommandID(TRTPControlPacket::kCommandEOS);
    queuePacketToSender_l(packet);
    queuePacket_l(packet);
}

bool AAH_TXPlayer::isPlaying() {
@@ -630,7 +630,7 @@ status_t AAH_TXPlayer::seekTo_l(int64_t timeUs) {
    // send a flush command packet
    sp<TRTPControlPacket> packet = new TRTPControlPacket();
    packet->setCommandID(TRTPControlPacket::kCommandFlush);
    queuePacketToSender_l(packet);
    queuePacket_l(packet);

    return OK;
}
@@ -753,8 +753,8 @@ void AAH_TXPlayer::reset_l() {

    {
        Mutex::Autolock lock(mEndpointLock);
        if (mAAH_Sender != NULL && mEndpointRegistered) {
            mAAH_Sender->unregisterEndpoint(mEndpoint);
        if (mAAH_TXGroup != NULL && mEndpointRegistered) {
            mAAH_TXGroup->unregisterEndpoint(mEndpoint);
        }
        mEndpointRegistered = false;
        mEndpointValid = false;
@@ -762,7 +762,7 @@ void AAH_TXPlayer::reset_l() {

    mProgramID = 0;

    mAAH_Sender.clear();
    mAAH_TXGroup.clear();
    mLastQueuedMediaTimePTSValid = false;
    mCurrentClockTransformValid = false;
    mPlayRateIsPaused = false;
@@ -1124,7 +1124,7 @@ void AAH_TXPlayer::onPumpAudio() {
            packet->setAuxData(mAudioCodecData, mAudioCodecDataSize);
        }

        queuePacketToSender_l(packet);
        queuePacket_l(packet);
        mediaBuffer->release();

        mLastQueuedMediaTimePTSValid = true;
@@ -1147,13 +1147,13 @@ void AAH_TXPlayer::onPumpAudio() {
    }
}

void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
    if (mAAH_Sender == NULL) {
void AAH_TXPlayer::queuePacket_l(const sp<TRTPPacket>& packet) {
    if (mAAH_TXGroup == NULL) {
        return;
    }

    sp<AMessage> message = new AMessage(AAH_TXSender::kWhatSendPacket,
                                        mAAH_Sender->handlerID());
    sp<AMessage> message = new AMessage(AAH_TXGroup::kWhatSendPacket,
                                        mAAH_TXGroup->handlerID());

    {
        Mutex::Autolock lock(mEndpointLock);
@@ -1161,15 +1161,15 @@ void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
            return;
        }

        message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
        message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
        message->setInt32(AAH_TXGroup::kSendPacketIPAddr, mEndpoint.addr);
        message->setInt32(AAH_TXGroup::kSendPacketPort, mEndpoint.port);
    }

    packet->setProgramID(mProgramID);
    packet->setExpireTime(systemTime() + kAAHRetryKeepAroundTimeNs);
    packet->pack();

    message->setObject(AAH_TXSender::kSendPacketTRTPPacket, packet);
    message->setObject(AAH_TXGroup::kSendPacketTRTPPacket, packet);

    message->post();
}
+4 −4
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@
#include <utils/String8.h>
#include <utils/threads.h>

#include "aah_tx_sender.h"
#include "aah_tx_group.h"

namespace android {

@@ -113,7 +113,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
    void postPumpAudioEvent_l(int64_t delayUs);
    void onBufferingUpdate();
    void onPumpAudio();
    void queuePacketToSender_l(const sp<TRTPPacket>& packet);
    void queuePacket_l(const sp<TRTPPacket>& packet);

    Mutex mLock;

@@ -153,7 +153,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
    int64_t mDurationUs;
    int64_t mBitrate;

    sp<AAH_TXSender> mAAH_Sender;
    sp<AAH_TXGroup>  mAAH_TXGroup;
    LinearTransform  mCurrentClockTransform;
    bool             mCurrentClockTransformValid;
    int64_t          mLastQueuedMediaTimePTS;
@@ -162,7 +162,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
    CCHelper         mCCHelper;

    Mutex mEndpointLock;
    AAH_TXSender::Endpoint mEndpoint;
    AAH_TXGroup::Endpoint mEndpoint;
    bool mEndpointValid;
    bool mEndpointRegistered;
    uint16_t mProgramID;