Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 65c065b4 authored by Tao Bao, committed by Gerrit Code Review
Browse files

Merge "updater: Move RangeSinkWrite into RangeSinkState."

parents 58cb690e 60a70afc
Loading
Loading
Loading
Loading
+146 −164
Original line number Diff line number Diff line
@@ -232,119 +232,129 @@ static void allocate(size_t size, std::vector<uint8_t>& buffer) {
    buffer.resize(size);
}

struct RangeSinkState {
    explicit RangeSinkState(RangeSet& rs) : tgt(rs) { };

    int fd;
    const RangeSet& tgt;
    size_t p_block;
    size_t p_remain;
/**
 * RangeSinkWriter reads data from the given FD, and writes them to the destination specified by the
 * given RangeSet.
 */
class RangeSinkWriter {
 public:
  RangeSinkWriter(int fd, const RangeSet& tgt)
      : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
    CHECK_NE(tgt.count, static_cast<size_t>(0));
  };

static size_t RangeSinkWrite(const uint8_t* data, size_t size, RangeSinkState* rss) {
  if (rss->p_remain == 0) {
    LOG(ERROR) << "range sink write overrun";
  bool Finished() const {
    return next_range_ == tgt_.count && current_range_left_ == 0;
  }

  size_t Write(const uint8_t* data, size_t size) {
    if (Finished()) {
      LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
      return 0;
    }

    size_t written = 0;
    while (size > 0) {
    size_t write_now = size;

    if (rss->p_remain < write_now) {
      write_now = rss->p_remain;
      // Move to the next range as needed.
      if (current_range_left_ == 0) {
        if (next_range_ < tgt_.count) {
          off64_t offset = static_cast<off64_t>(tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
          current_range_left_ =
              (tgt_.pos[next_range_ * 2 + 1] - tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
          next_range_++;
          if (!discard_blocks(fd_, offset, current_range_left_)) {
            break;
          }

    if (write_all(rss->fd, data, write_now) == -1) {
          if (!check_lseek(fd_, offset, SEEK_SET)) {
            break;
          }

    data += write_now;
    size -= write_now;

    rss->p_remain -= write_now;
    written += write_now;

    if (rss->p_remain == 0) {
      // Move to the next block.
      ++rss->p_block;
      if (rss->p_block < rss->tgt.count) {
        rss->p_remain =
            (rss->tgt.pos[rss->p_block * 2 + 1] - rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;

        off64_t offset = static_cast<off64_t>(rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
        if (!discard_blocks(rss->fd, offset, rss->p_remain)) {
        } else {
          // We can't write any more; return how many bytes have been written so far.
          break;
        }
      }

        if (!check_lseek(rss->fd, offset, SEEK_SET)) {
          break;
      size_t write_now = size;
      if (current_range_left_ < write_now) {
        write_now = current_range_left_;
      }

      } else {
        // We can't write any more; return how many bytes have been written so far.
      if (write_all(fd_, data, write_now) == -1) {
        break;
      }
    }

      data += write_now;
      size -= write_now;

      current_range_left_ -= write_now;
      written += write_now;
    }

    return written;
  }

// All of the data for all the 'new' transfers is contained in one
// file in the update package, concatenated together in the order in
// which transfers.list will need it.  We want to stream it out of the
// archive (it's compressed) without writing it to a temp file, but we
// can't write each section until it's that transfer's turn to go.
//
// To achieve this, we expand the new data from the archive in a
// background thread, and block that threads 'receive uncompressed
// data' function until the main thread has reached a point where we
// want some new data to be written.  We signal the background thread
// with the destination for the data and block the main thread,
// waiting for the background thread to complete writing that section.
// Then it signals the main thread to wake up and goes back to
// blocking waiting for a transfer.
//
// NewThreadInfo is the struct used to pass information back and forth
// between the two threads.  When the main thread wants some data
// written, it sets rss to the destination location and signals the
// condition.  When the background thread is done writing, it clears
// rss and signals the condition again.
 private:
  // The input data.
  int fd_;
  // The destination for the data.
  const RangeSet& tgt_;
  // The next range that we should write to.
  size_t next_range_;
  // The number of bytes to write before moving to the next range.
  size_t current_range_left_;
};

/**
 * All of the data for all the 'new' transfers is contained in one file in the update package,
 * concatenated together in the order in which transfers.list will need it. We want to stream it out
 * of the archive (it's compressed) without writing it to a temp file, but we can't write each
 * section until it's that transfer's turn to go.
 *
 * To achieve this, we expand the new data from the archive in a background thread, and block that
 * threads 'receive uncompressed data' function until the main thread has reached a point where we
 * want some new data to be written. We signal the background thread with the destination for the
 * data and block the main thread, waiting for the background thread to complete writing that
 * section. Then it signals the main thread to wake up and goes back to blocking waiting for a
 * transfer.
 *
 * NewThreadInfo is the struct used to pass information back and forth between the two threads. When
 * the main thread wants some data written, it sets writer to the destination location and signals
 * the condition. When the background thread is done writing, it clears writer and signals the
 * condition again.
 */
struct NewThreadInfo {
  ZipArchiveHandle za;
  ZipEntry entry;

    RangeSinkState* rss;
  RangeSinkWriter* writer;

  pthread_mutex_t mu;
  pthread_cond_t cv;
};

static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
    NewThreadInfo* nti = reinterpret_cast<NewThreadInfo*>(cookie);
  NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);

  while (size > 0) {
        // Wait for nti->rss to be non-null, indicating some of this
        // data is wanted.
    // Wait for nti->writer to be non-null, indicating some of this data is wanted.
    pthread_mutex_lock(&nti->mu);
        while (nti->rss == nullptr) {
    while (nti->writer == nullptr) {
      pthread_cond_wait(&nti->cv, &nti->mu);
    }
    pthread_mutex_unlock(&nti->mu);

        // At this point nti->rss is set, and we own it.  The main
        // thread is waiting for it to disappear from nti.
        size_t written = RangeSinkWrite(data, size, nti->rss);
    // At this point nti->writer is set, and we own it. The main thread is waiting for it to
    // disappear from nti.
    size_t written = nti->writer->Write(data, size);
    data += written;
    size -= written;

        if (nti->rss->p_block == nti->rss->tgt.count) {
            // we have written all the bytes desired by this rss.
    if (nti->writer->Finished()) {
      // We have written all the bytes desired by this writer.

      pthread_mutex_lock(&nti->mu);
            nti->rss = nullptr;
      nti->writer = nullptr;
      pthread_cond_broadcast(&nti->cv);
      pthread_mutex_unlock(&nti->mu);
    }
@@ -381,9 +391,7 @@ static int ReadBlocks(const RangeSet& src, std::vector<uint8_t>& buffer, int fd)
}

static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
    const uint8_t* data = buffer.data();

    size_t p = 0;
  size_t written = 0;
  for (size_t i = 0; i < tgt.count; ++i) {
    off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
    size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
@@ -395,11 +403,11 @@ static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer,
      return -1;
    }

        if (write_all(fd, data + p, size) == -1) {
    if (write_all(fd, buffer.data() + written, size) == -1) {
      return -1;
    }

        p += size;
    written += size;
  }

  return 0;
@@ -1215,7 +1223,6 @@ static int PerformCommandZero(CommandParameters& params) {
}

static int PerformCommandNew(CommandParameters& params) {

  if (params.cpos >= params.tokens.size()) {
    LOG(ERROR) << "missing target blocks for new";
    return -1;
@@ -1226,25 +1233,12 @@ static int PerformCommandNew(CommandParameters& params) {
  if (params.canwrite) {
    LOG(INFO) << " writing " << tgt.size << " blocks of new data";

        RangeSinkState rss(tgt);
        rss.fd = params.fd;
        rss.p_block = 0;
        rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;

        off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
        if (!discard_blocks(params.fd, offset, tgt.size * BLOCKSIZE)) {
            return -1;
        }

        if (!check_lseek(params.fd, offset, SEEK_SET)) {
            return -1;
        }

    RangeSinkWriter writer(params.fd, tgt);
    pthread_mutex_lock(&params.nti.mu);
        params.nti.rss = &rss;
    params.nti.writer = &writer;
    pthread_cond_broadcast(&params.nti.cv);

        while (params.nti.rss) {
    while (params.nti.writer != nullptr) {
      pthread_cond_wait(&params.nti.cv, &params.nti.mu);
    }

@@ -1296,32 +1290,20 @@ static int PerformCommandDiff(CommandParameters& params) {
      LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
      Value patch_value(
          VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
      RangeSinkState rss(tgt);
      rss.fd = params.fd;
      rss.p_block = 0;
      rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;

      off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
      if (!discard_blocks(params.fd, offset, rss.p_remain)) {
        return -1;
      }

      if (!check_lseek(params.fd, offset, SEEK_SET)) {
        return -1;
      }

      RangeSinkWriter writer(params.fd, tgt);
      if (params.cmdname[0] == 'i') {  // imgdiff
        if (ApplyImagePatch(
                params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
                std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
        if (ApplyImagePatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
                            std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
                                      std::placeholders::_2),
                            nullptr, nullptr) != 0) {
          LOG(ERROR) << "Failed to apply image patch.";
          return -1;
        }
      } else {
        if (ApplyBSDiffPatch(
                params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
                std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
        if (ApplyBSDiffPatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
                             std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
                                       std::placeholders::_2),
                             nullptr) != 0) {
          LOG(ERROR) << "Failed to apply bsdiff patch.";
          return -1;
@@ -1329,7 +1311,7 @@ static int PerformCommandDiff(CommandParameters& params) {
      }

      // We expect the output of the patcher to fill the tgt ranges exactly.
      if (rss.p_block != tgt.count || rss.p_remain != 0) {
      if (!writer.Finished()) {
        LOG(ERROR) << "range sink underrun?";
      }
    } else {