Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2db21f1d authored by John Grossman's avatar John Grossman Committed by Android Git Automerger
Browse files

am 42a6382f: LibAAH_RTP: Refactor TXGroup code, add unicast mode.

* commit '42a6382f':
  LibAAH_RTP: Refactor TXGroup code, add unicast mode.
parents bad6566f 42a6382f
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -14,9 +14,9 @@ LOCAL_SRC_FILES := \
    aah_rx_player_core.cpp \
    aah_rx_player_ring_buffer.cpp \
    aah_rx_player_substream.cpp \
    aah_tx_group.cpp \
    aah_tx_packet.cpp \
    aah_tx_player.cpp \
    aah_tx_group.cpp \
    pipe_event.cpp

LOCAL_C_INCLUDES := \
@@ -26,8 +26,9 @@ LOCAL_C_INCLUDES := \
    frameworks/base/media/libstagefright

LOCAL_SHARED_LIBRARIES := \
    libcommon_time_client \
    libbinder \
    libcommon_time_client \
    libcutils \
    libmedia \
    libstagefright \
    libstagefright_foundation \
+2 −2
Original line number Diff line number Diff line
@@ -164,7 +164,7 @@ bool AAH_RXPlayer::setupSocket() {
    if (res < 0) {
        uint32_t a = ntohl(bind_addr.sin_addr.s_addr);
        uint16_t p = ntohs(bind_addr.sin_port);
        LOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hd. (errno %d)",
        LOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hu. (errno %d)",
             sock_fd_,
             (a >> 24) & 0xFF,
             (a >> 16) & 0xFF,
@@ -194,7 +194,7 @@ bool AAH_RXPlayer::setupSocket() {
        LOGW("Failed to increase socket buffer size to %d.  (errno %d)",
             buf_size, errno);
    } else {
        LOGI("RX socket buffer size is now %d bytes",  buf_size);
        LOGD("RX socket buffer size is now %d bytes",  buf_size);
    }

    if (listen_addr_.sin_addr.s_addr) {
+910 −359

File changed.

Preview size limit exceeded, changes collapsed.

+261 −77
Original line number Diff line number Diff line
@@ -17,13 +17,18 @@
#ifndef __AAH_TX_SENDER_H__
#define __AAH_TX_SENDER_H__

#include <media/stagefright/foundation/ALooper.h>
#include <media/stagefright/foundation/AHandlerReflector.h>
#include <netinet/in.h>
#include <stdint.h>

#include <media/stagefright/foundation/ABase.h>
#include <utils/RefBase.h>
#include <utils/threads.h>
#include <utils/Vector.h>

#include "aah_tx_packet.h"
#include "pipe_event.h"

#define IP_PRINTF_HELPER(a) ((a >> 24) & 0xFF), ((a >> 16) & 0xFF), \
                            ((a >>  8) & 0xFF),  (a        & 0xFF)

namespace android {

@@ -47,111 +52,290 @@ template <typename T> class CircularBuffer {
    size_t mFillCount;
};

struct RetryPacket {
    uint32_t id;
    // TODO: endpointIP and endpointPort are no longer needed now that we use a
    // dedicated send/C&C socket for each TX group.  Removing them is a protocol
    // breaking change; something which would be good to do before 1.0 ship so
    // we don't have to maintain backwards compatibility forever.
    uint32_t endpointIP;
    uint16_t endpointPort;
    uint16_t seqStart;
    uint16_t seqEnd;
} __attribute__((packed));

class AAH_TXGroup : public virtual RefBase {
  public:
    // Obtain the instance of the TXGroup whose command and control socket is
    // currently listening on the specified port.  Alternatively, if port is 0,
    // create a new TXGroup with an ephemerally bound command and control port.
    static sp<AAH_TXGroup> getGroup(uint16_t port);

    // Obtain the instance of the TXGroup whose multicast transmit target is
    // currently set to target, or NULL if no such group exists.  To create a
    // new transmit group with a new multicast target address, call getGroup(0)
    // followed by setMulticastTXTarget.
    static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target);

    // AAH_TXGroups successfully obtained using calls to getGroup will have a
    // client reference placed on them on behalf of the caller.  When the caller
    // is finished using the group, they must call dropClientReference to remove
    // this reference.  Once call client references have been released from a TX
    // Group, the group will linger in the system for a short period of time
    // before finally expiring and being cleaned up by the command and control
    // thread.
    // 
    // TODO : someday, expose the AAH_TXGroup as a top level object in the
    // android MediaAPIs so that applications may explicitly manage TX group
    // lifecycles instead of relying on this timeout/cleanup mechanism.
    void dropClientReference();

    // Fetch the UDP port on which this TXGroup is listening for command and
    // control messages.  No need to hold any locks for this, the port is
    // established as group is created and bound, and then never changed
    // afterwards.
    uint16_t getCmdAndControlPort() const { return mCmdAndControlPort; }

    // Used by players to obtain a new program ID for this retransmission group.
    uint16_t getNewProgramID();

    // Assign a TRTP sequence number to the supplied packet and send it to all
    // registered clients.  Then place the packet into the RetryBuffer to
    // service future client retry requests.
    status_t sendPacket(const sp<TRTPPacket>& packet);

    // Sets the multicast transmit target for this TX Group.  Pass NULL to clear
    // the multicast transmit target and return to pure unicast mode.
    void setMulticastTXTarget(const struct sockaddr_in* target);

  protected:
    // TXGroups are ref counted objects; make sure that only RefBase can call
    // the destructor.
    ~AAH_TXGroup();

    static sp<AAH_TXGroup> GetInstance();
  private:
    // Definition of a helper class used to track things like when we need to
    // transmit a heartbeat, or when we will need to wake up and trim the retry
    // buffers.
    class Timeout {
      public:
        Timeout() : mSystemEndTime(0) { }

        // Set a timeout which should occur msec milliseconds from now.
        // Negative values will cancel any current timeout;
        void setTimeout(int msec);

    ALooper::handler_id handlerID() { return mReflector->id(); }
        // Return the number of milliseconds until the timeout occurs, or -1 if
        // no timeout is scheduled.
        int msecTillTimeout(nsecs_t nowTime);
        int msecTillTimeout() { return msecTillTimeout(systemTime()); }

    // an IP address and port
    struct Endpoint {
        Endpoint();
        Endpoint(uint32_t a, uint16_t p);
        bool operator<(const Endpoint& other) const;
      private:
        // The systemTime() at which the timeout will be complete, or 0 if no
        // timeout is currently scheduled.
        nsecs_t mSystemEndTime;

        uint32_t addr;
        uint16_t port;
        DISALLOW_EVIL_CONSTRUCTORS(Timeout);
    };

    uint16_t registerEndpoint(const Endpoint& endpoint);
    void unregisterEndpoint(const Endpoint& endpoint);
    // Definition of the singleton command and control receiver who will handle
    // requests from RX clients.  Requests include things like unicast group
    // management as well as retransmission requests.
    class CmdAndControlRXer : public Thread {
      public:
        CmdAndControlRXer();
        void wakeupThread();
        bool init();

      protected:
        virtual ~CmdAndControlRXer();

    enum {
        kWhatSendPacket,
        kWhatTrimRetryBuffers,
        kWhatSendHeartbeats,
      private:
        virtual bool threadLoop();
        void clearWakeupEvent();

        static const int kMaxReceiverPacketLen;
        static const uint32_t kRetryRequestID;
        static const uint32_t kFastStartRequestID;
        static const uint32_t kRetryNakID;

        int mWakeupEventFD;

        // Timeout used to track when we need to trim all of the retry buffers.
        Timeout mTrimRetryTimeout;

        DISALLOW_EVIL_CONSTRUCTORS(CmdAndControlRXer);
    };

    // fields for SendPacket messages
    static const char* kSendPacketIPAddr;
    static const char* kSendPacketPort;
    static const char* kSendPacketTRTPPacket;
    // Small utility class used to keep track of our unicast clients and when
    // they are going to time out of the group if we don't keep receiving group
    // membership reports.
    class UnicastTarget : public RefBase {
      public:
        explicit UnicastTarget(const struct sockaddr_in& endpoint) {
            mGroupTimeout.setTimeout(kUnicastClientTimeoutMsec);
            mEndpoint = endpoint;
        }

  private:
    AAH_TXGroup();
        struct sockaddr_in mEndpoint;
        Timeout mGroupTimeout;

        DISALLOW_EVIL_CONSTRUCTORS(UnicastTarget);
    };

    // A circular buffer of TRTPPackets is a retry buffer for a TX Group.
    typedef CircularBuffer< sp<TRTPPacket> > RetryBuffer;

    /***************************************************************************
     *
     * Static shared state as well as static methods.
     *
     **************************************************************************/

    // Our lock for serializing access to shared state.
    static Mutex sLock;
    static wp<AAH_TXGroup> sInstance;

    // The list of currently active TX groups.
    static Vector < sp<AAH_TXGroup> > sActiveTXGroups;

    // The singleton command and control receiver thread.
    static sp<CmdAndControlRXer> mCmdAndControlRXer;

    // State used to generate unique TRTP epochs.
    static uint32_t sNextEpoch;
    static bool sNextEpochValid;

    static uint32_t getNextEpoch();

    typedef CircularBuffer<sp<TRTPPacket> > RetryBuffer;
    /***************************************************************************
     *
     * Private methods.
     *
     **************************************************************************/

    // state maintained on a per-endpoint basis
    struct EndpointState {
        EndpointState(uint32_t epoch);
        RetryBuffer retry;
        int playerRefCount;
        uint16_t trtpSeqNumber;
        uint16_t nextProgramID;
        uint32_t epoch;
    };
    // Private constructor.  Use the static getGroup to obtain an AAH_TXGroup
    // instance.
    AAH_TXGroup();

    friend class AHandlerReflector<AAH_TXGroup>;
    void onMessageReceived(const sp<AMessage>& msg);
    void onSendPacket(const sp<AMessage>& msg);
    void doSendPacket_l(const sp<TRTPPacket>& packet,
                        const Endpoint& endpoint);
    void trimRetryBuffers();
    void sendHeartbeats();
    bool shouldSendHeartbeats_l();
    // Used by the static getGroup methods when they need to create a brand new
    // TXGroup.
    bool bindSocket();

    sp<ALooper> mLooper;
    sp<AHandlerReflector<AAH_TXGroup> > mReflector;
    // Locked version of sendPacket.
    status_t sendPacket_l(const sp<TRTPPacket>& packet);

    int mSocket;
    nsecs_t mLastSentPacketTime;
    // Send a payload to a specific target address.
    status_t sendToTarget_l(const struct sockaddr_in* target,
                            const uint8_t* payload,
                            size_t length);

    DefaultKeyedVector<Endpoint, EndpointState*> mEndpointMap;
    Mutex mEndpointLock;
    // Add a client ref to this TX Group and reset its cleanup timer.
    void addClientReference();

    static const int kRetryTrimIntervalUs;
    static const int kHeartbeatIntervalUs;
    static const int kRetryBufferCapacity;
    static const nsecs_t kHeartbeatTimeout;
    // Test used by the C&C thread to see if its time to expire and cleanup a
    // client.
    bool shouldExpire();

    class RetryReceiver : public Thread {
      private:
        friend class AAH_TXGroup;
    // Trim expired packets from this instance's retry buffer.
    void trimRetryBuffer();

        RetryReceiver(AAH_TXGroup* sender);
        virtual ~RetryReceiver();
        virtual bool threadLoop();
        void handleRetryRequest();
    // Send a heartbeat to this instance's clients to let them know that we are
    // still alive if its been a while since we sent any traffic to them.
    void sendHeartbeatIfNeeded();

        static const int kMaxReceiverPacketLen;
        static const uint32_t kRetryRequestID;
        static const uint32_t kFastStartRequestID;
        static const uint32_t kRetryNakID;
    // Handle any pending command and control requests waiting in this TX
    // group's socket.
    void handleRequests();

        AAH_TXGroup* mSender;
        PipeEvent mWakeupEvent;
    };
    // Handlers for individual command and control request types.
    void handleRetryRequest(const uint8_t* req,
                            const struct sockaddr_in* src_addr,
                            bool isFastStart);

    sp<RetryReceiver> mRetryReceiver;
};
    void handleJoinGroup(const struct sockaddr_in* src_addr);

struct RetryPacket {
    uint32_t id;
    uint32_t endpointIP;
    uint16_t endpointPort;
    uint16_t seqStart;
    uint16_t seqEnd;
} __attribute__((packed));
    void handleLeaveGroup(const struct sockaddr_in* src_addr);

    /***************************************************************************
     *
     * Private member variables
     *
     **************************************************************************/

    // Lock we use to serialize access to instance variables.
    Mutex mLock;
    
    // The list of packets we hold for servicing retry requests.
    RetryBuffer mRetryBuffer;

    // The number of TX Player clients currently using this TX group.
    uint32_t mClientRefCount;

    // The sequence number to assign to the next transmitted TRTP packet.
    uint16_t mTRTPSeqNumber;

    // The program ID to assign to the next TXPlayer client.  Program IDs are
    // actually unsigned 16 bit ints, but android's atomic inc routine operates
    // on ints, not shorts.  Its not a problem, we just mask out the lower 16
    // bits and call it good.
    int mNextProgramID;

    // The TRTP Epoch assigned to this transmit group.
    uint32_t mEpoch;

    // The socket we use to send packet and receive command and control
    // requests.
    int mSocket;

    // The UDP port to which our socket is bound (in host order).
    uint16_t mCmdAndControlPort;

    // The multicast target to send traffic to (if valid) For sanity's sake,
    // TXGroups are not allowed to have multiple multicast targets.
    struct sockaddr_in mMulticastTarget;
    bool mMulticastTargetValid;

    // The list of unicast client targets to send traffic to.
    //
    // TODO: right now, N for this list is expected to be small (think 1-3), and
    // is capped at something reasonable (16 right now).  If we ever need to go
    // much beyond that, we should seriously consider switching this to
    // something with O(ln) lookup time indexed by client's endpoint so we can
    // efficiently handle the regular group membership reports we need to
    // process from each client.
    Vector< sp<UnicastTarget> > mUnicastTargets;

    // Timeout used to track when the next heartbeat should be sent.
    Timeout mHeartbeatTimeout;

    // Timeout used to determine when to clean up this TXGroup after it no
    // longer has any TXPlayer clients.
    Timeout mCleanupTimeout;

    /***************************************************************************
     *
     * Class wide constants.
     *
     **************************************************************************/
    static const int kRetryTrimIntervalMsec;
    static const int kHeartbeatIntervalMsec;
    static const int kTXGroupLingerTimeMsec;
    static const int kUnicastClientTimeoutMsec;

    static const size_t kRetryBufferCapacity;
    static const size_t kMaxUnicastTargets;
    static const size_t kInitialUnicastTargetCapacity;
    static const size_t kMaxAllowedTXGroups;
    static const size_t kInitialActiveTXGroupsCapacity;

    static const uint32_t kCNC_RetryRequestID;
    static const uint32_t kCNC_FastStartRequestID;
    static const uint32_t kCNC_NakRetryRequestID;
    static const uint32_t kCNC_JoinGroupID;
    static const uint32_t kCNC_LeaveGroupID;
    static const uint32_t kCNC_NakJoinGroupID;

    DISALLOW_EVIL_CONSTRUCTORS(AAH_TXGroup);
};

}  // namespace android

+1 −0
Original line number Diff line number Diff line
@@ -73,6 +73,7 @@ class TRTPPacket : public RefBase {
    nsecs_t getExpireTime() const;

    virtual bool pack() = 0;
    bool isPacked() const { return mIsPacked; }

    // mask for the number of bits in a TRTP epoch
    static const uint32_t kTRTPEpochMask = (1 << 22) - 1;
Loading