Loading logd/SerializedFlushToState.cpp +33 −29 Original line number Diff line number Diff line Loading @@ -16,6 +16,8 @@ #include "SerializedFlushToState.h" #include <limits> #include <android-base/logging.h> SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask) Loading Loading @@ -63,14 +65,13 @@ void SerializedFlushToState::CreateLogPosition(log_id_t log_id) { log_positions_[log_id].emplace(log_position); } void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) { auto& buffer_it = log_positions_[log_id]->buffer_it; auto read_offset = log_positions_[log_id]->read_offset; // If there is another log to read in this buffer, add it to the min heap. // If there is another log to read in this buffer, let it be read. if (read_offset < buffer_it->write_offset()) { auto* entry = buffer_it->log_entry(read_offset); min_heap_.emplace(log_id, entry); logs_needed_from_next_position_[log_id] = false; } else if (read_offset == buffer_it->write_offset()) { // If there are no more logs to read in this buffer and it's the last buffer, then // set logs_needed_from_next_position_ to wait until more logs get logged. Loading @@ -85,8 +86,7 @@ void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { if (buffer_it->write_offset() == 0) { logs_needed_from_next_position_[log_id] = true; } else { auto* entry = buffer_it->log_entry(0); min_heap_.emplace(log_id, entry); logs_needed_from_next_position_[log_id] = false; } } } else { Loading @@ -106,24 +106,41 @@ void SerializedFlushToState::CheckForNewLogs() { } CreateLogPosition(i); } logs_needed_from_next_position_[i] = false; // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true. AddMinHeapEntry(i); UpdateLogsNeeded(i); } } MinHeapElement SerializedFlushToState::PopNextUnreadLog() { auto top = min_heap_.top(); min_heap_.pop(); bool SerializedFlushToState::HasUnreadLogs() { CheckForNewLogs(); log_id_for_each(i) { if (log_positions_[i] && !logs_needed_from_next_position_[i]) { return true; } } return false; } auto* entry = top.entry; auto log_id = top.log_id; LogWithId SerializedFlushToState::PopNextUnreadLog() { uint64_t min_sequence = std::numeric_limits<uint64_t>::max(); log_id_t log_id; const SerializedLogEntry* entry = nullptr; log_id_for_each(i) { if (!log_positions_[i] || logs_needed_from_next_position_[i]) { continue; } if (log_positions_[i]->log_entry()->sequence() < min_sequence) { log_id = i; entry = log_positions_[i]->log_entry(); min_sequence = entry->sequence(); } } CHECK_NE(nullptr, entry); log_positions_[log_id]->read_offset += entry->total_len(); logs_needed_from_next_position_[log_id] = true; return top; return {log_id, entry}; } void SerializedFlushToState::Prune(log_id_t log_id, Loading @@ -133,25 +150,12 @@ void SerializedFlushToState::Prune(log_id_t log_id, return; } // // Decrease the ref count since we're deleting our reference. // Decrease the ref count since we're deleting our reference. buffer_it->DecReaderRefCount(); // Delete in the reference. log_positions_[log_id].reset(); // Remove the MinHeapElement referencing log_id, if it exists, but retain the others. std::vector<MinHeapElement> old_elements; while (!min_heap_.empty()) { auto& element = min_heap_.top(); if (element.log_id != log_id) { old_elements.emplace_back(element); } min_heap_.pop(); } for (auto&& element : old_elements) { min_heap_.emplace(element); } // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the // log_position_ object during the next read. logs_needed_from_next_position_[log_id] = true; Loading logd/SerializedFlushToState.h +14 −26 Original line number Diff line number Diff line Loading @@ -27,26 +27,19 @@ struct LogPosition { std::list<SerializedLogChunk>::iterator buffer_it; int read_offset; const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); } }; struct MinHeapElement { MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry) : log_id(log_id), entry(entry) {} struct LogWithId { log_id_t log_id; const SerializedLogEntry* entry; // The change of comparison operators is intentional, std::priority_queue uses operator<() to // compare but creates a max heap. Since we want a min heap, we return the opposite result. bool operator<(const MinHeapElement& rhs) const { return entry->sequence() > rhs.entry->sequence(); } }; // This class tracks the specific point where a FlushTo client has read through the logs. It // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset // into each log chunk where it has last read. All interactions with this class, except for its // construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it // references may be pruned, which is handled by ensuring prune does not touch any log chunk with // highest sequence number greater or equal to start(). // construction, must be done with SerializedLogBuffer::lock_ held. class SerializedFlushToState : public FlushToState { public: // Initializes this state object. For each log buffer set in log_mask, this sets Loading @@ -61,31 +54,29 @@ class SerializedFlushToState : public FlushToState { if (logs_ == nullptr) logs_ = logs; } bool HasUnreadLogs() { CheckForNewLogs(); return !min_heap_.empty(); } // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if // there are any unread logs, false otherwise. bool HasUnreadLogs(); // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to // indicate that we're waiting for more logs from the associated log buffer. MinHeapElement PopNextUnreadLog(); // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're // waiting for more logs from the associated log buffer. LogWithId PopNextUnreadLog(); // If the parent log buffer prunes logs, the reference that this class contains may become // invalid, so this must be called first to drop the reference to buffer_it, if any. void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it); private: // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're // waiting for the next log. void AddMinHeapEntry(log_id_t log_id); // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread // log or to the point at which the next log will appear. void UpdateLogsNeeded(log_id_t log_id); // Create a LogPosition object for the given log_id by searching through the log chunks for the // first chunk and then first log entry within that chunk that is greater or equal to start(). void CreateLogPosition(log_id_t log_id); // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and // calls AddMinHeapEntry() if so. // calls UpdateLogsNeeded() if so. void CheckForNewLogs(); std::list<SerializedLogChunk>* logs_ = nullptr; Loading @@ -97,7 +88,4 @@ class SerializedFlushToState : public FlushToState { // next_log_position == logs_write_position_)`. These will be re-checked in each // loop in case new logs came in. std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {}; // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next // element that this reader should read. std::priority_queue<MinHeapElement> min_heap_; }; logd/SerializedFlushToStateTest.cpp +18 −1 Original line number Diff line number Diff line Loading @@ -288,3 +288,20 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) { EXPECT_FALSE(state.HasUnreadLogs()); } TEST(SerializedFlushToState, Prune) { auto chunk = SerializedLogChunk{kChunkSize}; chunk.Log(1, log_time(), 0, 1, 1, "abc", 3); chunk.Log(2, log_time(), 0, 1, 1, "abc", 3); chunk.Log(3, log_time(), 0, 1, 1, "abc", 3); chunk.FinishWriting(); std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX]; log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk)); auto state = SerializedFlushToState{1, kLogMaskAll}; state.InitializeLogs(log_chunks); ASSERT_TRUE(state.HasUnreadLogs()); state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin()); } logd/SerializedLogBuffer.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -211,7 +211,7 @@ bool SerializedLogBuffer::FlushTo( state.InitializeLogs(logs_); while (state.HasUnreadLogs()) { MinHeapElement top = state.PopNextUnreadLog(); LogWithId top = state.PopNextUnreadLog(); auto* entry = top.entry; auto log_id = top.log_id; Loading logd/SerializedLogChunk.h +3 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <sys/types.h> #include <android-base/logging.h> #include "LogWriter.h" #include "SerializedData.h" #include "SerializedLogEntry.h" Loading Loading @@ -55,6 +57,7 @@ class SerializedLogChunk { } const SerializedLogEntry* log_entry(int offset) const { CHECK(writer_active_ || reader_ref_count_ > 0); return reinterpret_cast<const SerializedLogEntry*>(data() + offset); } const uint8_t* data() const { return contents_.data(); } Loading Loading
logd/SerializedFlushToState.cpp +33 −29 Original line number Diff line number Diff line Loading @@ -16,6 +16,8 @@ #include "SerializedFlushToState.h" #include <limits> #include <android-base/logging.h> SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask) Loading Loading @@ -63,14 +65,13 @@ void SerializedFlushToState::CreateLogPosition(log_id_t log_id) { log_positions_[log_id].emplace(log_position); } void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { void SerializedFlushToState::UpdateLogsNeeded(log_id_t log_id) { auto& buffer_it = log_positions_[log_id]->buffer_it; auto read_offset = log_positions_[log_id]->read_offset; // If there is another log to read in this buffer, add it to the min heap. // If there is another log to read in this buffer, let it be read. if (read_offset < buffer_it->write_offset()) { auto* entry = buffer_it->log_entry(read_offset); min_heap_.emplace(log_id, entry); logs_needed_from_next_position_[log_id] = false; } else if (read_offset == buffer_it->write_offset()) { // If there are no more logs to read in this buffer and it's the last buffer, then // set logs_needed_from_next_position_ to wait until more logs get logged. Loading @@ -85,8 +86,7 @@ void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { if (buffer_it->write_offset() == 0) { logs_needed_from_next_position_[log_id] = true; } else { auto* entry = buffer_it->log_entry(0); min_heap_.emplace(log_id, entry); logs_needed_from_next_position_[log_id] = false; } } } else { Loading @@ -106,24 +106,41 @@ void SerializedFlushToState::CheckForNewLogs() { } CreateLogPosition(i); } logs_needed_from_next_position_[i] = false; // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true. AddMinHeapEntry(i); UpdateLogsNeeded(i); } } MinHeapElement SerializedFlushToState::PopNextUnreadLog() { auto top = min_heap_.top(); min_heap_.pop(); bool SerializedFlushToState::HasUnreadLogs() { CheckForNewLogs(); log_id_for_each(i) { if (log_positions_[i] && !logs_needed_from_next_position_[i]) { return true; } } return false; } auto* entry = top.entry; auto log_id = top.log_id; LogWithId SerializedFlushToState::PopNextUnreadLog() { uint64_t min_sequence = std::numeric_limits<uint64_t>::max(); log_id_t log_id; const SerializedLogEntry* entry = nullptr; log_id_for_each(i) { if (!log_positions_[i] || logs_needed_from_next_position_[i]) { continue; } if (log_positions_[i]->log_entry()->sequence() < min_sequence) { log_id = i; entry = log_positions_[i]->log_entry(); min_sequence = entry->sequence(); } } CHECK_NE(nullptr, entry); log_positions_[log_id]->read_offset += entry->total_len(); logs_needed_from_next_position_[log_id] = true; return top; return {log_id, entry}; } void SerializedFlushToState::Prune(log_id_t log_id, Loading @@ -133,25 +150,12 @@ void SerializedFlushToState::Prune(log_id_t log_id, return; } // // Decrease the ref count since we're deleting our reference. // Decrease the ref count since we're deleting our reference. buffer_it->DecReaderRefCount(); // Delete in the reference. log_positions_[log_id].reset(); // Remove the MinHeapElement referencing log_id, if it exists, but retain the others. std::vector<MinHeapElement> old_elements; while (!min_heap_.empty()) { auto& element = min_heap_.top(); if (element.log_id != log_id) { old_elements.emplace_back(element); } min_heap_.pop(); } for (auto&& element : old_elements) { min_heap_.emplace(element); } // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the // log_position_ object during the next read. logs_needed_from_next_position_[log_id] = true; Loading
logd/SerializedFlushToState.h +14 −26 Original line number Diff line number Diff line Loading @@ -27,26 +27,19 @@ struct LogPosition { std::list<SerializedLogChunk>::iterator buffer_it; int read_offset; const SerializedLogEntry* log_entry() const { return buffer_it->log_entry(read_offset); } }; struct MinHeapElement { MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry) : log_id(log_id), entry(entry) {} struct LogWithId { log_id_t log_id; const SerializedLogEntry* entry; // The change of comparison operators is intentional, std::priority_queue uses operator<() to // compare but creates a max heap. Since we want a min heap, we return the opposite result. bool operator<(const MinHeapElement& rhs) const { return entry->sequence() > rhs.entry->sequence(); } }; // This class tracks the specific point where a FlushTo client has read through the logs. It // directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset // into each log chunk where it has last read. All interactions with this class, except for its // construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it // references may be pruned, which is handled by ensuring prune does not touch any log chunk with // highest sequence number greater or equal to start(). // construction, must be done with SerializedLogBuffer::lock_ held. class SerializedFlushToState : public FlushToState { public: // Initializes this state object. For each log buffer set in log_mask, this sets Loading @@ -61,31 +54,29 @@ class SerializedFlushToState : public FlushToState { if (logs_ == nullptr) logs_ = logs; } bool HasUnreadLogs() { CheckForNewLogs(); return !min_heap_.empty(); } // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if // there are any unread logs, false otherwise. bool HasUnreadLogs(); // Pops the next unread log from the min heap and sets logs_needed_from_next_position_ to // indicate that we're waiting for more logs from the associated log buffer. MinHeapElement PopNextUnreadLog(); // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're // waiting for more logs from the associated log buffer. LogWithId PopNextUnreadLog(); // If the parent log buffer prunes logs, the reference that this class contains may become // invalid, so this must be called first to drop the reference to buffer_it, if any. void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it); private: // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're // waiting for the next log. void AddMinHeapEntry(log_id_t log_id); // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread // log or to the point at which the next log will appear. void UpdateLogsNeeded(log_id_t log_id); // Create a LogPosition object for the given log_id by searching through the log chunks for the // first chunk and then first log entry within that chunk that is greater or equal to start(). void CreateLogPosition(log_id_t log_id); // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and // calls AddMinHeapEntry() if so. // calls UpdateLogsNeeded() if so. void CheckForNewLogs(); std::list<SerializedLogChunk>* logs_ = nullptr; Loading @@ -97,7 +88,4 @@ class SerializedFlushToState : public FlushToState { // next_log_position == logs_write_position_)`. These will be re-checked in each // loop in case new logs came in. std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {}; // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next // element that this reader should read. std::priority_queue<MinHeapElement> min_heap_; };
logd/SerializedFlushToStateTest.cpp +18 −1 Original line number Diff line number Diff line Loading @@ -288,3 +288,20 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) { EXPECT_FALSE(state.HasUnreadLogs()); } TEST(SerializedFlushToState, Prune) { auto chunk = SerializedLogChunk{kChunkSize}; chunk.Log(1, log_time(), 0, 1, 1, "abc", 3); chunk.Log(2, log_time(), 0, 1, 1, "abc", 3); chunk.Log(3, log_time(), 0, 1, 1, "abc", 3); chunk.FinishWriting(); std::list<SerializedLogChunk> log_chunks[LOG_ID_MAX]; log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk)); auto state = SerializedFlushToState{1, kLogMaskAll}; state.InitializeLogs(log_chunks); ASSERT_TRUE(state.HasUnreadLogs()); state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin()); }
logd/SerializedLogBuffer.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -211,7 +211,7 @@ bool SerializedLogBuffer::FlushTo( state.InitializeLogs(logs_); while (state.HasUnreadLogs()) { MinHeapElement top = state.PopNextUnreadLog(); LogWithId top = state.PopNextUnreadLog(); auto* entry = top.entry; auto log_id = top.log_id; Loading
logd/SerializedLogChunk.h +3 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <sys/types.h> #include <android-base/logging.h> #include "LogWriter.h" #include "SerializedData.h" #include "SerializedLogEntry.h" Loading Loading @@ -55,6 +57,7 @@ class SerializedLogChunk { } const SerializedLogEntry* log_entry(int offset) const { CHECK(writer_active_ || reader_ref_count_ > 0); return reinterpret_cast<const SerializedLogEntry*>(data() + offset); } const uint8_t* data() const { return contents_.data(); } Loading