Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ec44d35f authored by Josh Gao's avatar Josh Gao
Browse files

adb: implement LZ4 compression.

Add support for LZ4 compression, which compresses and decompresses far
more quickly than brotli, at the cost of worse compression ratio.

`adb sync -d system` speeds (in MB/s) on aosp_blueline-eng:

           none    brotli    lz4
USB 3.0     120       110    190
USB 2.0      38        75     63

Bug: https://issuetracker.google.com/150827486
Test: python3 -m unittest test_device.FileOperationsTest{Uncompressed,Brotli,LZ4}
Change-Id: Ibef6ac15a76b4e5dcd02d7fb9433cbb1c02b8382
parent 21657178
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -470,6 +470,7 @@ cc_library {
        "libadbd_core",
        "libbrotli",
        "libdiagnose_usb",
        "liblz4",
    ],

    shared_libs: [
@@ -571,6 +572,7 @@ cc_library {
        "libbrotli",
        "libcutils_sockets",
        "libdiagnose_usb",
        "liblz4",
        "libmdnssd",
    ],

@@ -605,6 +607,7 @@ cc_binary {
        "libadbd_services",
        "libasyncio",
        "libcap",
        "liblz4",
        "libminijail",
        "libssl",
    ],
+2 −0
Original line number Diff line number Diff line
@@ -1331,6 +1331,8 @@ static CompressionType parse_compression_type(const std::string& str, bool allow

    if (str == "brotli") {
        return CompressionType::Brotli;
    } else if (str == "lz4") {
        return CompressionType::LZ4;
    }

    error_exit("unexpected compression type %s", str.c_str());
+25 −5
Original line number Diff line number Diff line
@@ -237,6 +237,7 @@ class SyncConnection {
            have_ls_v2_ = CanUseFeature(features_, kFeatureLs2);
            have_sendrecv_v2_ = CanUseFeature(features_, kFeatureSendRecv2);
            have_sendrecv_v2_brotli_ = CanUseFeature(features_, kFeatureSendRecv2Brotli);
            have_sendrecv_v2_lz4_ = CanUseFeature(features_, kFeatureSendRecv2LZ4);
            fd.reset(adb_connect("sync:", &error));
            if (fd < 0) {
                Error("connect failed: %s", error.c_str());
@@ -262,12 +263,15 @@ class SyncConnection {

    bool HaveSendRecv2() const { return have_sendrecv_v2_; }
    bool HaveSendRecv2Brotli() const { return have_sendrecv_v2_brotli_; }
    bool HaveSendRecv2LZ4() const { return have_sendrecv_v2_lz4_; }

    // Resolve a compression type which might be CompressionType::Any to a specific compression
    // algorithm.
    CompressionType ResolveCompressionType(CompressionType compression) const {
        if (compression == CompressionType::Any) {
            if (HaveSendRecv2Brotli()) {
            if (HaveSendRecv2LZ4()) {
                return CompressionType::LZ4;
            } else if (HaveSendRecv2Brotli()) {
                return CompressionType::Brotli;
            }
            return CompressionType::None;
@@ -361,6 +365,10 @@ class SyncConnection {
                msg.send_v2_setup.flags = kSyncFlagBrotli;
                break;

            case CompressionType::LZ4:
                msg.send_v2_setup.flags = kSyncFlagLZ4;
                break;

            case CompressionType::Any:
                LOG(FATAL) << "unexpected CompressionType::Any";
        }
@@ -400,6 +408,10 @@ class SyncConnection {
                msg.recv_v2_setup.flags |= kSyncFlagBrotli;
                break;

            case CompressionType::LZ4:
                msg.recv_v2_setup.flags |= kSyncFlagLZ4;
                break;

            case CompressionType::Any:
                LOG(FATAL) << "unexpected CompressionType::Any";
        }
@@ -599,7 +611,7 @@ class SyncConnection {
        syncsendbuf sbuf;
        sbuf.id = ID_DATA;

        std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
        std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
        Encoder* encoder = nullptr;
        switch (compression) {
            case CompressionType::None:
@@ -610,6 +622,10 @@ class SyncConnection {
                encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
                break;

            case CompressionType::LZ4:
                encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
                break;

            case CompressionType::Any:
                LOG(FATAL) << "unexpected CompressionType::Any";
        }
@@ -891,6 +907,7 @@ class SyncConnection {
    bool have_ls_v2_;
    bool have_sendrecv_v2_;
    bool have_sendrecv_v2_brotli_;
    bool have_sendrecv_v2_lz4_;

    TransferLedger global_ledger_;
    TransferLedger current_ledger_;
@@ -1093,7 +1110,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
    uint64_t bytes_copied = 0;

    Block buffer(SYNC_DATA_MAX);
    std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
    std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
    Decoder* decoder = nullptr;

    std::span buffer_span(buffer.data(), buffer.size());
@@ -1106,6 +1123,10 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
            decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
            break;

        case CompressionType::LZ4:
            decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
            break;

        case CompressionType::Any:
            LOG(FATAL) << "unexpected CompressionType::Any";
    }
@@ -1160,8 +1181,7 @@ static bool sync_recv_v2(SyncConnection& sc, const char* rpath, const char* lpat
            }

            bytes_copied += output.size();

            sc.RecordBytesTransferred(msg.data.size);
            sc.RecordBytesTransferred(output.size());
            sc.ReportProgress(name != nullptr ? name : rpath, bytes_copied, expected_size);

            if (result == DecodeResult::NeedInput) {
+152 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@

#include <brotli/decode.h>
#include <brotli/encode.h>
#include <lz4frame.h>

#include "types.h"

@@ -229,3 +230,154 @@ struct BrotliEncoder final : public Encoder {
    size_t output_bytes_left_;
    std::unique_ptr<BrotliEncoderState, void (*)(BrotliEncoderState*)> encoder_;
};

struct LZ4Decoder final : public Decoder {
    explicit LZ4Decoder(std::span<char> output_buffer)
        : Decoder(output_buffer), decoder_(nullptr, nullptr) {
        LZ4F_dctx* dctx;
        if (LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION) != 0) {
            LOG(FATAL) << "failed to initialize LZ4 decompression context";
        }
        decoder_ = std::unique_ptr<LZ4F_dctx, decltype(&LZ4F_freeDecompressionContext)>(
                dctx, LZ4F_freeDecompressionContext);
    }

    DecodeResult Decode(std::span<char>* output) final {
        size_t available_in = input_buffer_.front_size();
        const char* next_in = input_buffer_.front_data();

        size_t available_out = output_buffer_.size();
        char* next_out = output_buffer_.data();

        size_t rc = LZ4F_decompress(decoder_.get(), next_out, &available_out, next_in,
                                    &available_in, nullptr);
        if (LZ4F_isError(rc)) {
            LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(rc);
            return DecodeResult::Error;
        }

        input_buffer_.drop_front(available_in);

        if (rc == 0) {
            if (!input_buffer_.empty()) {
                LOG(ERROR) << "LZ4 stream hit end before reading all data";
                return DecodeResult::Error;
            }
            lz4_done_ = true;
        }

        *output = std::span<char>(output_buffer_.data(), available_out);

        if (finished_) {
            return input_buffer_.empty() && lz4_done_ ? DecodeResult::Done
                                                      : DecodeResult::MoreOutput;
        }

        return DecodeResult::NeedInput;
    }

  private:
    bool lz4_done_ = false;
    std::unique_ptr<LZ4F_dctx, LZ4F_errorCode_t (*)(LZ4F_dctx*)> decoder_;
};

struct LZ4Encoder final : public Encoder {
    explicit LZ4Encoder(size_t output_block_size)
        : Encoder(output_block_size), encoder_(nullptr, nullptr) {
        LZ4F_cctx* cctx;
        if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) {
            LOG(FATAL) << "failed to initialize LZ4 compression context";
        }
        encoder_ = std::unique_ptr<LZ4F_cctx, decltype(&LZ4F_freeCompressionContext)>(
                cctx, LZ4F_freeCompressionContext);
        Block header(LZ4F_HEADER_SIZE_MAX);
        size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr);
        if (LZ4F_isError(rc)) {
            LOG(FATAL) << "LZ4F_compressBegin failed: %s", LZ4F_getErrorName(rc);
        }
        header.resize(rc);
        output_buffer_.append(std::move(header));
    }

    // As an optimization, only emit a block if we have an entire output block ready, or we're done.
    bool OutputReady() const {
        return output_buffer_.size() >= output_block_size_ || lz4_finalized_;
    }

    // TODO: Switch the output type to IOVector to remove a copy?
    EncodeResult Encode(Block* output) final {
        size_t available_in = input_buffer_.front_size();
        const char* next_in = input_buffer_.front_data();

        // LZ4 makes no guarantees about being able to recover from trying to compress with an
        // insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we
        // need to compress a given number of bytes, but the smallest value seems to be bigger
        // than SYNC_DATA_MAX, so we need to buffer ourselves.

        // Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size).
        constexpr size_t max_input_size = 65536;
        const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr);

        if (available_in != 0) {
            if (lz4_finalized_) {
                LOG(ERROR) << "LZ4Encoder received data after Finish?";
                return EncodeResult::Error;
            }

            available_in = std::min(available_in, max_input_size);

            Block encode_block(encode_block_size);
            size_t available_out = encode_block.capacity();
            char* next_out = encode_block.data();

            size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in,
                                            available_in, nullptr);
            if (LZ4F_isError(rc)) {
                LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc);
                return EncodeResult::Error;
            }

            input_buffer_.drop_front(available_in);

            available_out -= rc;
            next_out += rc;

            encode_block.resize(encode_block_size - available_out);
            output_buffer_.append(std::move(encode_block));
        }

        if (finished_ && !lz4_finalized_) {
            lz4_finalized_ = true;

            Block final_block(encode_block_size + 4);
            size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(),
                                         nullptr);
            if (LZ4F_isError(rc)) {
                LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc);
                return EncodeResult::Error;
            }

            final_block.resize(rc);
            output_buffer_.append(std::move(final_block));
        }

        if (OutputReady()) {
            size_t len = std::min(output_block_size_, output_buffer_.size());
            *output = output_buffer_.take_front(len).coalesce();
        } else {
            output->clear();
        }

        if (lz4_finalized_ && output_buffer_.empty()) {
            return EncodeResult::Done;
        } else if (OutputReady()) {
            return EncodeResult::MoreOutput;
        }
        return EncodeResult::NeedInput;
    }

  private:
    bool lz4_finalized_ = false;
    std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
    IOVector output_buffer_;
};
+28 −2
Original line number Diff line number Diff line
@@ -272,7 +272,7 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta
    syncmsg msg;
    Block buffer(SYNC_DATA_MAX);
    std::span<char> buffer_span(buffer.data(), buffer.size());
    std::variant<std::monostate, NullDecoder, BrotliDecoder> decoder_storage;
    std::variant<std::monostate, NullDecoder, BrotliDecoder, LZ4Decoder> decoder_storage;
    Decoder* decoder = nullptr;

    switch (compression) {
@@ -284,6 +284,10 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta
            decoder = &decoder_storage.emplace<BrotliDecoder>(buffer_span);
            break;

        case CompressionType::LZ4:
            decoder = &decoder_storage.emplace<LZ4Decoder>(buffer_span);
            break;

        case CompressionType::Any:
            LOG(FATAL) << "unexpected CompressionType::Any";
    }
@@ -569,6 +573,15 @@ static bool do_send_v2(int s, const std::string& path, std::vector<char>& buffer
        }
        compression = CompressionType::Brotli;
    }
    if (msg.send_v2_setup.flags & kSyncFlagLZ4) {
        msg.send_v2_setup.flags &= ~kSyncFlagLZ4;
        if (compression) {
            SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
                                                        orig_flags));
            return false;
        }
        compression = CompressionType::LZ4;
    }

    if (msg.send_v2_setup.flags) {
        SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.send_v2_setup.flags));
@@ -598,7 +611,7 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi
    syncmsg msg;
    msg.data.id = ID_DATA;

    std::variant<std::monostate, NullEncoder, BrotliEncoder> encoder_storage;
    std::variant<std::monostate, NullEncoder, BrotliEncoder, LZ4Encoder> encoder_storage;
    Encoder* encoder;

    switch (compression) {
@@ -610,6 +623,10 @@ static bool recv_impl(borrowed_fd s, const char* path, CompressionType compressi
            encoder = &encoder_storage.emplace<BrotliEncoder>(SYNC_DATA_MAX);
            break;

        case CompressionType::LZ4:
            encoder = &encoder_storage.emplace<LZ4Encoder>(SYNC_DATA_MAX);
            break;

        case CompressionType::Any:
            LOG(FATAL) << "unexpected CompressionType::Any";
    }
@@ -688,6 +705,15 @@ static bool do_recv_v2(borrowed_fd s, const char* path, std::vector<char>& buffe
        }
        compression = CompressionType::Brotli;
    }
    if (msg.recv_v2_setup.flags & kSyncFlagLZ4) {
        msg.recv_v2_setup.flags &= ~kSyncFlagLZ4;
        if (compression) {
            SendSyncFail(s, android::base::StringPrintf("multiple compression flags received: %d",
                                                        orig_flags));
            return false;
        }
        compression = CompressionType::LZ4;
    }

    if (msg.recv_v2_setup.flags) {
        SendSyncFail(s, android::base::StringPrintf("unknown flags: %d", msg.recv_v2_setup.flags));
Loading