Loading logd/SerializedLogBuffer.cpp +44 −83 Original line number Diff line number Diff line Loading @@ -75,6 +75,10 @@ int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_ return -EINVAL; } if (len > LOGGER_ENTRY_MAX_PAYLOAD) { len = LOGGER_ENTRY_MAX_PAYLOAD; } if (!ShouldLog(log_id, msg, len)) { stats_->AddTotal(log_id, len); return -EACCES; Loading Loading @@ -135,33 +139,42 @@ void SerializedLogBuffer::NotifyReadersOfPrune( } } bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { // Don't prune logs that are newer than the point at which any reader threads are reading from. LogReaderThread* oldest = nullptr; void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto& log_buffer = logs_[log_id]; auto it = log_buffer.begin(); while (it != log_buffer.end()) { for (const auto& reader_thread : reader_list_->reader_threads()) { if (!reader_thread->IsWatching(log_id)) { continue; } if (!oldest || oldest->start() > reader_thread->start() || (oldest->start() == reader_thread->start() && reader_thread->deadline().time_since_epoch().count() != 0)) { oldest = reader_thread.get(); } if (reader_thread->deadline().time_since_epoch().count() != 0) { // Always wake up wrapped readers when pruning. 'Wrapped' readers are an // optimization that allows the reader to wait until logs starting at a specified // time stamp are about to be pruned. This is error-prone however, since if that // timestamp is about to be pruned, the reader is not likely to read the messages // fast enough to not back-up logd. Instead, we can achieve an nearly-as-efficient // but not error-prune batching effect by waking the reader whenever any chunk is // about to be pruned. reader_thread->triggerReader_Locked(); } auto& log_buffer = logs_[log_id]; auto it = log_buffer.begin(); while (it != log_buffer.end()) { if (oldest != nullptr && it->highest_sequence_number() >= oldest->start()) { break; // Some readers may be still reading from this log chunk, log a warning that they are // about to lose logs. // TODO: We should forcefully disconnect the reader instead, such that the reader itself // has an indication that they've lost logs. if (reader_thread->start() <= it->highest_sequence_number()) { LOG(WARNING) << "Skipping entries from slow reader, " << reader_thread->name() << ", from LogBuffer::Prune()"; } } // Increment ahead of time since we're going to erase this iterator from the list. auto it_to_prune = it++; // The sequence number check ensures that all readers have read all logs in this chunk, but // they may still hold a reference to the chunk to track their last read log_position. // Readers may have a reference to the chunk to track their last read log_position. // Notify them to delete the reference. NotifyReadersOfPrune(log_id, it_to_prune); Loading @@ -176,42 +189,11 @@ bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid RemoveChunkFromStats(log_id, *it_to_prune); log_buffer.erase(it_to_prune); if (buffer_size >= bytes_to_free) { return true; return; } bytes_to_free -= buffer_size; } } // If we've deleted all buffers without bytes_to_free hitting 0, then we're called by Clear() // and should return true. if (it == log_buffer.end()) { return true; } // Otherwise we are stuck due to a reader, so mitigate it. CHECK(oldest != nullptr); KickReader(oldest, log_id, bytes_to_free); return false; } // If the selected reader is blocking our pruning progress, decide on // what kind of mitigation is necessary to unblock the situation. void SerializedLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) { if (bytes_to_free >= max_size_[id]) { // +100% // A misbehaving or slow reader is dropped if we hit too much memory pressure. LOG(WARNING) << "Kicking blocked reader, " << reader->name() << ", from LogBuffer::kickMe()"; reader->release_Locked(); } else if (reader->deadline().time_since_epoch().count() != 0) { // Allow a blocked WRAP deadline reader to trigger and start reporting the log data. reader->triggerReader_Locked(); } else { // Tell slow reader to skip entries to catch up. unsigned long prune_rows = bytes_to_free / 300; LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name() << ", from LogBuffer::kickMe()"; reader->triggerSkip_Locked(id, prune_rows); } } std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start, Loading Loading @@ -252,12 +234,18 @@ bool SerializedLogBuffer::FlushTo( } } // We copy the log entry such that we can flush it without the lock. We never block pruning // waiting for this Flush() to complete. constexpr size_t kMaxEntrySize = sizeof(*entry) + LOGGER_ENTRY_MAX_PAYLOAD + 1; unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized)); CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1); memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len()); lock.unlock(); // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the // `entry` pointer is safe here without the lock if (!entry->Flush(writer, log_id)) { if (!reinterpret_cast<SerializedLogEntry*>(entry_copy)->Flush(writer, log_id)) { return false; } lock.lock(); } Loading @@ -266,39 +254,12 @@ bool SerializedLogBuffer::FlushTo( } bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) { // Try three times to clear, then disconnect the readers and try one final time. for (int retry = 0; retry < 3; ++retry) { { auto lock = std::lock_guard{lock_}; bool prune_success = Prune(id, ULONG_MAX, uid); if (prune_success) { Prune(id, ULONG_MAX, uid); // Clearing SerializedLogBuffer never waits for readers and therefore is always successful. return true; } } sleep(1); } // Check if it is still busy after the sleep, we try to prune one entry, not another clear run, // so we are looking for the quick side effect of the return value to tell us if we have a // _blocked_ reader. bool busy = false; { auto lock = std::lock_guard{lock_}; busy = !Prune(id, 1, uid); } // It is still busy, disconnect all readers. if (busy) { auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; for (const auto& reader_thread : reader_list_->reader_threads()) { if (reader_thread->IsWatching(id)) { LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name() << ", from LogBuffer::clear()"; reader_thread->release_Locked(); } } } auto lock = std::lock_guard{lock_}; return Prune(id, ULONG_MAX, uid); } size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) { size_t total_size = 0; Loading logd/SerializedLogBuffer.h +1 −3 Original line number Diff line number Diff line Loading @@ -55,9 +55,7 @@ class SerializedLogBuffer final : public LogBuffer { private: bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len); void MaybePrune(log_id_t log_id) REQUIRES(lock_); bool Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); void KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) REQUIRES_SHARED(lock_); void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk) REQUIRES(reader_list_->reader_threads_lock()); void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk); Loading Loading
logd/SerializedLogBuffer.cpp +44 −83 Original line number Diff line number Diff line Loading @@ -75,6 +75,10 @@ int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_ return -EINVAL; } if (len > LOGGER_ENTRY_MAX_PAYLOAD) { len = LOGGER_ENTRY_MAX_PAYLOAD; } if (!ShouldLog(log_id, msg, len)) { stats_->AddTotal(log_id, len); return -EACCES; Loading Loading @@ -135,33 +139,42 @@ void SerializedLogBuffer::NotifyReadersOfPrune( } } bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { // Don't prune logs that are newer than the point at which any reader threads are reading from. LogReaderThread* oldest = nullptr; void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto& log_buffer = logs_[log_id]; auto it = log_buffer.begin(); while (it != log_buffer.end()) { for (const auto& reader_thread : reader_list_->reader_threads()) { if (!reader_thread->IsWatching(log_id)) { continue; } if (!oldest || oldest->start() > reader_thread->start() || (oldest->start() == reader_thread->start() && reader_thread->deadline().time_since_epoch().count() != 0)) { oldest = reader_thread.get(); } if (reader_thread->deadline().time_since_epoch().count() != 0) { // Always wake up wrapped readers when pruning. 'Wrapped' readers are an // optimization that allows the reader to wait until logs starting at a specified // time stamp are about to be pruned. This is error-prone however, since if that // timestamp is about to be pruned, the reader is not likely to read the messages // fast enough to not back-up logd. Instead, we can achieve an nearly-as-efficient // but not error-prune batching effect by waking the reader whenever any chunk is // about to be pruned. reader_thread->triggerReader_Locked(); } auto& log_buffer = logs_[log_id]; auto it = log_buffer.begin(); while (it != log_buffer.end()) { if (oldest != nullptr && it->highest_sequence_number() >= oldest->start()) { break; // Some readers may be still reading from this log chunk, log a warning that they are // about to lose logs. // TODO: We should forcefully disconnect the reader instead, such that the reader itself // has an indication that they've lost logs. if (reader_thread->start() <= it->highest_sequence_number()) { LOG(WARNING) << "Skipping entries from slow reader, " << reader_thread->name() << ", from LogBuffer::Prune()"; } } // Increment ahead of time since we're going to erase this iterator from the list. auto it_to_prune = it++; // The sequence number check ensures that all readers have read all logs in this chunk, but // they may still hold a reference to the chunk to track their last read log_position. // Readers may have a reference to the chunk to track their last read log_position. // Notify them to delete the reference. NotifyReadersOfPrune(log_id, it_to_prune); Loading @@ -176,42 +189,11 @@ bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid RemoveChunkFromStats(log_id, *it_to_prune); log_buffer.erase(it_to_prune); if (buffer_size >= bytes_to_free) { return true; return; } bytes_to_free -= buffer_size; } } // If we've deleted all buffers without bytes_to_free hitting 0, then we're called by Clear() // and should return true. if (it == log_buffer.end()) { return true; } // Otherwise we are stuck due to a reader, so mitigate it. CHECK(oldest != nullptr); KickReader(oldest, log_id, bytes_to_free); return false; } // If the selected reader is blocking our pruning progress, decide on // what kind of mitigation is necessary to unblock the situation. void SerializedLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) { if (bytes_to_free >= max_size_[id]) { // +100% // A misbehaving or slow reader is dropped if we hit too much memory pressure. LOG(WARNING) << "Kicking blocked reader, " << reader->name() << ", from LogBuffer::kickMe()"; reader->release_Locked(); } else if (reader->deadline().time_since_epoch().count() != 0) { // Allow a blocked WRAP deadline reader to trigger and start reporting the log data. reader->triggerReader_Locked(); } else { // Tell slow reader to skip entries to catch up. unsigned long prune_rows = bytes_to_free / 300; LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name() << ", from LogBuffer::kickMe()"; reader->triggerSkip_Locked(id, prune_rows); } } std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start, Loading Loading @@ -252,12 +234,18 @@ bool SerializedLogBuffer::FlushTo( } } // We copy the log entry such that we can flush it without the lock. We never block pruning // waiting for this Flush() to complete. constexpr size_t kMaxEntrySize = sizeof(*entry) + LOGGER_ENTRY_MAX_PAYLOAD + 1; unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized)); CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1); memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len()); lock.unlock(); // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the // `entry` pointer is safe here without the lock if (!entry->Flush(writer, log_id)) { if (!reinterpret_cast<SerializedLogEntry*>(entry_copy)->Flush(writer, log_id)) { return false; } lock.lock(); } Loading @@ -266,39 +254,12 @@ bool SerializedLogBuffer::FlushTo( } bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) { // Try three times to clear, then disconnect the readers and try one final time. for (int retry = 0; retry < 3; ++retry) { { auto lock = std::lock_guard{lock_}; bool prune_success = Prune(id, ULONG_MAX, uid); if (prune_success) { Prune(id, ULONG_MAX, uid); // Clearing SerializedLogBuffer never waits for readers and therefore is always successful. return true; } } sleep(1); } // Check if it is still busy after the sleep, we try to prune one entry, not another clear run, // so we are looking for the quick side effect of the return value to tell us if we have a // _blocked_ reader. bool busy = false; { auto lock = std::lock_guard{lock_}; busy = !Prune(id, 1, uid); } // It is still busy, disconnect all readers. if (busy) { auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; for (const auto& reader_thread : reader_list_->reader_threads()) { if (reader_thread->IsWatching(id)) { LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name() << ", from LogBuffer::clear()"; reader_thread->release_Locked(); } } } auto lock = std::lock_guard{lock_}; return Prune(id, ULONG_MAX, uid); } size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) { size_t total_size = 0; Loading
logd/SerializedLogBuffer.h +1 −3 Original line number Diff line number Diff line Loading @@ -55,9 +55,7 @@ class SerializedLogBuffer final : public LogBuffer { private: bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len); void MaybePrune(log_id_t log_id) REQUIRES(lock_); bool Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); void KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) REQUIRES_SHARED(lock_); void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk) REQUIRES(reader_list_->reader_threads_lock()); void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk); Loading