Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5dda7f6c authored by Yurii Zubrytskyi's avatar Yurii Zubrytskyi Committed by Josh Gao
Browse files

[adb] Optimize adbd's usb reading

Try not to allocate as many blocks on the heap, and reuse
memory instead of copying it

Get rid of unique_ptr and shared_ptr where possible, move
the Block objects themselves

Overall this reduces the time spent in memcpy() from 30% to
15% of the whole 'adb push' command, and gets rid of about 5%
of the time spent in the malloc/free calls

Test: builds
Change-Id: I8995115274b6f08a4df13c58183c928ef384a767
parent 834e4751
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -134,6 +134,7 @@ libadb_srcs = [
    "transport_fd.cpp",
    "transport_local.cpp",
    "transport_usb.cpp",
    "types.cpp",
]

libadb_posix_srcs = [
+60 −48
Original line number Diff line number Diff line
@@ -117,14 +117,18 @@ struct TransferId {
    }
};

template <class Payload>
struct IoBlock {
    bool pending = false;
    struct iocb control = {};
    std::shared_ptr<Block> payload;
    Payload payload;

    TransferId id() const { return TransferId::from_value(control.aio_data); }
};

using IoReadBlock = IoBlock<Block>;
using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;

struct ScopedAioContext {
    ScopedAioContext() = default;
    ~ScopedAioContext() { reset(); }
@@ -208,16 +212,17 @@ struct UsbFfsConnection : public Connection {

    virtual bool Write(std::unique_ptr<apacket> packet) override final {
        LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
        Block header(sizeof(packet->msg));
        memcpy(header.data(), &packet->msg, sizeof(packet->msg));
        auto header = std::make_shared<Block>(sizeof(packet->msg));
        memcpy(header->data(), &packet->msg, sizeof(packet->msg));

        std::lock_guard<std::mutex> lock(write_mutex_);
        write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
        write_requests_.push_back(
                CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
        if (!packet->payload.empty()) {
            // The kernel attempts to allocate a contiguous block of memory for each write,
            // which can fail if the write is large and the kernel heap is fragmented.
            // Split large writes into smaller chunks to avoid this.
            std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload));
            auto payload = std::make_shared<Block>(std::move(packet->payload));
            size_t offset = 0;
            size_t len = payload->size();

@@ -464,16 +469,20 @@ struct UsbFfsConnection : public Connection {
        worker_thread_.join();
    }

    void PrepareReadBlock(IoBlock* block, uint64_t id) {
    void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
        block->pending = false;
        block->payload = std::make_shared<Block>(kUsbReadSize);
        if (block->payload.capacity() >= kUsbReadSize) {
            block->payload.resize(kUsbReadSize);
        } else {
            block->payload = Block(kUsbReadSize);
        }
        block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data());
        block->control.aio_nbytes = block->payload->size();
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
        block->control.aio_nbytes = block->payload.size();
    }

    IoBlock CreateReadBlock(uint64_t id) {
        IoBlock block;
    IoReadBlock CreateReadBlock(uint64_t id) {
        IoReadBlock block;
        PrepareReadBlock(&block, id);
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PREAD;
@@ -518,9 +527,9 @@ struct UsbFfsConnection : public Connection {

    void HandleRead(TransferId id, int64_t size) {
        uint64_t read_idx = id.id % kUsbReadQueueDepth;
        IoBlock* block = &read_requests_[read_idx];
        IoReadBlock* block = &read_requests_[read_idx];
        block->pending = false;
        block->payload->resize(size);
        block->payload.resize(size);

        // Notification for completed reads can be received out of order.
        if (block->id().id != needed_read_id_) {
@@ -531,7 +540,7 @@ struct UsbFfsConnection : public Connection {

        for (uint64_t id = needed_read_id_;; ++id) {
            size_t read_idx = id % kUsbReadQueueDepth;
            IoBlock* current_block = &read_requests_[read_idx];
            IoReadBlock* current_block = &read_requests_[read_idx];
            if (current_block->pending) {
                break;
            }
@@ -540,19 +549,19 @@ struct UsbFfsConnection : public Connection {
        }
    }

    void ProcessRead(IoBlock* block) {
        if (!block->payload->empty()) {
    void ProcessRead(IoReadBlock* block) {
        if (!block->payload.empty()) {
            if (!incoming_header_.has_value()) {
                CHECK_EQ(sizeof(amessage), block->payload->size());
                amessage msg;
                memcpy(&msg, block->payload->data(), sizeof(amessage));
                CHECK_EQ(sizeof(amessage), block->payload.size());
                amessage& msg = incoming_header_.emplace();
                memcpy(&msg, block->payload.data(), sizeof(msg));
                LOG(DEBUG) << "USB read:" << dump_header(&msg);
                incoming_header_ = msg;
            } else {
                size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
                Block payload = std::move(*block->payload);
                Block payload = std::move(block->payload);
                CHECK_LE(payload.size(), bytes_left);
                incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
                incoming_payload_.append(std::move(payload));
            }

            if (incoming_header_->data_length == incoming_payload_.size()) {
@@ -560,11 +569,15 @@ struct UsbFfsConnection : public Connection {
                packet->msg = *incoming_header_;

                // TODO: Make apacket contain an IOVector so we don't have to coalesce.
                packet->payload = incoming_payload_.coalesce();
                packet->payload = std::move(incoming_payload_).coalesce();
                read_callback_(this, std::move(packet));

                incoming_header_.reset();
                incoming_payload_.clear();
                // reuse the capacity of the incoming payload while we can.
                auto free_block = incoming_payload_.clear();
                if (block->payload.capacity() == 0) {
                    block->payload = std::move(free_block);
                }
            }
        }

@@ -572,7 +585,7 @@ struct UsbFfsConnection : public Connection {
        SubmitRead(block);
    }

    bool SubmitRead(IoBlock* block) {
    bool SubmitRead(IoReadBlock* block) {
        block->pending = true;
        struct iocb* iocb = &block->control;
        if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
@@ -594,7 +607,7 @@ struct UsbFfsConnection : public Connection {
        std::lock_guard<std::mutex> lock(write_mutex_);
        auto it =
                std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
                    return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id);
                    return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
                });
        CHECK(it != write_requests_.end());

@@ -605,27 +618,26 @@ struct UsbFfsConnection : public Connection {
        SubmitWrites();
    }

    std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset,
                                              size_t len, uint64_t id) {
        auto block = std::make_unique<IoBlock>();
        block->payload = std::move(payload);
        block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
        block->control.aio_rw_flags = 0;
        block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
        block->control.aio_reqprio = 0;
        block->control.aio_fildes = write_fd_.get();
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset);
        block->control.aio_nbytes = len;
        block->control.aio_offset = 0;
        block->control.aio_flags = IOCB_FLAG_RESFD;
        block->control.aio_resfd = worker_event_fd_.get();
    IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
                                  uint64_t id) {
        auto block = IoWriteBlock();
        block.payload = std::move(payload);
        block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = write_fd_.get();
        block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
        block.control.aio_nbytes = len;
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
        std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload));
        size_t len = block->size();
        return CreateWriteBlock(std::move(block), 0, len, id);
    IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
        size_t len = payload.size();
        return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
    }

    void SubmitWrites() REQUIRES(write_mutex_) {
@@ -642,9 +654,9 @@ struct UsbFfsConnection : public Connection {

        struct iocb* iocbs[kUsbWriteQueueDepth];
        for (int i = 0; i < writes_to_submit; ++i) {
            CHECK(!write_requests_[writes_submitted_ + i]->pending);
            write_requests_[writes_submitted_ + i]->pending = true;
            iocbs[i] = &write_requests_[writes_submitted_ + i]->control;
            CHECK(!write_requests_[writes_submitted_ + i].pending);
            write_requests_[writes_submitted_ + i].pending = true;
            iocbs[i] = &write_requests_[writes_submitted_ + i].control;
            LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
        }

@@ -689,7 +701,7 @@ struct UsbFfsConnection : public Connection {
    std::optional<amessage> incoming_header_;
    IOVector incoming_payload_;

    std::array<IoBlock, kUsbReadQueueDepth> read_requests_;
    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
    IOVector read_data_;

    // ID of the next request that we're going to send out.
@@ -699,7 +711,7 @@ struct UsbFfsConnection : public Connection {
    size_t needed_read_id_ = 0;

    std::mutex write_mutex_;
    std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_);
    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
    size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
    size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;

+1 −2
Original line number Diff line number Diff line
@@ -125,8 +125,7 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) {
        if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) {
            s->packet_queue.clear();
        } else if (rc > 0) {
            // TODO: Implement a faster drop_front?
            s->packet_queue.take_front(rc);
            s->packet_queue.drop_front(rc);
            fdevent_add(s->fde, FDE_WRITE);
            return SocketFlushResult::TryAgain;
        } else if (rc == -1 && errno == EAGAIN) {
+7 −8
Original line number Diff line number Diff line
@@ -93,8 +93,8 @@ struct NonblockingFdConnection : public Connection {

                if (pfds[0].revents & POLLIN) {
                    // TODO: Should we be getting blocks from a free list?
                    auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD);
                    rc = adb_read(fd_.get(), &(*block)[0], block->size());
                    auto block = IOVector::block_type(MAX_PAYLOAD);
                    rc = adb_read(fd_.get(), &block[0], block.size());
                    if (rc == -1) {
                        *error = std::string("read failed: ") + strerror(errno);
                        return;
@@ -102,7 +102,7 @@ struct NonblockingFdConnection : public Connection {
                        *error = "read failed: EOF";
                        return;
                    }
                    block->resize(rc);
                    block.resize(rc);
                    read_buffer_.append(std::move(block));

                    if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) {
@@ -116,7 +116,7 @@ struct NonblockingFdConnection : public Connection {
                        auto data_chain = read_buffer_.take_front(read_header_->data_length);

                        // TODO: Make apacket carry around a IOVector instead of coalescing.
                        auto payload = data_chain.coalesce<apacket::payload_type>();
                        auto payload = std::move(data_chain).coalesce();
                        auto packet = std::make_unique<apacket>();
                        packet->msg = *read_header_;
                        packet->payload = std::move(payload);
@@ -184,8 +184,7 @@ struct NonblockingFdConnection : public Connection {
            return WriteResult::Error;
        }

        // TODO: Implement a more efficient drop_front?
        write_buffer_.take_front(rc);
        write_buffer_.drop_front(rc);
        writable_ = write_buffer_.empty();
        if (write_buffer_.empty()) {
            return WriteResult::Completed;
@@ -199,10 +198,10 @@ struct NonblockingFdConnection : public Connection {
        std::lock_guard<std::mutex> lock(write_mutex_);
        const char* header_begin = reinterpret_cast<const char*>(&packet->msg);
        const char* header_end = header_begin + sizeof(packet->msg);
        auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end);
        auto header_block = IOVector::block_type(header_begin, header_end);
        write_buffer_.append(std::move(header_block));
        if (!packet->payload.empty()) {
            write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload)));
            write_buffer_.append(std::move(packet->payload));
        }

        WriteResult result = DispatchWrites();

adb/types.cpp

0 → 100644
+204 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "types.h"

// Move-assignment: steals |move|'s block chain and bookkeeping, then resets
// |move| back to an empty (but valid) state.
// NOTE(review): self-move-assignment empties the object, which matches the
// usual "valid but unspecified" contract for moved-from objects.
IOVector& IOVector::operator=(IOVector&& move) noexcept {
    // The chain must be taken before clear() below wipes |move|'s state.
    chain_ = std::move(move.chain_);
    chain_length_ = move.chain_length_;
    begin_offset_ = move.begin_offset_;
    start_index_ = move.start_index_;

    // clear() resets |move|'s scalars; its returned spare block is discarded.
    move.clear();
    return *this;
}

// Empties the vector. The last block of the chain is moved out and returned
// so the caller can reuse its heap allocation instead of freeing it.
IOVector::block_type IOVector::clear() {
    block_type reusable;
    if (!chain_.empty()) {
        reusable = std::move(chain_.back());
    }
    chain_.clear();

    start_index_ = 0;
    begin_offset_ = 0;
    chain_length_ = 0;
    return reusable;
}

// Removes the first |len| bytes from the logical front of the vector without
// copying any payload: fully-consumed leading blocks are popped, and a
// partially-consumed block is skipped by advancing begin_offset_.
// Precondition: len <= size().
void IOVector::drop_front(IOVector::size_type len) {
    if (len == 0) {
        return;
    }
    if (len == size()) {
        // Dropping everything: take the cheap wholesale-reset path.
        clear();
        return;
    }
    CHECK_LT(len, size());

    // Fix: the accumulator must be size_type, not `auto ... 0u` (unsigned
    // int) — `dropped += next` would truncate for len > UINT_MAX on LP64.
    size_type dropped = 0;
    while (dropped < len) {
        // Bytes still readable in the current front block.
        const auto next = chain_[start_index_].size() - begin_offset_;
        if (dropped + next < len) {
            // The whole block is consumed; release it and keep going.
            pop_front_block();
            dropped += next;
        } else {
            // Only part of this block is dropped; just advance the offset.
            const auto taken = len - dropped;
            begin_offset_ += taken;
            break;
        }
    }
}

// Splits off and returns the first |len| bytes as a new IOVector, removing
// them from |this|. Whole blocks are moved (no copy); at most one block is
// split, choosing between copying the head out or moving the block and
// copying the tail back, whichever touches fewer bytes.
// Precondition: len <= size().
IOVector IOVector::take_front(IOVector::size_type len) {
    if (len == 0) {
        return {};
    }
    if (len == size()) {
        // Taking everything: hand over the whole object; |this| is left empty.
        return std::move(*this);
    }

    CHECK_GE(size(), len);
    IOVector res;
    // first iterate over the blocks that completely go into the other vector
    while (chain_[start_index_].size() - begin_offset_ <= len) {
        chain_length_ -= chain_[start_index_].size();
        len -= chain_[start_index_].size() - begin_offset_;
        if (chain_[start_index_].size() > begin_offset_) {
            // Block has live bytes: move it wholesale into |res|.
            res.append(std::move(chain_[start_index_]));
            if (begin_offset_) {
                // |res| inherits the partial-consumption offset of its first block.
                res.begin_offset_ = std::exchange(begin_offset_, 0);
            }
        } else {
            // Block was already fully consumed; just skip it.
            begin_offset_ = 0;
        }
        ++start_index_;
    }

    if (len > 0) {
        // what's left is a single buffer that needs to be split between the |res| and |this|
        // we know that it has to be split - there was a check for the case when it has to
        // go away as a whole.
        if (begin_offset_ != 0 || len < chain_[start_index_].size() / 2) {
            // let's memcpy the data out (the head being taken is the smaller part,
            // or the block is already offset and can't be moved cleanly)
            block_type block(chain_[start_index_].begin() + begin_offset_,
                             chain_[start_index_].begin() + begin_offset_ + len);
            res.append(std::move(block));
            begin_offset_ += len;
        } else {
            CHECK_EQ(begin_offset_, 0u);
            // move out the internal buffer out and copy only the tail of it back in
            block_type block(chain_[start_index_].begin() + len, chain_[start_index_].end());
            chain_length_ -= chain_[start_index_].size();
            chain_[start_index_].resize(len);
            res.append(std::move(chain_[start_index_]));
            chain_length_ += block.size();
            chain_[start_index_] = std::move(block);
        }
    }
    return res;
}

// Normalizes the front of the vector so that begin_offset_ becomes 0:
// a fully-consumed first block is skipped, a partially-consumed one has its
// live bytes shifted down in place. Consumed blocks are then erased from the
// chain. Logical contents (size() and readable bytes) are unchanged.
void IOVector::trim_front() {
    if ((begin_offset_ == 0 && start_index_ == 0) || chain_.empty()) {
        return;
    }
    block_type& first_block = chain_[start_index_];
    if (begin_offset_ == first_block.size()) {
        // First block fully consumed: skip it entirely.
        ++start_index_;
    } else {
        // Shift the unread tail to the start of the block, then shrink it.
        memmove(first_block.data(), first_block.data() + begin_offset_,
                first_block.size() - begin_offset_);
        first_block.resize(first_block.size() - begin_offset_);
    }
    // The consumed prefix no longer counts toward the chain's byte total.
    chain_length_ -= begin_offset_;
    begin_offset_ = 0;
    trim_chain_front();
}

// Physically erases the already-consumed blocks [0, start_index_) from the
// chain and rebases start_index_ to 0. No-op when nothing has been consumed.
void IOVector::trim_chain_front() {
    if (start_index_ == 0) {
        return;
    }
    chain_.erase(chain_.begin(), chain_.begin() + start_index_);
    start_index_ = 0;
}

void IOVector::pop_front_block() {
    chain_length_ -= chain_[start_index_].size();
    begin_offset_ = 0;
    chain_[start_index_].clear();
    ++start_index_;
    if (start_index_ > std::max<size_t>(4, chain_.size() / 2)) {
        trim_chain_front();
    }
}

// Rvalue-qualified coalesce(): flattens the chain into a single contiguous
// block, reusing existing allocations when possible instead of allocating a
// fresh buffer like the const overload does.
IOVector::block_type IOVector::coalesce() && {
    // Destructive coalesce() may optimize for several cases when it doesn't need to allocate
    // new buffer, or even return one of the existing blocks as is. The only guarantee is that
    // after this call the IOVector is in some valid state. Nothing is guaranteed about the
    // specifics. Callers are expected not to rely on the remaining contents.
    if (size() == 0) {
        return {};
    }
    // Case 1: first live block fully consumed and exactly one block follows —
    // that trailing block IS the answer; move it out with no copying at all.
    if (begin_offset_ == chain_[start_index_].size() && chain_.size() == start_index_ + 2) {
        chain_length_ -= chain_.back().size();
        auto res = std::move(chain_.back());
        chain_.pop_back();
        return res;
    }
    // Case 2: a single live block — move it out, shifting the unread tail to
    // the front in place if it was partially consumed.
    if (chain_.size() == start_index_ + 1) {
        chain_length_ -= chain_.back().size();
        auto res = std::move(chain_.back());
        chain_.pop_back();
        if (begin_offset_ != 0) {
            memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_);
            res.resize(res.size() - begin_offset_);
            begin_offset_ = 0;
        }
        return res;
    }
    // Case 3: the first block's existing capacity can hold everything —
    // move it out and append the remaining blocks' bytes into it.
    // NOTE(review): the copied-from blocks stay in chain_ and chain_length_;
    // this leaves the IOVector "valid but unspecified" per the contract above.
    if (auto& firstBuffer = chain_[start_index_]; firstBuffer.capacity() >= size()) {
        auto res = std::move(chain_[start_index_]);
        auto size = res.size();
        chain_length_ -= size;
        if (begin_offset_ != 0) {
            memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_);
            size -= begin_offset_;
            begin_offset_ = 0;
        }
        for (auto i = start_index_ + 1; i < chain_.size(); ++i) {
            memcpy(res.data() + size, chain_[i].data(), chain_[i].size());
            size += chain_[i].size();
        }
        res.resize(size);
        ++start_index_;
        return res;
    }
    // Fallback: no reuse possible; delegate to the non-destructive overload,
    // which allocates a new buffer and copies everything.
    return const_cast<const IOVector*>(this)->coalesce<>();
}

std::vector<adb_iovec> IOVector::iovecs() const {
    std::vector<adb_iovec> result;
    result.reserve(chain_.size() - start_index_);
    iterate_blocks([&result](const char* data, size_t len) {
        adb_iovec iov;
        iov.iov_base = const_cast<char*>(data);
        iov.iov_len = len;
        result.emplace_back(iov);
    });

    return result;
}
Loading