Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c23fad2f authored by Yi Jin's avatar Yi Jin
Browse files

Implement c++ native lib for streaming proto, part 1

Extract protobuf class out and creates EncodedBuffer class
which holds protobuf data.
Next step is to create a ProtoOutputStream and let incident helper
adapt the change as well.
please see frameworks/base/core/java/android/util/proto

Bug: 65641021
Test: unit tested
Change-Id: I0dd343b2e62d60f091c8f857fae3452ec8da6b96
parent f39df68b
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@ include $(CLEAR_VARS)
LOCAL_MODULE := incidentd

LOCAL_SRC_FILES := \
        src/EncodedBuffer.cpp \
        src/PrivacyBuffer.cpp \
        src/FdBuffer.cpp \
        src/IncidentService.cpp \
        src/Privacy.cpp \
@@ -31,7 +31,6 @@ LOCAL_SRC_FILES := \
        src/Section.cpp \
        src/io_util.cpp \
        src/main.cpp \
        src/protobuf.cpp \
        src/report_directory.cpp

LOCAL_CFLAGS += \
@@ -54,6 +53,7 @@ LOCAL_SHARED_LIBRARIES := \
        libcutils \
        libincident \
        liblog \
        libprotoutil \
        libselinux \
        libservices \
        libutils
@@ -93,16 +93,15 @@ LOCAL_CFLAGS := -Werror -Wall -Wno-unused-variable -Wunused-parameter
LOCAL_C_INCLUDES += $(LOCAL_PATH)/src

LOCAL_SRC_FILES := \
    src/EncodedBuffer.cpp \
    src/PrivacyBuffer.cpp \
    src/FdBuffer.cpp \
    src/Privacy.cpp \
    src/Reporter.cpp \
    src/Section.cpp \
    src/io_util.cpp \
    src/protobuf.cpp \
    src/report_directory.cpp \
    tests/section_list.cpp \
    tests/EncodedBuffer_test.cpp \
    tests/PrivacyBuffer_test.cpp \
    tests/FdBuffer_test.cpp \
    tests/Reporter_test.cpp \
    tests/Section_test.cpp \
@@ -116,6 +115,7 @@ LOCAL_SHARED_LIBRARIES := \
    libcutils \
    libincident \
    liblog \
    libprotoutil \
    libselinux \
    libservices \
    libutils \
+17 −135
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
#define LOG_TAG "incidentd"

#include "FdBuffer.h"
#include "io_util.h"

#include <cutils/log.h>
#include <utils/SystemClock.h>
@@ -31,10 +30,9 @@ const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB
const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max

FdBuffer::FdBuffer()
    :mBuffers(),
    :mBuffer(BUFFER_SIZE),
     mStartTime(-1),
     mFinishTime(-1),
     mCurrentWritten(-1),
     mTimedOut(false),
     mTruncated(false)
{
@@ -42,11 +40,6 @@ FdBuffer::FdBuffer()

FdBuffer::~FdBuffer()
{
    const int N = mBuffers.size();
    for (int i=0; i<N; i++) {
        uint8_t* buf = mBuffers[i];
        free(buf);
    }
}

status_t
@@ -60,20 +53,12 @@ FdBuffer::read(int fd, int64_t timeout)

    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    uint8_t* buf = NULL;
    while (true) {
        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
            if (mBuffers.size() == MAX_BUFFER_COUNT) {
        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
            mTruncated = true;
            break;
        }
            buf = (uint8_t*)malloc(BUFFER_SIZE);
            if (buf == NULL) {
                return NO_MEMORY;
            }
            mBuffers.push_back(buf);
            mCurrentWritten = 0;
        }
        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;

        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
        if (remainingTime <= 0) {
@@ -91,7 +76,7 @@ FdBuffer::read(int fd, int64_t timeout)
            if ((pfds.revents & POLLERR) != 0) {
                return errno != 0 ? -errno : UNKNOWN_ERROR;
            } else {
                ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
                ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
                if (amt < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
@@ -101,11 +86,10 @@ FdBuffer::read(int fd, int64_t timeout)
                } else if (amt == 0) {
                    break;
                }
                mCurrentWritten += amt;
                mBuffer.wp()->move(amt);
            }
        }
    }

    mFinishTime = uptimeMillis();
    return NO_ERROR;
}
@@ -132,20 +116,12 @@ FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeou
    int rpos = 0, wpos = 0;

    // This is the buffer used to store processed data
    uint8_t* buf = NULL;
    while (true) {
        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
            if (mBuffers.size() == MAX_BUFFER_COUNT) {
        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
            mTruncated = true;
            break;
        }
            buf = (uint8_t*)malloc(BUFFER_SIZE);
            if (buf == NULL) {
                return NO_MEMORY;
            }
            mBuffers.push_back(buf);
            mCurrentWritten = 0;
        }
        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;

        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
        if (remainingTime <= 0) {
@@ -223,7 +199,7 @@ FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeou
        }

        // read from parsing process
        ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
        ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
        if (amt < 0) {
            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                return -errno;
@@ -231,7 +207,7 @@ FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeou
        } else if (amt == 0) {
            break;
        } else {
            mCurrentWritten += amt;
            mBuffer.wp()->move(amt);
        }
    }

@@ -242,105 +218,11 @@ FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeou
size_t
FdBuffer::size() const
{
    if (mBuffers.empty()) return 0;
    return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
}

status_t
FdBuffer::flush(int fd) const
{
    size_t i=0;
    status_t err = NO_ERROR;
    for (i=0; i<mBuffers.size()-1; i++) {
        err = write_all(fd, mBuffers[i], BUFFER_SIZE);
        if (err != NO_ERROR) return err;
    }
    return write_all(fd, mBuffers[i], mCurrentWritten);
}

FdBuffer::iterator
FdBuffer::begin() const
{
    return iterator(*this, 0, 0);
}

FdBuffer::iterator
FdBuffer::end() const
{
    if (mBuffers.empty() || mCurrentWritten < 0) return begin();
    if (mCurrentWritten == BUFFER_SIZE)
        // FdBuffer doesn't allocate another buf since no more bytes to read.
        return FdBuffer::iterator(*this, mBuffers.size(), 0);
    return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten);
}

// ===============================================================================
FdBuffer::iterator::iterator(const FdBuffer& buffer, ssize_t index, ssize_t offset)
        : mFdBuffer(buffer),
          mIndex(index),
          mOffset(offset)
{
}

FdBuffer::iterator&
FdBuffer::iterator::operator=(iterator& other) const { return other; }

FdBuffer::iterator&
FdBuffer::iterator::operator+(size_t offset)
{
    size_t newOffset = mOffset + offset;
    while (newOffset >= BUFFER_SIZE) {
        mIndex++;
        newOffset -= BUFFER_SIZE;
    }
    mOffset = newOffset;
    return *this;
}

FdBuffer::iterator&
FdBuffer::iterator::operator+=(size_t offset) { return *this + offset; }

FdBuffer::iterator&
FdBuffer::iterator::operator++() { return *this + 1; }

FdBuffer::iterator
FdBuffer::iterator::operator++(int) { return *this + 1; }

bool
FdBuffer::iterator::operator==(iterator other) const
{
    return mIndex == other.mIndex && mOffset == other.mOffset;
}

bool
FdBuffer::iterator::operator!=(iterator other) const { return !(*this == other); }

int
FdBuffer::iterator::operator-(iterator other) const
{
    return (int)bytesRead() - (int)other.bytesRead();
}

FdBuffer::iterator::reference
FdBuffer::iterator::operator*() const
{
    return mFdBuffer.mBuffers[mIndex][mOffset];
}

FdBuffer::iterator
FdBuffer::iterator::snapshot() const
{
    return FdBuffer::iterator(mFdBuffer, mIndex, mOffset);
}

size_t
FdBuffer::iterator::bytesRead() const
{
    return mIndex * BUFFER_SIZE + mOffset;
    return mBuffer.size();
}

bool
FdBuffer::iterator::outOfBound() const
EncodedBuffer::iterator
FdBuffer::data() const
{
    return bytesRead() > mFdBuffer.size();
    return mBuffer.begin();
}
+5 −38
Original line number Diff line number Diff line
@@ -17,11 +17,11 @@
#ifndef FD_BUFFER_H
#define FD_BUFFER_H

#include <android/util/EncodedBuffer.h>
#include <utils/Errors.h>

#include <vector>

using namespace android;
using namespace android::util;
using namespace std;

/**
@@ -70,53 +70,20 @@ public:
     */
    size_t size() const;

    /**
     * Flush all the data to given file descriptor;
     */
    status_t flush(int fd) const;

    /**
     * How long the read took in milliseconds.
     */
    int64_t durationMs() const { return mFinishTime - mStartTime; }

    /**
     * Read data stored in FdBuffer
     * Reader API for data stored in FdBuffer
     */
    class iterator;
    friend class iterator;
    class iterator : public std::iterator<std::random_access_iterator_tag, uint8_t> {
    public:
        iterator(const FdBuffer& buffer, ssize_t index, ssize_t offset);
        iterator& operator=(iterator& other) const;
        iterator& operator+(size_t offset);
        iterator& operator+=(size_t offset);
        iterator& operator++();
        iterator operator++(int);
        bool operator==(iterator other) const;
        bool operator!=(iterator other) const;
        int operator-(iterator other) const;
        reference operator*() const;

        // return the snapshot of the current iterator
        iterator snapshot() const;
        // how many bytes are read
        size_t bytesRead() const;
        // random access could make the iterator out of bound
        bool outOfBound() const;
    private:
        const FdBuffer& mFdBuffer;
        size_t mIndex;
        size_t mOffset;
    };
    iterator begin() const;
    iterator end() const;
    EncodedBuffer::iterator data() const;

private:
    vector<uint8_t*> mBuffers;
    EncodedBuffer mBuffer;
    int64_t mStartTime;
    int64_t mFinishTime;
    ssize_t mCurrentWritten;
    bool mTimedOut;
    bool mTruncated;
};
+168 −0
Original line number Diff line number Diff line
@@ -14,68 +14,51 @@
 * limitations under the License.
 */

#include "EncodedBuffer.h"


#include "PrivacyBuffer.h"
#include "io_util.h"
#include "protobuf.h"

#include <android/util/protobuf.h>
#include <deque>

const size_t BUFFER_SIZE = 4 * 1024; // 4 KB

/**
 * Read varint from iterator, the iterator will point to next available byte.
 * Return the number of bytes of the varint.
 */
static uint32_t
read_raw_varint(FdBuffer::iterator* it)
{
    uint32_t val = 0;
    int i = 0;
    bool hasNext = true;
    while (hasNext) {
        hasNext = ((**it & 0x80) != 0);
        val += (**it & 0x7F) << (7*i);
        (*it)++;
        i++;
    }
    return val;
}
using namespace android::util;

/**
 * Write the field to buf based on the wire type, iterator will point to next field.
 * If skip is set to true, no data will be written to buf. Return number of bytes written.
 */
static size_t
write_field_or_skip(FdBuffer::iterator* iter, vector<uint8_t>* buf, uint8_t wireType, bool skip)
write_field_or_skip(EncodedBuffer::iterator* iter, EncodedBuffer* buf, uint8_t wireType, bool skip)
{
    FdBuffer::iterator snapshot = iter->snapshot();
    EncodedBuffer::Pointer snapshot = iter->rp()->copy();
    size_t bytesToWrite = 0;
    uint32_t varint = 0;
    switch (wireType) {
        case WIRE_TYPE_VARINT:
            varint = read_raw_varint(iter);
            if(!skip) return write_raw_varint(buf, varint);
            varint = iter->readRawVarint();
            if(!skip) return buf->writeRawVarint(varint);
            break;
        case WIRE_TYPE_FIXED64:
            bytesToWrite = 8;
            break;
        case WIRE_TYPE_LENGTH_DELIMITED:
            bytesToWrite = read_raw_varint(iter);
            if(!skip) write_raw_varint(buf, bytesToWrite);
            bytesToWrite = iter->readRawVarint();
            if(!skip) buf->writeRawVarint(bytesToWrite);
            break;
        case WIRE_TYPE_FIXED32:
            bytesToWrite = 4;
            break;
    }
    if (skip) {
        *iter += bytesToWrite;
        iter->rp()->move(bytesToWrite);
    } else {
        for (size_t i=0; i<bytesToWrite; i++) {
            buf->push_back(**iter);
            (*iter)++;
            *buf->writeBuffer() = iter->next();
            buf->wp()->move();
        }
    }
    return skip ? 0 : *iter - snapshot;
    return skip ? 0 : iter->rp()->pos() - snapshot.pos();
}

/**
@@ -86,11 +69,10 @@ write_field_or_skip(FdBuffer::iterator* iter, vector<uint8_t>* buf, uint8_t wire
 * After exit with NO_ERROR, iterator points to the next protobuf field's head.
 */
static status_t
stripField(FdBuffer::iterator* iter, vector<uint8_t>* buf, const Privacy* parentPolicy, const PrivacySpec& spec)
stripField(EncodedBuffer::iterator* iter, EncodedBuffer* buf, const Privacy* parentPolicy, const PrivacySpec& spec)
{
    if (iter->outOfBound() || parentPolicy == NULL) return BAD_VALUE;

    uint32_t varint = read_raw_varint(iter);
    if (!iter->hasNext() || parentPolicy == NULL) return BAD_VALUE;
    uint32_t varint = iter->readRawVarint();
    uint8_t wireType = read_wire_type(varint);
    uint32_t fieldId = read_field_id(varint);
    const Privacy* policy = parentPolicy->lookup(fieldId);
@@ -98,98 +80,89 @@ stripField(FdBuffer::iterator* iter, vector<uint8_t>* buf, const Privacy* parent
    if (policy == NULL || !policy->IsMessageType() || !policy->HasChildren()) {
        bool skip = !spec.CheckPremission(policy);
        size_t amt = buf->size();
        if (!skip) amt += write_header(buf, fieldId, wireType);
        if (!skip) amt += buf->writeHeader(fieldId, wireType);
        amt += write_field_or_skip(iter, buf, wireType, skip); // point to head of next field
        return buf->size() != amt ? BAD_VALUE : NO_ERROR;
    }
    // current field is message type and its sub-fields have extra privacy policies
    deque<vector<uint8_t>> q;
    uint32_t msgSize = read_raw_varint(iter);
    deque<EncodedBuffer*> q;
    uint32_t msgSize = iter->readRawVarint();
    size_t finalSize = 0;
    FdBuffer::iterator start = iter->snapshot();
    while ((*iter - start) != (int)msgSize) {
        vector<uint8_t> v;
        status_t err = stripField(iter, &v, policy, spec);
    EncodedBuffer::Pointer start = iter->rp()->copy();
    while (iter->rp()->pos() - start.pos() != msgSize) {
        EncodedBuffer* v = new EncodedBuffer();
        status_t err = stripField(iter, v, policy, spec);
        if (err != NO_ERROR) return err;
        if (v.empty()) continue;
        if (v->size() == 0) continue;
        q.push_back(v);
        finalSize += v.size();
        finalSize += v->size();
    }

    write_header(buf, fieldId, wireType);
    write_raw_varint(buf, finalSize);
    buf->reserve(finalSize); // reserve the size of the field
    buf->writeHeader(fieldId, wireType);
    buf->writeRawVarint(finalSize);
    while (!q.empty()) {
        vector<uint8_t> subField = q.front();
        for (vector<uint8_t>::iterator it = subField.begin(); it != subField.end(); it++) {
            buf->push_back(*it);
        EncodedBuffer* subField = q.front();
        EncodedBuffer::iterator it = subField->begin();
        while (it.hasNext()) {
            *buf->writeBuffer() = it.next();
            buf->wp()->move();
        }
        q.pop_front();
        delete subField;
    }
    return NO_ERROR;
}

// ================================================================================
EncodedBuffer::EncodedBuffer(const FdBuffer& buffer, const Privacy* policy)
        : mFdBuffer(buffer),
          mPolicy(policy),
          mBuffers(),
PrivacyBuffer::PrivacyBuffer(const Privacy* policy, EncodedBuffer::iterator& data)
        :mPolicy(policy),
         mData(data),
         mBuffer(0),
         mSize(0)
{
}

EncodedBuffer::~EncodedBuffer()
PrivacyBuffer::~PrivacyBuffer()
{
}

status_t
EncodedBuffer::strip(const PrivacySpec& spec)
PrivacyBuffer::strip(const PrivacySpec& spec)
{
    // optimization when no strip happens
    if (mPolicy == NULL || !mPolicy->HasChildren() || spec.RequireAll()) {
        if (spec.CheckPremission(mPolicy)) mSize = mFdBuffer.size();
        if (spec.CheckPremission(mPolicy)) mSize = mData.size();
        return NO_ERROR;
    }

    FdBuffer::iterator it = mFdBuffer.begin();
    vector<uint8_t> field;
    field.reserve(BUFFER_SIZE);

    while (it != mFdBuffer.end()) {
        status_t err = stripField(&it, &field, mPolicy, spec);
    while (mData.hasNext()) {
        status_t err = stripField(&mData, &mBuffer, mPolicy, spec);
        if (err != NO_ERROR) return err;
        if (field.size() > BUFFER_SIZE) { // rotate to another chunk if buffer size exceeds
            mBuffers.push_back(field);
            mSize += field.size();
            field.clear();
        }
    }
    if (!field.empty()) {
        mBuffers.push_back(field);
        mSize += field.size();
    }
    if (mData.bytesRead() != mData.size()) return BAD_VALUE;
    mSize = mBuffer.size();
    mData.rp()->rewind(); // rewind the read pointer back to beginning after the strip.
    return NO_ERROR;
}

void
EncodedBuffer::clear()
PrivacyBuffer::clear()
{
    mSize = 0;
    mBuffers.clear();
    mBuffer.wp()->rewind();
}

size_t
EncodedBuffer::size() const { return mSize; }
PrivacyBuffer::size() const { return mSize; }

status_t
EncodedBuffer::flush(int fd)
PrivacyBuffer::flush(int fd)
{
    if (size() == mFdBuffer.size()) return mFdBuffer.flush(fd);

    for (vector<vector<uint8_t>>::iterator it = mBuffers.begin(); it != mBuffers.end(); it++) {
        status_t err = write_all(fd, it->data(), it->size());
    status_t err = NO_ERROR;
    EncodedBuffer::iterator iter = size() == mData.size() ? mData : mBuffer.begin();
    while (iter.readBuffer() != NULL) {
        err = write_all(fd, iter.readBuffer(), iter.currentToRead());
        iter.rp()->move(iter.currentToRead());
        if (err != NO_ERROR) return err;
    }
    return NO_ERROR;
}
+16 −13
Original line number Diff line number Diff line
@@ -14,25 +14,27 @@
 * limitations under the License.
 */

#ifndef ENCODED_BUFFER_H
#define ENCODED_BUFFER_H
#ifndef PRIVACY_BUFFER_H
#define PRIVACY_BUFFER_H

#include "FdBuffer.h"
#include "Privacy.h"

#include <android/util/EncodedBuffer.h>
#include <stdint.h>
#include <vector>
#include <utils/Errors.h>

using namespace android;
using namespace android::util;

/**
 * EncodedBuffer is constructed from FdBuffer which holds original protobuf formatted data and
 * its privacy policy in its tagged proto message. The class strips PII-sensitive fields
 * based on the request and holds stripped data in its buffer for output.
 * PrivacyBuffer holds the original protobuf data and strips PII-sensitive fields
 * based on the request and holds stripped data in its own buffer for output.
 */
class EncodedBuffer
class PrivacyBuffer
{
public:
    EncodedBuffer(const FdBuffer& buffer, const Privacy* policy);
    ~EncodedBuffer();
    PrivacyBuffer(const Privacy* policy, EncodedBuffer::iterator& data);
    ~PrivacyBuffer();

    /**
     * Strip based on the request and hold data in its own buffer. Return NO_ERROR if strip succeeds.
@@ -55,10 +57,11 @@ public:
    status_t flush(int fd);

private:
    const FdBuffer& mFdBuffer;
    const Privacy* mPolicy;
    vector<vector<uint8_t>> mBuffers;
    EncodedBuffer::iterator& mData;

    EncodedBuffer mBuffer;
    size_t mSize;
};

#endif // ENCODED_BUFFER_H
 No newline at end of file
#endif // PRIVACY_BUFFER_H
 No newline at end of file
Loading