Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit eb2b5408 authored by Tom Cherry's avatar Tom Cherry Committed by Automerger Merge Worker
Browse files

Merge "logd: remove min heap in SerializedFlushToState" am: b92ac53a

Original change: https://android-review.googlesource.com/c/platform/system/core/+/1431987

Change-Id: I599fecb6e1f797bfa5409e3b767f6c0a14766cb9
parents 4d620fc3 b92ac53a
Loading
Loading
Loading
Loading
+33 −29
Original line number Original line Diff line number Diff line
@@ -16,6 +16,8 @@


#include "SerializedFlushToState.h"
#include "SerializedFlushToState.h"


#include <limits>

#include <android-base/logging.h>
#include <android-base/logging.h>


SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
@@ -63,14 +65,13 @@ void SerializedFlushToState::CreateLogPosition(log_id_t log_id) {
    log_positions_[log_id].emplace(log_position);
    log_positions_[log_id].emplace(log_position);
}
}


void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) {
    auto& buffer_it = log_positions_[log_id]->buffer_it;
    auto& buffer_it = log_positions_[log_id]->buffer_it;
    auto read_offset = log_positions_[log_id]->read_offset;
    auto read_offset = log_positions_[log_id]->read_offset;


    // If there is another log to read in this buffer, add it to the min heap.
    // If there is another log to read in this buffer, let it be read.
    if (read_offset < buffer_it->write_offset()) {
    if (read_offset < buffer_it->write_offset()) {
        auto* entry = buffer_it->log_entry(read_offset);
        logs_needed_from_next_position_[log_id] = false;
        min_heap_.emplace(log_id, entry);
    } else if (read_offset == buffer_it->write_offset()) {
    } else if (read_offset == buffer_it->write_offset()) {
        // If there are no more logs to read in this buffer and it's the last buffer, then
        // If there are no more logs to read in this buffer and it's the last buffer, then
        // set logs_needed_from_next_position_ to wait until more logs get logged.
        // set logs_needed_from_next_position_ to wait until more logs get logged.
@@ -85,8 +86,7 @@ void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
            if (buffer_it->write_offset() == 0) {
            if (buffer_it->write_offset() == 0) {
                logs_needed_from_next_position_[log_id] = true;
                logs_needed_from_next_position_[log_id] = true;
            } else {
            } else {
                auto* entry = buffer_it->log_entry(0);
                logs_needed_from_next_position_[log_id] = false;
                min_heap_.emplace(log_id, entry);
            }
            }
        }
        }
    } else {
    } else {
@@ -106,24 +106,41 @@ void SerializedFlushToState::CheckForNewLogs() {
            }
            }
            CreateLogPosition(i);
            CreateLogPosition(i);
        }
        }
        logs_needed_from_next_position_[i] = false;
        UpdateLogsNeeded(i);
        // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true.
        AddMinHeapEntry(i);
    }
    }
}
}


MinHeapElement SerializedFlushToState::PopNextUnreadLog() {
bool SerializedFlushToState::HasUnreadLogs() {
    auto top = min_heap_.top();
    CheckForNewLogs();
    min_heap_.pop();
    log_id_for_each(i) {
        if (log_positions_[i] && !logs_needed_from_next_position_[i]) {
            return true;
        }
    }
    return false;
}


    auto* entry = top.entry;
LogWithId SerializedFlushToState::PopNextUnreadLog() {
    auto log_id = top.log_id;
    uint64_t min_sequence = std::numeric_limits<uint64_t>::max();
    log_id_t log_id;
    const SerializedLogEntry* entry = nullptr;
    log_id_for_each(i) {
        if (!log_positions_[i] || logs_needed_from_next_position_[i]) {
            continue;
        }
        if (log_positions_[i]->log_entry()->sequence() < min_sequence) {
            log_id = i;
            entry = log_positions_[i]->log_entry();
            min_sequence = entry->sequence();
        }
    }
    CHECK_NE(nullptr, entry);


    log_positions_[log_id]->read_offset += entry->total_len();
    log_positions_[log_id]->read_offset += entry->total_len();


    logs_needed_from_next_position_[log_id] = true;
    logs_needed_from_next_position_[log_id] = true;


    return top;
    return {log_id, entry};
}
}


void SerializedFlushToState::Prune(log_id_t log_id,
void SerializedFlushToState::Prune(log_id_t log_id,
@@ -133,25 +150,12 @@ void SerializedFlushToState::Prune(log_id_t log_id,
        return;
        return;
    }
    }


    // // Decrease the ref count since we're deleting our reference.
    // Decrease the ref count since we're deleting our reference.
    buffer_it->DecReaderRefCount();
    buffer_it->DecReaderRefCount();


    // Delete in the reference.
    // Delete in the reference.
    log_positions_[log_id].reset();
    log_positions_[log_id].reset();


    // Remove the MinHeapElement referencing log_id, if it exists, but retain the others.
    std::vector<MinHeapElement> old_elements;
    while (!min_heap_.empty()) {
        auto& element = min_heap_.top();
        if (element.log_id != log_id) {
            old_elements.emplace_back(element);
        }
        min_heap_.pop();
    }
    for (auto&& element : old_elements) {
        min_heap_.emplace(element);
    }

    // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the
    // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the
    // log_position_ object during the next read.
    // log_position_ object during the next read.
    logs_needed_from_next_position_[log_id] = true;
    logs_needed_from_next_position_[log_id] = true;
+14 −26
Original line number Original line Diff line number Diff line
@@ -27,26 +27,19 @@
struct LogPosition {
struct LogPosition {
    std::list<SerializedLogChunk>::iterator buffer_it;
    std::list<SerializedLogChunk>::iterator buffer_it;
    int read_offset;
    int read_offset;

    const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); }
};
};


struct MinHeapElement {
struct LogWithId {
    MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry)
        : log_id(log_id), entry(entry) {}
    log_id_t log_id;
    log_id_t log_id;
    const SerializedLogEntry* entry;
    const SerializedLogEntry* entry;
    // The change of comparison operators is intentional, std::priority_queue uses operator<() to
    // compare but creates a max heap.  Since we want a min heap, we return the opposite result.
    bool operator<(const MinHeapElement& rhs) const {
        return entry->sequence() > rhs.entry->sequence();
    }
};
};


// This class tracks the specific point where a FlushTo client has read through the logs.  It
// This class tracks the specific point where a FlushTo client has read through the logs.  It
// directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
// directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
// into each log chunk where it has last read.  All interactions with this class, except for its
// into each log chunk where it has last read.  All interactions with this class, except for its
// construction, must be done with SerializedLogBuffer::lock_ held.  No log chunks that it
// construction, must be done with SerializedLogBuffer::lock_ held.
// references may be pruned, which is handled by ensuring prune does not touch any log chunk with
// highest sequence number greater or equal to start().
class SerializedFlushToState : public FlushToState {
class SerializedFlushToState : public FlushToState {
  public:
  public:
    // Initializes this state object.  For each log buffer set in log_mask, this sets
    // Initializes this state object.  For each log buffer set in log_mask, this sets
@@ -61,31 +54,29 @@ class SerializedFlushToState : public FlushToState {
        if (logs_ == nullptr) logs_ = logs;
        if (logs_ == nullptr) logs_ = logs;
    }
    }


    bool HasUnreadLogs() {
    // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if
        CheckForNewLogs();
    // there are any unread logs, false otherwise.
        return !min_heap_.empty();
    bool HasUnreadLogs();
    }


    // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to
    // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're
    // indicate that we're waiting for more logs from the associated log buffer.
    // waiting for more logs from the associated log buffer.
    MinHeapElement PopNextUnreadLog();
    LogWithId PopNextUnreadLog();


    // If the parent log buffer prunes logs, the reference that this class contains may become
    // If the parent log buffer prunes logs, the reference that this class contains may become
    // invalid, so this must be called first to drop the reference to buffer_it, if any.
    // invalid, so this must be called first to drop the reference to buffer_it, if any.
    void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
    void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);


  private:
  private:
    // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the
    // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread
    // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're
    // log or to the point at which the next log will appear.
    // waiting for the next log.
    void UpdateLogsNeeded(log_id_t log_id);
    void AddMinHeapEntry(log_id_t log_id);


    // Create a LogPosition object for the given log_id by searching through the log chunks for the
    // Create a LogPosition object for the given log_id by searching through the log chunks for the
    // first chunk and then first log entry within that chunk that is greater or equal to start().
    // first chunk and then first log entry within that chunk that is greater or equal to start().
    void CreateLogPosition(log_id_t log_id);
    void CreateLogPosition(log_id_t log_id);


    // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
    // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
    // calls AddMinHeapEntry() if so.
    // calls UpdateLogsNeeded() if so.
    void CheckForNewLogs();
    void CheckForNewLogs();


    std::list<SerializedLogChunk>* logs_ = nullptr;
    std::list<SerializedLogChunk>* logs_ = nullptr;
@@ -97,7 +88,4 @@ class SerializedFlushToState : public FlushToState {
    // next_log_position == logs_write_position_)`.  These will be re-checked in each
    // next_log_position == logs_write_position_)`.  These will be re-checked in each
    // loop in case new logs came in.
    // loop in case new logs came in.
    std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
    std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
    // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next
    // element that this reader should read.
    std::priority_queue<MinHeapElement> min_heap_;
};
};
+18 −1
Original line number Original line Diff line number Diff line
@@ -288,3 +288,20 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) {


    EXPECT_FALSE(state.HasUnreadLogs());
    EXPECT_FALSE(state.HasUnreadLogs());
}
}

TEST(SerializedFlushToState, Prune) {
    auto chunk = SerializedLogChunk{kChunkSize};
    chunk.Log(1, log_time(), 0, 1, 1, "abc", 3);
    chunk.Log(2, log_time(), 0, 1, 1, "abc", 3);
    chunk.Log(3, log_time(), 0, 1, 1, "abc", 3);
    chunk.FinishWriting();

    std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX];
    log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk));

    auto state = SerializedFlushToState{1, kLogMaskAll};
    state.InitializeLogs(log_chunks);
    ASSERT_TRUE(state.HasUnreadLogs());

    state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin());
}
+1 −1
Original line number Original line Diff line number Diff line
@@ -211,7 +211,7 @@ bool SerializedLogBuffer::FlushTo(
    state.InitializeLogs(logs_);
    state.InitializeLogs(logs_);


    while (state.HasUnreadLogs()) {
    while (state.HasUnreadLogs()) {
        MinHeapElement top = state.PopNextUnreadLog();
        LogWithId top = state.PopNextUnreadLog();
        auto* entry = top.entry;
        auto* entry = top.entry;
        auto log_id = top.log_id;
        auto log_id = top.log_id;


+3 −0
Original line number Original line Diff line number Diff line
@@ -18,6 +18,8 @@


#include <sys/types.h>
#include <sys/types.h>


#include <android-base/logging.h>

#include "LogWriter.h"
#include "LogWriter.h"
#include "SerializedData.h"
#include "SerializedData.h"
#include "SerializedLogEntry.h"
#include "SerializedLogEntry.h"
@@ -55,6 +57,7 @@ class SerializedLogChunk {
    }
    }


    const SerializedLogEntry* log_entry(int offset) const {
    const SerializedLogEntry* log_entry(int offset) const {
        CHECK(writer_active_ || reader_ref_count_ > 0);
        return reinterpret_cast<const SerializedLogEntry*>(data() + offset);
        return reinterpret_cast<const SerializedLogEntry*>(data() + offset);
    }
    }
    const uint8_t* data() const { return contents_.data(); }
    const uint8_t* data() const { return contents_.data(); }