Loading adb/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -134,6 +134,7 @@ libadb_srcs = [ "transport_fd.cpp", "transport_local.cpp", "transport_usb.cpp", "types.cpp", ] libadb_posix_srcs = [ Loading adb/daemon/usb.cpp +60 −48 Original line number Diff line number Diff line Loading @@ -117,14 +117,18 @@ struct TransferId { } }; template <class Payload> struct IoBlock { bool pending = false; struct iocb control = {}; std::shared_ptr<Block> payload; Payload payload; TransferId id() const { return TransferId::from_value(control.aio_data); } }; using IoReadBlock = IoBlock<Block>; using IoWriteBlock = IoBlock<std::shared_ptr<Block>>; struct ScopedAioContext { ScopedAioContext() = default; ~ScopedAioContext() { reset(); } Loading Loading @@ -208,16 +212,17 @@ struct UsbFfsConnection : public Connection { virtual bool Write(std::unique_ptr<apacket> packet) override final { LOG(DEBUG) << "USB write: " << dump_header(&packet->msg); Block header(sizeof(packet->msg)); memcpy(header.data(), &packet->msg, sizeof(packet->msg)); auto header = std::make_shared<Block>(sizeof(packet->msg)); memcpy(header->data(), &packet->msg, sizeof(packet->msg)); std::lock_guard<std::mutex> lock(write_mutex_); write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++)); write_requests_.push_back( CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++)); if (!packet->payload.empty()) { // The kernel attempts to allocate a contiguous block of memory for each write, // which can fail if the write is large and the kernel heap is fragmented. // Split large writes into smaller chunks to avoid this. std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload)); auto payload = std::make_shared<Block>(std::move(packet->payload)); size_t offset = 0; size_t len = payload->size(); Loading Loading @@ -464,16 +469,20 @@ struct UsbFfsConnection : public Connection { worker_thread_.join(); } void PrepareReadBlock(IoBlock* block, uint64_t id) { void PrepareReadBlock(IoReadBlock* block, uint64_t id) { block->pending = false; block->payload = std::make_shared<Block>(kUsbReadSize); if (block->payload.capacity() >= kUsbReadSize) { block->payload.resize(kUsbReadSize); } else { block->payload = Block(kUsbReadSize); } block->control.aio_data = static_cast<uint64_t>(TransferId::read(id)); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data()); block->control.aio_nbytes = block->payload->size(); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data()); block->control.aio_nbytes = block->payload.size(); } IoBlock CreateReadBlock(uint64_t id) { IoBlock block; IoReadBlock CreateReadBlock(uint64_t id) { IoReadBlock block; PrepareReadBlock(&block, id); block.control.aio_rw_flags = 0; block.control.aio_lio_opcode = IOCB_CMD_PREAD; Loading Loading @@ -518,9 +527,9 @@ struct UsbFfsConnection : public Connection { void HandleRead(TransferId id, int64_t size) { uint64_t read_idx = id.id % kUsbReadQueueDepth; IoBlock* block = &read_requests_[read_idx]; IoReadBlock* block = &read_requests_[read_idx]; block->pending = false; block->payload->resize(size); block->payload.resize(size); // Notification for completed reads can be received out of order. if (block->id().id != needed_read_id_) { Loading @@ -531,7 +540,7 @@ struct UsbFfsConnection : public Connection { for (uint64_t id = needed_read_id_;; ++id) { size_t read_idx = id % kUsbReadQueueDepth; IoBlock* current_block = &read_requests_[read_idx]; IoReadBlock* current_block = &read_requests_[read_idx]; if (current_block->pending) { break; } Loading @@ -540,19 +549,19 @@ struct UsbFfsConnection : public Connection { } } void ProcessRead(IoBlock* block) { if (!block->payload->empty()) { void ProcessRead(IoReadBlock* block) { if (!block->payload.empty()) { if (!incoming_header_.has_value()) { CHECK_EQ(sizeof(amessage), block->payload->size()); amessage msg; memcpy(&msg, block->payload->data(), sizeof(amessage)); CHECK_EQ(sizeof(amessage), block->payload.size()); amessage& msg = incoming_header_.emplace(); memcpy(&msg, block->payload.data(), sizeof(msg)); LOG(DEBUG) << "USB read:" << dump_header(&msg); incoming_header_ = msg; } else { size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); Block payload = std::move(*block->payload); Block payload = std::move(block->payload); CHECK_LE(payload.size(), bytes_left); incoming_payload_.append(std::make_unique<Block>(std::move(payload))); incoming_payload_.append(std::move(payload)); } if (incoming_header_->data_length == incoming_payload_.size()) { Loading @@ -560,11 +569,15 @@ struct UsbFfsConnection : public Connection { packet->msg = *incoming_header_; // TODO: Make apacket contain an IOVector so we don't have to coalesce. packet->payload = incoming_payload_.coalesce(); packet->payload = std::move(incoming_payload_).coalesce(); read_callback_(this, std::move(packet)); incoming_header_.reset(); incoming_payload_.clear(); // reuse the capacity of the incoming payload while we can. auto free_block = incoming_payload_.clear(); if (block->payload.capacity() == 0) { block->payload = std::move(free_block); } } } Loading @@ -572,7 +585,7 @@ struct UsbFfsConnection : public Connection { SubmitRead(block); } bool SubmitRead(IoBlock* block) { bool SubmitRead(IoReadBlock* block) { block->pending = true; struct iocb* iocb = &block->control; if (io_submit(aio_context_.get(), 1, &iocb) != 1) { Loading @@ -594,7 +607,7 @@ struct UsbFfsConnection : public Connection { std::lock_guard<std::mutex> lock(write_mutex_); auto it = std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) { return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id); return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id); }); CHECK(it != write_requests_.end()); Loading @@ -605,27 +618,26 @@ struct UsbFfsConnection : public Connection { SubmitWrites(); } std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, uint64_t id) { auto block = std::make_unique<IoBlock>(); block->payload = std::move(payload); block->control.aio_data = static_cast<uint64_t>(TransferId::write(id)); block->control.aio_rw_flags = 0; block->control.aio_lio_opcode = IOCB_CMD_PWRITE; block->control.aio_reqprio = 0; block->control.aio_fildes = write_fd_.get(); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset); block->control.aio_nbytes = len; block->control.aio_offset = 0; block->control.aio_flags = IOCB_FLAG_RESFD; block->control.aio_resfd = worker_event_fd_.get(); IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, uint64_t id) { auto block = IoWriteBlock(); block.payload = std::move(payload); block.control.aio_data = static_cast<uint64_t>(TransferId::write(id)); block.control.aio_rw_flags = 0; block.control.aio_lio_opcode = IOCB_CMD_PWRITE; block.control.aio_reqprio = 0; block.control.aio_fildes = write_fd_.get(); block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset); block.control.aio_nbytes = len; block.control.aio_offset = 0; block.control.aio_flags = IOCB_FLAG_RESFD; block.control.aio_resfd = worker_event_fd_.get(); return block; } std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) { std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload)); size_t len = block->size(); return CreateWriteBlock(std::move(block), 0, len, id); IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) { size_t len = payload.size(); return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id); } void SubmitWrites() REQUIRES(write_mutex_) { Loading @@ -642,9 +654,9 @@ struct UsbFfsConnection : public Connection { struct iocb* iocbs[kUsbWriteQueueDepth]; for (int i = 0; i < writes_to_submit; ++i) { CHECK(!write_requests_[writes_submitted_ + i]->pending); write_requests_[writes_submitted_ + i]->pending = true; iocbs[i] = &write_requests_[writes_submitted_ + i]->control; CHECK(!write_requests_[writes_submitted_ + i].pending); write_requests_[writes_submitted_ + i].pending = true; iocbs[i] = &write_requests_[writes_submitted_ + i].control; LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]); } Loading Loading @@ -689,7 +701,7 @@ struct UsbFfsConnection : public Connection { std::optional<amessage> incoming_header_; IOVector incoming_payload_; std::array<IoBlock, kUsbReadQueueDepth> read_requests_; std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_; IOVector read_data_; // ID of the next request that we're going to send out. Loading @@ -699,7 +711,7 @@ struct UsbFfsConnection : public Connection { size_t needed_read_id_ = 0; std::mutex write_mutex_; std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_); std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_); size_t next_write_id_ GUARDED_BY(write_mutex_) = 0; size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0; Loading adb/sockets.cpp +1 −2 Original line number Diff line number Diff line Loading @@ -125,8 +125,7 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) { if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) { s->packet_queue.clear(); } else if (rc > 0) { // TODO: Implement a faster drop_front? s->packet_queue.take_front(rc); s->packet_queue.drop_front(rc); fdevent_add(s->fde, FDE_WRITE); return SocketFlushResult::TryAgain; } else if (rc == -1 && errno == EAGAIN) { Loading adb/transport_fd.cpp +7 −8 Original line number Diff line number Diff line Loading @@ -93,8 +93,8 @@ struct NonblockingFdConnection : public Connection { if (pfds[0].revents & POLLIN) { // TODO: Should we be getting blocks from a free list? auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD); rc = adb_read(fd_.get(), &(*block)[0], block->size()); auto block = IOVector::block_type(MAX_PAYLOAD); rc = adb_read(fd_.get(), &block[0], block.size()); if (rc == -1) { *error = std::string("read failed: ") + strerror(errno); return; Loading @@ -102,7 +102,7 @@ struct NonblockingFdConnection : public Connection { *error = "read failed: EOF"; return; } block->resize(rc); block.resize(rc); read_buffer_.append(std::move(block)); if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) { Loading @@ -116,7 +116,7 @@ struct NonblockingFdConnection : public Connection { auto data_chain = read_buffer_.take_front(read_header_->data_length); // TODO: Make apacket carry around a IOVector instead of coalescing. auto payload = data_chain.coalesce<apacket::payload_type>(); auto payload = std::move(data_chain).coalesce(); auto packet = std::make_unique<apacket>(); packet->msg = *read_header_; packet->payload = std::move(payload); Loading Loading @@ -184,8 +184,7 @@ struct NonblockingFdConnection : public Connection { return WriteResult::Error; } // TODO: Implement a more efficient drop_front? write_buffer_.take_front(rc); write_buffer_.drop_front(rc); writable_ = write_buffer_.empty(); if (write_buffer_.empty()) { return WriteResult::Completed; Loading @@ -199,10 +198,10 @@ struct NonblockingFdConnection : public Connection { std::lock_guard<std::mutex> lock(write_mutex_); const char* header_begin = reinterpret_cast<const char*>(&packet->msg); const char* header_end = header_begin + sizeof(packet->msg); auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end); auto header_block = IOVector::block_type(header_begin, header_end); write_buffer_.append(std::move(header_block)); if (!packet->payload.empty()) { write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload))); write_buffer_.append(std::move(packet->payload)); } WriteResult result = DispatchWrites(); Loading adb/types.cpp 0 → 100644 +204 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "types.h" IOVector& IOVector::operator=(IOVector&& move) noexcept { chain_ = std::move(move.chain_); chain_length_ = move.chain_length_; begin_offset_ = move.begin_offset_; start_index_ = move.start_index_; move.clear(); return *this; } IOVector::block_type IOVector::clear() { chain_length_ = 0; begin_offset_ = 0; start_index_ = 0; block_type res; if (!chain_.empty()) { res = std::move(chain_.back()); } chain_.clear(); return res; } void IOVector::drop_front(IOVector::size_type len) { if (len == 0) { return; } if (len == size()) { clear(); return; } CHECK_LT(len, size()); auto dropped = 0u; while (dropped < len) { const auto next = chain_[start_index_].size() - begin_offset_; if (dropped + next < len) { pop_front_block(); dropped += next; } else { const auto taken = len - dropped; begin_offset_ += taken; break; } } } IOVector IOVector::take_front(IOVector::size_type len) { if (len == 0) { return {}; } if (len == size()) { return std::move(*this); } CHECK_GE(size(), len); IOVector res; // first iterate over the blocks that completely go into the other vector while (chain_[start_index_].size() - begin_offset_ <= len) { chain_length_ -= chain_[start_index_].size(); len -= chain_[start_index_].size() - begin_offset_; if (chain_[start_index_].size() > begin_offset_) { res.append(std::move(chain_[start_index_])); if (begin_offset_) { res.begin_offset_ = std::exchange(begin_offset_, 0); } } else { begin_offset_ = 0; } ++start_index_; } if (len > 0) { // what's left is a single buffer that needs to be split between the |res| and |this| // we know that it has to be split - there was a check for the case when it has to // go away as a whole. if (begin_offset_ != 0 || len < chain_[start_index_].size() / 2) { // let's memcpy the data out block_type block(chain_[start_index_].begin() + begin_offset_, chain_[start_index_].begin() + begin_offset_ + len); res.append(std::move(block)); begin_offset_ += len; } else { CHECK_EQ(begin_offset_, 0u); // move out the internal buffer out and copy only the tail of it back in block_type block(chain_[start_index_].begin() + len, chain_[start_index_].end()); chain_length_ -= chain_[start_index_].size(); chain_[start_index_].resize(len); res.append(std::move(chain_[start_index_])); chain_length_ += block.size(); chain_[start_index_] = std::move(block); } } return res; } void IOVector::trim_front() { if ((begin_offset_ == 0 && start_index_ == 0) || chain_.empty()) { return; } block_type& first_block = chain_[start_index_]; if (begin_offset_ == first_block.size()) { ++start_index_; } else { memmove(first_block.data(), first_block.data() + begin_offset_, first_block.size() - begin_offset_); first_block.resize(first_block.size() - begin_offset_); } chain_length_ -= begin_offset_; begin_offset_ = 0; trim_chain_front(); } void IOVector::trim_chain_front() { if (start_index_) { chain_.erase(chain_.begin(), chain_.begin() + start_index_); start_index_ = 0; } } void IOVector::pop_front_block() { chain_length_ -= chain_[start_index_].size(); begin_offset_ = 0; chain_[start_index_].clear(); ++start_index_; if (start_index_ > std::max<size_t>(4, chain_.size() / 2)) { trim_chain_front(); } } IOVector::block_type IOVector::coalesce() && { // Destructive coalesce() may optimize for several cases when it doesn't need to allocate // new buffer, or even return one of the existing blocks as is. The only guarantee is that // after this call the IOVector is in some valid state. Nothing is guaranteed about the // specifics. if (size() == 0) { return {}; } if (begin_offset_ == chain_[start_index_].size() && chain_.size() == start_index_ + 2) { chain_length_ -= chain_.back().size(); auto res = std::move(chain_.back()); chain_.pop_back(); return res; } if (chain_.size() == start_index_ + 1) { chain_length_ -= chain_.back().size(); auto res = std::move(chain_.back()); chain_.pop_back(); if (begin_offset_ != 0) { memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_); res.resize(res.size() - begin_offset_); begin_offset_ = 0; } return res; } if (auto& firstBuffer = chain_[start_index_]; firstBuffer.capacity() >= size()) { auto res = std::move(chain_[start_index_]); auto size = res.size(); chain_length_ -= size; if (begin_offset_ != 0) { memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_); size -= begin_offset_; begin_offset_ = 0; } for (auto i = start_index_ + 1; i < chain_.size(); ++i) { memcpy(res.data() + size, chain_[i].data(), chain_[i].size()); size += chain_[i].size(); } res.resize(size); ++start_index_; return res; } return const_cast<const IOVector*>(this)->coalesce<>(); } std::vector<adb_iovec> IOVector::iovecs() const { std::vector<adb_iovec> result; result.reserve(chain_.size() - start_index_); iterate_blocks([&result](const char* data, size_t len) { adb_iovec iov; iov.iov_base = const_cast<char*>(data); iov.iov_len = len; result.emplace_back(iov); }); return result; } Loading
adb/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -134,6 +134,7 @@ libadb_srcs = [ "transport_fd.cpp", "transport_local.cpp", "transport_usb.cpp", "types.cpp", ] libadb_posix_srcs = [ Loading
adb/daemon/usb.cpp +60 −48 Original line number Diff line number Diff line Loading @@ -117,14 +117,18 @@ struct TransferId { } }; template <class Payload> struct IoBlock { bool pending = false; struct iocb control = {}; std::shared_ptr<Block> payload; Payload payload; TransferId id() const { return TransferId::from_value(control.aio_data); } }; using IoReadBlock = IoBlock<Block>; using IoWriteBlock = IoBlock<std::shared_ptr<Block>>; struct ScopedAioContext { ScopedAioContext() = default; ~ScopedAioContext() { reset(); } Loading Loading @@ -208,16 +212,17 @@ struct UsbFfsConnection : public Connection { virtual bool Write(std::unique_ptr<apacket> packet) override final { LOG(DEBUG) << "USB write: " << dump_header(&packet->msg); Block header(sizeof(packet->msg)); memcpy(header.data(), &packet->msg, sizeof(packet->msg)); auto header = std::make_shared<Block>(sizeof(packet->msg)); memcpy(header->data(), &packet->msg, sizeof(packet->msg)); std::lock_guard<std::mutex> lock(write_mutex_); write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++)); write_requests_.push_back( CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++)); if (!packet->payload.empty()) { // The kernel attempts to allocate a contiguous block of memory for each write, // which can fail if the write is large and the kernel heap is fragmented. // Split large writes into smaller chunks to avoid this. std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload)); auto payload = std::make_shared<Block>(std::move(packet->payload)); size_t offset = 0; size_t len = payload->size(); Loading Loading @@ -464,16 +469,20 @@ struct UsbFfsConnection : public Connection { worker_thread_.join(); } void PrepareReadBlock(IoBlock* block, uint64_t id) { void PrepareReadBlock(IoReadBlock* block, uint64_t id) { block->pending = false; block->payload = std::make_shared<Block>(kUsbReadSize); if (block->payload.capacity() >= kUsbReadSize) { block->payload.resize(kUsbReadSize); } else { block->payload = Block(kUsbReadSize); } block->control.aio_data = static_cast<uint64_t>(TransferId::read(id)); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data()); block->control.aio_nbytes = block->payload->size(); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data()); block->control.aio_nbytes = block->payload.size(); } IoBlock CreateReadBlock(uint64_t id) { IoBlock block; IoReadBlock CreateReadBlock(uint64_t id) { IoReadBlock block; PrepareReadBlock(&block, id); block.control.aio_rw_flags = 0; block.control.aio_lio_opcode = IOCB_CMD_PREAD; Loading Loading @@ -518,9 +527,9 @@ struct UsbFfsConnection : public Connection { void HandleRead(TransferId id, int64_t size) { uint64_t read_idx = id.id % kUsbReadQueueDepth; IoBlock* block = &read_requests_[read_idx]; IoReadBlock* block = &read_requests_[read_idx]; block->pending = false; block->payload->resize(size); block->payload.resize(size); // Notification for completed reads can be received out of order. if (block->id().id != needed_read_id_) { Loading @@ -531,7 +540,7 @@ struct UsbFfsConnection : public Connection { for (uint64_t id = needed_read_id_;; ++id) { size_t read_idx = id % kUsbReadQueueDepth; IoBlock* current_block = &read_requests_[read_idx]; IoReadBlock* current_block = &read_requests_[read_idx]; if (current_block->pending) { break; } Loading @@ -540,19 +549,19 @@ struct UsbFfsConnection : public Connection { } } void ProcessRead(IoBlock* block) { if (!block->payload->empty()) { void ProcessRead(IoReadBlock* block) { if (!block->payload.empty()) { if (!incoming_header_.has_value()) { CHECK_EQ(sizeof(amessage), block->payload->size()); amessage msg; memcpy(&msg, block->payload->data(), sizeof(amessage)); CHECK_EQ(sizeof(amessage), block->payload.size()); amessage& msg = incoming_header_.emplace(); memcpy(&msg, block->payload.data(), sizeof(msg)); LOG(DEBUG) << "USB read:" << dump_header(&msg); incoming_header_ = msg; } else { size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); Block payload = std::move(*block->payload); Block payload = std::move(block->payload); CHECK_LE(payload.size(), bytes_left); incoming_payload_.append(std::make_unique<Block>(std::move(payload))); incoming_payload_.append(std::move(payload)); } if (incoming_header_->data_length == incoming_payload_.size()) { Loading @@ -560,11 +569,15 @@ struct UsbFfsConnection : public Connection { packet->msg = *incoming_header_; // TODO: Make apacket contain an IOVector so we don't have to coalesce. packet->payload = incoming_payload_.coalesce(); packet->payload = std::move(incoming_payload_).coalesce(); read_callback_(this, std::move(packet)); incoming_header_.reset(); incoming_payload_.clear(); // reuse the capacity of the incoming payload while we can. auto free_block = incoming_payload_.clear(); if (block->payload.capacity() == 0) { block->payload = std::move(free_block); } } } Loading @@ -572,7 +585,7 @@ struct UsbFfsConnection : public Connection { SubmitRead(block); } bool SubmitRead(IoBlock* block) { bool SubmitRead(IoReadBlock* block) { block->pending = true; struct iocb* iocb = &block->control; if (io_submit(aio_context_.get(), 1, &iocb) != 1) { Loading @@ -594,7 +607,7 @@ struct UsbFfsConnection : public Connection { std::lock_guard<std::mutex> lock(write_mutex_); auto it = std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) { return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id); return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id); }); CHECK(it != write_requests_.end()); Loading @@ -605,27 +618,26 @@ struct UsbFfsConnection : public Connection { SubmitWrites(); } std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, uint64_t id) { auto block = std::make_unique<IoBlock>(); block->payload = std::move(payload); block->control.aio_data = static_cast<uint64_t>(TransferId::write(id)); block->control.aio_rw_flags = 0; block->control.aio_lio_opcode = IOCB_CMD_PWRITE; block->control.aio_reqprio = 0; block->control.aio_fildes = write_fd_.get(); block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset); block->control.aio_nbytes = len; block->control.aio_offset = 0; block->control.aio_flags = IOCB_FLAG_RESFD; block->control.aio_resfd = worker_event_fd_.get(); IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, uint64_t id) { auto block = IoWriteBlock(); block.payload = std::move(payload); block.control.aio_data = static_cast<uint64_t>(TransferId::write(id)); block.control.aio_rw_flags = 0; block.control.aio_lio_opcode = IOCB_CMD_PWRITE; block.control.aio_reqprio = 0; block.control.aio_fildes = write_fd_.get(); block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset); block.control.aio_nbytes = len; block.control.aio_offset = 0; block.control.aio_flags = IOCB_FLAG_RESFD; block.control.aio_resfd = worker_event_fd_.get(); return block; } std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) { std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload)); size_t len = block->size(); return CreateWriteBlock(std::move(block), 0, len, id); IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) { size_t len = payload.size(); return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id); } void SubmitWrites() REQUIRES(write_mutex_) { Loading @@ -642,9 +654,9 @@ struct UsbFfsConnection : public Connection { struct iocb* iocbs[kUsbWriteQueueDepth]; for (int i = 0; i < writes_to_submit; ++i) { CHECK(!write_requests_[writes_submitted_ + i]->pending); write_requests_[writes_submitted_ + i]->pending = true; iocbs[i] = &write_requests_[writes_submitted_ + i]->control; CHECK(!write_requests_[writes_submitted_ + i].pending); write_requests_[writes_submitted_ + i].pending = true; iocbs[i] = &write_requests_[writes_submitted_ + i].control; LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]); } Loading Loading @@ -689,7 +701,7 @@ struct UsbFfsConnection : public Connection { std::optional<amessage> incoming_header_; IOVector incoming_payload_; std::array<IoBlock, kUsbReadQueueDepth> read_requests_; std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_; IOVector read_data_; // ID of the next request that we're going to send out. Loading @@ -699,7 +711,7 @@ struct UsbFfsConnection : public Connection { size_t needed_read_id_ = 0; std::mutex write_mutex_; std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_); std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_); size_t next_write_id_ GUARDED_BY(write_mutex_) = 0; size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0; Loading
adb/sockets.cpp +1 −2 Original line number Diff line number Diff line Loading @@ -125,8 +125,7 @@ static SocketFlushResult local_socket_flush_incoming(asocket* s) { if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) { s->packet_queue.clear(); } else if (rc > 0) { // TODO: Implement a faster drop_front? s->packet_queue.take_front(rc); s->packet_queue.drop_front(rc); fdevent_add(s->fde, FDE_WRITE); return SocketFlushResult::TryAgain; } else if (rc == -1 && errno == EAGAIN) { Loading
adb/transport_fd.cpp +7 −8 Original line number Diff line number Diff line Loading @@ -93,8 +93,8 @@ struct NonblockingFdConnection : public Connection { if (pfds[0].revents & POLLIN) { // TODO: Should we be getting blocks from a free list? auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD); rc = adb_read(fd_.get(), &(*block)[0], block->size()); auto block = IOVector::block_type(MAX_PAYLOAD); rc = adb_read(fd_.get(), &block[0], block.size()); if (rc == -1) { *error = std::string("read failed: ") + strerror(errno); return; Loading @@ -102,7 +102,7 @@ struct NonblockingFdConnection : public Connection { *error = "read failed: EOF"; return; } block->resize(rc); block.resize(rc); read_buffer_.append(std::move(block)); if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) { Loading @@ -116,7 +116,7 @@ struct NonblockingFdConnection : public Connection { auto data_chain = read_buffer_.take_front(read_header_->data_length); // TODO: Make apacket carry around a IOVector instead of coalescing. auto payload = data_chain.coalesce<apacket::payload_type>(); auto payload = std::move(data_chain).coalesce(); auto packet = std::make_unique<apacket>(); packet->msg = *read_header_; packet->payload = std::move(payload); Loading Loading @@ -184,8 +184,7 @@ struct NonblockingFdConnection : public Connection { return WriteResult::Error; } // TODO: Implement a more efficient drop_front? write_buffer_.take_front(rc); write_buffer_.drop_front(rc); writable_ = write_buffer_.empty(); if (write_buffer_.empty()) { return WriteResult::Completed; Loading @@ -199,10 +198,10 @@ struct NonblockingFdConnection : public Connection { std::lock_guard<std::mutex> lock(write_mutex_); const char* header_begin = reinterpret_cast<const char*>(&packet->msg); const char* header_end = header_begin + sizeof(packet->msg); auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end); auto header_block = IOVector::block_type(header_begin, header_end); write_buffer_.append(std::move(header_block)); if (!packet->payload.empty()) { write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload))); write_buffer_.append(std::move(packet->payload)); } WriteResult result = DispatchWrites(); Loading
adb/types.cpp 0 → 100644 +204 −0 Original line number Diff line number Diff line /* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "types.h" IOVector& IOVector::operator=(IOVector&& move) noexcept { chain_ = std::move(move.chain_); chain_length_ = move.chain_length_; begin_offset_ = move.begin_offset_; start_index_ = move.start_index_; move.clear(); return *this; } IOVector::block_type IOVector::clear() { chain_length_ = 0; begin_offset_ = 0; start_index_ = 0; block_type res; if (!chain_.empty()) { res = std::move(chain_.back()); } chain_.clear(); return res; } void IOVector::drop_front(IOVector::size_type len) { if (len == 0) { return; } if (len == size()) { clear(); return; } CHECK_LT(len, size()); auto dropped = 0u; while (dropped < len) { const auto next = chain_[start_index_].size() - begin_offset_; if (dropped + next < len) { pop_front_block(); dropped += next; } else { const auto taken = len - dropped; begin_offset_ += taken; break; } } } IOVector IOVector::take_front(IOVector::size_type len) { if (len == 0) { return {}; } if (len == size()) { return std::move(*this); } CHECK_GE(size(), len); IOVector res; // first iterate over the blocks that completely go into the other vector while (chain_[start_index_].size() - begin_offset_ <= len) { chain_length_ -= chain_[start_index_].size(); len -= chain_[start_index_].size() - begin_offset_; if (chain_[start_index_].size() > begin_offset_) { res.append(std::move(chain_[start_index_])); if (begin_offset_) { res.begin_offset_ = std::exchange(begin_offset_, 0); } } else { begin_offset_ = 0; } ++start_index_; } if (len > 0) { // what's left is a single buffer that needs to be split between the |res| and |this| // we know that it has to be split - there was a check for the case when it has to // go away as a whole. if (begin_offset_ != 0 || len < chain_[start_index_].size() / 2) { // let's memcpy the data out block_type block(chain_[start_index_].begin() + begin_offset_, chain_[start_index_].begin() + begin_offset_ + len); res.append(std::move(block)); begin_offset_ += len; } else { CHECK_EQ(begin_offset_, 0u); // move out the internal buffer out and copy only the tail of it back in block_type block(chain_[start_index_].begin() + len, chain_[start_index_].end()); chain_length_ -= chain_[start_index_].size(); chain_[start_index_].resize(len); res.append(std::move(chain_[start_index_])); chain_length_ += block.size(); chain_[start_index_] = std::move(block); } } return res; } void IOVector::trim_front() { if ((begin_offset_ == 0 && start_index_ == 0) || chain_.empty()) { return; } block_type& first_block = chain_[start_index_]; if (begin_offset_ == first_block.size()) { ++start_index_; } else { memmove(first_block.data(), first_block.data() + begin_offset_, first_block.size() - begin_offset_); first_block.resize(first_block.size() - begin_offset_); } chain_length_ -= begin_offset_; begin_offset_ = 0; trim_chain_front(); } void IOVector::trim_chain_front() { if (start_index_) { chain_.erase(chain_.begin(), chain_.begin() + start_index_); start_index_ = 0; } } void IOVector::pop_front_block() { chain_length_ -= chain_[start_index_].size(); begin_offset_ = 0; chain_[start_index_].clear(); ++start_index_; if (start_index_ > std::max<size_t>(4, chain_.size() / 2)) { trim_chain_front(); } } IOVector::block_type IOVector::coalesce() && { // Destructive coalesce() may optimize for several cases when it doesn't need to allocate // new buffer, or even return one of the existing blocks as is. The only guarantee is that // after this call the IOVector is in some valid state. Nothing is guaranteed about the // specifics. if (size() == 0) { return {}; } if (begin_offset_ == chain_[start_index_].size() && chain_.size() == start_index_ + 2) { chain_length_ -= chain_.back().size(); auto res = std::move(chain_.back()); chain_.pop_back(); return res; } if (chain_.size() == start_index_ + 1) { chain_length_ -= chain_.back().size(); auto res = std::move(chain_.back()); chain_.pop_back(); if (begin_offset_ != 0) { memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_); res.resize(res.size() - begin_offset_); begin_offset_ = 0; } return res; } if (auto& firstBuffer = chain_[start_index_]; firstBuffer.capacity() >= size()) { auto res = std::move(chain_[start_index_]); auto size = res.size(); chain_length_ -= size; if (begin_offset_ != 0) { memmove(res.data(), res.data() + begin_offset_, res.size() - begin_offset_); size -= begin_offset_; begin_offset_ = 0; } for (auto i = start_index_ + 1; i < chain_.size(); ++i) { memcpy(res.data() + size, chain_[i].data(), chain_[i].size()); size += chain_[i].size(); } res.resize(size); ++start_index_; return res; } return const_cast<const IOVector*>(this)->coalesce<>(); } std::vector<adb_iovec> IOVector::iovecs() const { std::vector<adb_iovec> result; result.reserve(chain_.size() - start_index_); iterate_blocks([&result](const char* data, size_t len) { adb_iovec iov; iov.iov_base = const_cast<char*>(data); iov.iov_len = len; result.emplace_back(iov); }); return result; }