Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7c738cdb authored by Josh Gao's avatar Josh Gao
Browse files

adb: add IOVector.

An IOVector is a collection of immutable reference counted blocks which
can have its head detached at an arbitrary index. This is extremely
useful for implementing packet-framed protocols like adb on top of a
stream protocol like TCP: a stream reader can read blocks, append them
to the end of the IOVector, and then pull packets off of the front.
This also lends itself naturally towards scatter/gather I/O, which will
enable us to read data from disk and send it across the wire with a
theoretical minimum number of copies in USB, and one extra copy over
TCP.

Since this is basically a generalization of std::deque<Range>, delete
Range and replace its uses with IOVector.

Test: adb_test
Test: wine adb_test.exe
Change-Id: I06561ad0bb25a3a51b378b61d257b5b04b41d9c4
parent 9da1a911
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -122,6 +122,7 @@ libadb_test_srcs = [
    "sysdeps_test.cpp",
    "sysdeps/stat_test.cpp",
    "transport_test.cpp",
    "types_test.cpp",
]

cc_library_host_static {
+1 −1
Original line number Diff line number Diff line
@@ -62,7 +62,7 @@ struct asocket {
    int fd = -1;

    // queue of data waiting to be written
    std::deque<Range> packet_queue;
    IOVector packet_queue;

    std::string smart_socket_data;

+9 −11
Original line number Diff line number Diff line
@@ -113,14 +113,14 @@ enum class SocketFlushResult {
};

static SocketFlushResult local_socket_flush_incoming(asocket* s) {
    while (!s->packet_queue.empty()) {
        Range& r = s->packet_queue.front();

        int rc = adb_write(s->fd, r.data(), r.size());
        if (rc == static_cast<int>(r.size())) {
            s->packet_queue.pop_front();
    if (!s->packet_queue.empty()) {
        std::vector<adb_iovec> iov = s->packet_queue.iovecs();
        ssize_t rc = adb_writev(s->fd, iov.data(), iov.size());
        if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) {
            s->packet_queue.clear();
        } else if (rc > 0) {
            r.drop_front(rc);
            // TODO: Implement a faster drop_front?
            s->packet_queue.take_front(rc);
            fdevent_add(s->fde, FDE_WRITE);
            return SocketFlushResult::TryAgain;
        } else if (rc == -1 && errno == EAGAIN) {
@@ -130,7 +130,6 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) {
            // We failed to write, but it's possible that we can still read from the socket.
            // Give that a try before giving up.
            s->has_write_error = true;
            break;
        }
    }

@@ -217,8 +216,7 @@ static bool local_socket_flush_outgoing(asocket* s) {
static int local_socket_enqueue(asocket* s, apacket::payload_type data) {
    D("LS(%d): enqueue %zu", s->id, data.size());

    Range r(std::move(data));
    s->packet_queue.push_back(std::move(r));
    s->packet_queue.append(std::move(data));
    switch (local_socket_flush_incoming(s)) {
        case SocketFlushResult::Destroyed:
            return -1;
@@ -622,7 +620,7 @@ static int smart_socket_enqueue(asocket* s, apacket::payload_type data) {
    D("SS(%d): enqueue %zu", s->id, data.size());

    if (s->smart_socket_data.empty()) {
        // TODO: Make this a BlockChain?
        // TODO: Make this an IOVector?
        s->smart_socket_data.assign(data.begin(), data.end());
    } else {
        std::copy(data.begin(), data.end(), std::back_inserter(s->smart_socket_data));
+192 −17
Original line number Diff line number Diff line
@@ -17,11 +17,15 @@
#pragma once

#include <algorithm>
#include <deque>
#include <type_traits>
#include <utility>
#include <vector>

#include <android-base/logging.h>

#include "sysdeps/memory.h"
#include "sysdeps/uio.h"

// Essentially std::vector<char>, except without zero initialization or reallocation.
struct Block {
@@ -130,34 +134,205 @@ struct apacket {
    payload_type payload;
};

struct Range {
    explicit Range(apacket::payload_type data) : data_(std::move(data)) {}
struct IOVector {
    using value_type = char;
    using block_type = Block;
    using size_type = size_t;

    Range(const Range& copy) = delete;
    Range& operator=(const Range& copy) = delete;
    IOVector() {}

    Range(Range&& move) = default;
    Range& operator=(Range&& move) = default;
    explicit IOVector(std::unique_ptr<block_type> block) {
        append(std::move(block));
    }

    IOVector(const IOVector& copy) = delete;
    IOVector(IOVector&& move) : IOVector() {
        *this = std::move(move);
    }

    IOVector& operator=(const IOVector& copy) = delete;
    IOVector& operator=(IOVector&& move) {
        chain_ = std::move(move.chain_);
        chain_length_ = move.chain_length_;
        begin_offset_ = move.begin_offset_;
        end_offset_ = move.end_offset_;

        move.chain_.clear();
        move.chain_length_ = 0;
        move.begin_offset_ = 0;
        move.end_offset_ = 0;

        return *this;
    }

    size_t size() const { return data_.size() - begin_offset_ - end_offset_; };
    size_type size() const { return chain_length_ - begin_offset_ - end_offset_; }
    bool empty() const { return size() == 0; }

    void drop_front(size_t n) {
        CHECK_GE(size(), n);
        begin_offset_ += n;
    void clear() {
        chain_length_ = 0;
        begin_offset_ = 0;
        end_offset_ = 0;
        chain_.clear();
    }

    // Split the first |len| bytes out of this chain into its own.
    IOVector take_front(size_type len) {
        IOVector head;

        if (len == 0) {
            return head;
        }
        CHECK_GE(size(), len);

        std::shared_ptr<const block_type> first_block = chain_.front();
        CHECK_GE(first_block->size(), begin_offset_);
        head.append_shared(std::move(first_block));
        head.begin_offset_ = begin_offset_;

        while (head.size() < len) {
            pop_front_block();
            CHECK(!chain_.empty());

            head.append_shared(chain_.front());
        }

        if (head.size() == len) {
            // Head takes full ownership of the last block it took.
            head.end_offset_ = 0;
            begin_offset_ = 0;
            pop_front_block();
        } else {
            // Head takes partial ownership of the last block it took.
            size_t bytes_taken = head.size() - len;
            head.end_offset_ = bytes_taken;
            CHECK_GE(chain_.front()->size(), bytes_taken);
            begin_offset_ = chain_.front()->size() - bytes_taken;
        }

        return head;
    }

    // Add a nonempty block to the chain.
    // The end of the chain must be a complete block (i.e. end_offset_ == 0).
    void append(std::unique_ptr<const block_type> block) {
        CHECK_NE(0ULL, block->size());
        CHECK_EQ(0ULL, end_offset_);
        chain_length_ += block->size();
        chain_.emplace_back(std::move(block));
    }

    void append(block_type&& block) { append(std::make_unique<block_type>(std::move(block))); }

    void trim_front() {
        if (begin_offset_ == 0) {
            return;
        }

        const block_type* first_block = chain_.front().get();
        auto copy = std::make_unique<block_type>(first_block->size() - begin_offset_);
        memcpy(copy->data(), first_block->data() + begin_offset_, copy->size());
        chain_.front() = std::move(copy);

        chain_length_ -= begin_offset_;
        begin_offset_ = 0;
    }

  private:
    // append, except takes a shared_ptr.
    // Private to prevent exterior mutation of blocks.
    void append_shared(std::shared_ptr<const block_type> block) {
        CHECK_NE(0ULL, block->size());
        CHECK_EQ(0ULL, end_offset_);
        chain_length_ += block->size();
        chain_.emplace_back(std::move(block));
    }

    // Drop the front block from the chain, and update chain_length_ appropriately.
    void pop_front_block() {
        chain_length_ -= chain_.front()->size();
        begin_offset_ = 0;
        chain_.pop_front();
    }

    void drop_end(size_t n) {
        CHECK_GE(size(), n);
        end_offset_ += n;
    // Iterate over the blocks with a callback with an operator()(const char*, size_t).
    template <typename Fn>
    void iterate_blocks(Fn&& callback) const {
        if (chain_.size() == 0) {
            return;
        }

    char* data() { return &data_[0] + begin_offset_; }
        for (size_t i = 0; i < chain_.size(); ++i) {
            const std::shared_ptr<const block_type>& block = chain_.at(i);
            const char* begin = block->data();
            size_t length = block->size();

            // Note that both of these conditions can be true if there's only one block.
            if (i == 0) {
                CHECK_GE(block->size(), begin_offset_);
                begin += begin_offset_;
                length -= begin_offset_;
            }

            if (i == chain_.size() - 1) {
                CHECK_GE(length, end_offset_);
                length -= end_offset_;
            }

            callback(begin, length);
        }
    }

    apacket::payload_type::iterator begin() { return data_.begin() + begin_offset_; }
    apacket::payload_type::iterator end() { return data_.end() - end_offset_; }
  public:
    // Copy all of the blocks into a single block.
    template <typename CollectionType = block_type>
    CollectionType coalesce() const {
        CollectionType result;
        if (size() == 0) {
            return result;
        }

        result.resize(size());

        size_t offset = 0;
        iterate_blocks([&offset, &result](const char* data, size_t len) {
            memcpy(&result[offset], data, len);
            offset += len;
        });

        return result;
    }

    template <typename FunctionType>
    auto coalesced(FunctionType&& f) const ->
        typename std::result_of<FunctionType(const char*, size_t)>::type {
        if (chain_.size() == 1) {
            // If we only have one block, we can use it directly.
            return f(chain_.front()->data() + begin_offset_, size());
        } else {
            // Otherwise, copy to a single block.
            auto data = coalesce();
            return f(data.data(), data.size());
        }
    }

    // Get a list of iovecs that can be used to write out all of the blocks.
    std::vector<adb_iovec> iovecs() const {
        std::vector<adb_iovec> result;
        iterate_blocks([&result](const char* data, size_t len) {
            adb_iovec iov;
            iov.iov_base = const_cast<char*>(data);
            iov.iov_len = len;
            result.emplace_back(iov);
        });

        return result;
    }

  private:
    // Total length of all of the blocks in the chain.
    size_t chain_length_ = 0;

    apacket::payload_type data_;
    size_t begin_offset_ = 0;
    size_t end_offset_ = 0;
    std::deque<std::shared_ptr<const block_type>> chain_;
};

adb/types_test.cpp

0 → 100644
+119 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <gtest/gtest.h>

#include "sysdeps/memory.h"
#include "types.h"

static std::unique_ptr<IOVector::block_type> create_block(const std::string& string) {
    return std::make_unique<IOVector::block_type>(string.begin(), string.end());
}

static std::unique_ptr<IOVector::block_type> create_block(char value, size_t len) {
    auto block = std::make_unique<IOVector::block_type>();
    block->resize(len);
    memset(&(*block)[0], value, len);
    return block;
}

template <typename T>
static std::unique_ptr<IOVector::block_type> copy_block(T&& block) {
    auto copy = std::make_unique<IOVector::block_type>();
    copy->assign(block->begin(), block->end());
    return copy;
}

TEST(IOVector, empty) {
    // Empty IOVector.
    IOVector bc;
    CHECK_EQ(0ULL, bc.coalesce().size());
}

TEST(IOVector, single_block) {
    // A single block.
    auto block = create_block('x', 100);
    IOVector bc;
    bc.append(copy_block(block));
    ASSERT_EQ(100ULL, bc.size());
    auto coalesced = bc.coalesce();
    ASSERT_EQ(*block, coalesced);
}

TEST(IOVector, single_block_split) {
    // One block split.
    IOVector bc;
    bc.append(create_block("foobar"));
    IOVector foo = bc.take_front(3);
    ASSERT_EQ(3ULL, foo.size());
    ASSERT_EQ(3ULL, bc.size());
    ASSERT_EQ(*create_block("foo"), foo.coalesce());
    ASSERT_EQ(*create_block("bar"), bc.coalesce());
}

TEST(IOVector, aligned_split) {
    IOVector bc;
    bc.append(create_block("foo"));
    bc.append(create_block("bar"));
    bc.append(create_block("baz"));
    ASSERT_EQ(9ULL, bc.size());

    IOVector foo = bc.take_front(3);
    ASSERT_EQ(3ULL, foo.size());
    ASSERT_EQ(*create_block("foo"), foo.coalesce());

    IOVector bar = bc.take_front(3);
    ASSERT_EQ(3ULL, bar.size());
    ASSERT_EQ(*create_block("bar"), bar.coalesce());

    IOVector baz = bc.take_front(3);
    ASSERT_EQ(3ULL, baz.size());
    ASSERT_EQ(*create_block("baz"), baz.coalesce());

    ASSERT_EQ(0ULL, bc.size());
}

TEST(IOVector, misaligned_split) {
    IOVector bc;
    bc.append(create_block("foo"));
    bc.append(create_block("bar"));
    bc.append(create_block("baz"));
    bc.append(create_block("qux"));
    bc.append(create_block("quux"));

    // Aligned left, misaligned right, across multiple blocks.
    IOVector foob = bc.take_front(4);
    ASSERT_EQ(4ULL, foob.size());
    ASSERT_EQ(*create_block("foob"), foob.coalesce());

    // Misaligned left, misaligned right, in one block.
    IOVector a = bc.take_front(1);
    ASSERT_EQ(1ULL, a.size());
    ASSERT_EQ(*create_block("a"), a.coalesce());

    // Misaligned left, misaligned right, across two blocks.
    IOVector rba = bc.take_front(3);
    ASSERT_EQ(3ULL, rba.size());
    ASSERT_EQ(*create_block("rba"), rba.coalesce());

    // Misaligned left, misaligned right, across three blocks.
    IOVector zquxquu = bc.take_front(7);
    ASSERT_EQ(7ULL, zquxquu.size());
    ASSERT_EQ(*create_block("zquxquu"), zquxquu.coalesce());

    ASSERT_EQ(1ULL, bc.size());
    ASSERT_EQ(*create_block("x"), bc.coalesce());
}