Loading logd/LogBuffer.h +8 −9 Original line number Original line Diff line number Diff line Loading @@ -21,13 +21,11 @@ #include <functional> #include <functional> #include <log/log.h> #include <log/log.h> #include <sysutils/SocketClient.h> #include <log/log_read.h> #include "LogBufferElement.h" #include "LogWriter.h" class LogWriter; enum class FilterResult { enum class FlushToResult { kSkip, kSkip, kStop, kStop, kWrite, kWrite, Loading @@ -45,10 +43,11 @@ class LogBuffer { // valid message was from the same source so we can differentiate chatty // valid message was from the same source so we can differentiate chatty // filter types (identical or expired) // filter types (identical or expired) static const uint64_t FLUSH_ERROR = 0; static const uint64_t FLUSH_ERROR = 0; virtual uint64_t FlushTo( virtual uint64_t FlushTo(LogWriter* writer, uint64_t start, LogWriter* writer, uint64_t start, pid_t* last_tid, // nullable pid_t* last_tid, // nullable const std::function<FlushToResult(const LogBufferElement* element)>& filter) = 0; const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count)>& filter) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual unsigned long GetSize(log_id_t id) = 0; virtual unsigned long GetSize(log_id_t id) = 0; Loading logd/LogReader.cpp +15 −13 Original line number Original line Diff line number Diff line Loading @@ -171,27 +171,29 @@ bool LogReader::onDataAvailable(SocketClient* cli) { if (start != log_time::EPOCH) { if (start != log_time::EPOCH) { bool start_time_set = false; bool start_time_set = false; uint64_t last = sequence; uint64_t last = sequence; auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last]( &last](const LogBufferElement* element) -> FlushToResult { log_id_t element_log_id, pid_t element_pid, if (pid && pid != element->getPid()) { uint64_t element_sequence, log_time element_realtime, return FlushToResult::kSkip; uint16_t) -> FilterResult { if (pid && pid != element_pid) { return FilterResult::kSkip; } } if ((logMask & (1 << element->getLogId())) == 0) { if ((logMask & (1 << element_log_id)) == 0) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (start == element->getRealTime()) { if (start == element_realtime) { sequence = element->getSequence(); sequence = element_sequence; start_time_set = true; start_time_set = true; return FlushToResult::kStop; return FilterResult::kStop; } else { } else { if (start < element->getRealTime()) { if (start < element_realtime) { sequence = last; sequence = last; start_time_set = true; start_time_set = true; return FlushToResult::kStop; return FilterResult::kStop; } } last = element->getSequence(); last = element_sequence; } } return FlushToResult::kSkip; return FilterResult::kSkip; }; }; log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); Loading logd/LogReaderThread.cpp +38 −28 Original line number Original line Diff line number Diff line Loading @@ -75,13 +75,21 @@ void LogReaderThread::ThreadFunction() { if (tail_) { if (tail_) { log_buffer_->FlushTo(writer_.get(), start, nullptr, log_buffer_->FlushTo(writer_.get(), start, nullptr, std::bind(&LogReaderThread::FilterFirstPass, this, _1)); [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { return FilterFirstPass(log_id, pid, sequence, realtime, dropped_count); }); leading_dropped_ = leading_dropped_ = true; // TODO: Likely a bug, if leading_dropped_ was not true before calling true; // TODO: Likely a bug, if leading_dropped_ was not true before calling // flushTo(), then it should not be reset to true after. // flushTo(), then it should not be reset to true after. } } start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, std::bind(&LogReaderThread::FilterSecondPass, this, _1)); [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { return FilterSecondPass(log_id, pid, sequence, realtime, dropped_count); }); // We only ignore entries before the original start time for the first flushTo(), if we // We only ignore entries before the original start time for the first flushTo(), if we // get entries after this first flush before the original start time, then the client // get entries after this first flush before the original start time, then the client Loading Loading @@ -123,65 +131,67 @@ void LogReaderThread::ThreadFunction() { } } // A first pass to count the number of elements // A first pass to count the number of elements FlushToResult LogReaderThread::FilterFirstPass(const LogBufferElement* element) { FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; if (leading_dropped_) { if (leading_dropped_) { if (element->getDropped()) { if (dropped_count) { return FlushToResult::kSkip; return FilterResult::kSkip; } } leading_dropped_ = false; leading_dropped_ = false; } } if (count_ == 0) { if (count_ == 0) { start_ = element->getSequence(); start_ = sequence; } } if ((!pid_ || pid_ == element->getPid()) && IsWatching(element->getLogId()) && if ((!pid_ || pid_ == pid) && IsWatching(log_id) && (start_time_ == log_time::EPOCH || start_time_ <= element->getRealTime())) { (start_time_ == log_time::EPOCH || start_time_ <= realtime)) { ++count_; ++count_; } } return FlushToResult::kSkip; return FilterResult::kSkip; } } // A second pass to send the selected elements // A second pass to send the selected elements FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) { FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; start_ = element->getSequence(); start_ = sequence; if (skip_ahead_[element->getLogId()]) { if (skip_ahead_[log_id]) { skip_ahead_[element->getLogId()]--; skip_ahead_[log_id]--; return FlushToResult::kSkip; return FilterResult::kSkip; } } if (leading_dropped_) { if (leading_dropped_) { if (element->getDropped()) { if (dropped_count) { return FlushToResult::kSkip; return FilterResult::kSkip; } } leading_dropped_ = false; leading_dropped_ = false; } } // Truncate to close race between first and second pass // Truncate to close race between first and second pass if (non_block_ && tail_ && index_ >= count_) { if (non_block_ && tail_ && index_ >= count_) { return FlushToResult::kStop; return FilterResult::kStop; } } if (!IsWatching(element->getLogId())) { if (!IsWatching(log_id)) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (pid_ && pid_ != element->getPid()) { if (pid_ && pid_ != pid) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (start_time_ != log_time::EPOCH && element->getRealTime() <= start_time_) { if (start_time_ != log_time::EPOCH && realtime <= start_time_) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (release_) { if (release_) { return FlushToResult::kStop; return FilterResult::kStop; } } if (!tail_) { if (!tail_) { Loading @@ -191,7 +201,7 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) ++index_; ++index_; if (count_ > tail_ && index_ <= (count_ - tail_)) { if (count_ > tail_ && index_ <= (count_ - tail_)) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (!non_block_) { if (!non_block_) { Loading @@ -199,10 +209,10 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) } } ok: ok: if (!skip_ahead_[element->getLogId()]) { if (!skip_ahead_[log_id]) { return FlushToResult::kWrite; return FilterResult::kWrite; } } return FlushToResult::kSkip; return FilterResult::kSkip; } } void LogReaderThread::cleanSkip_Locked(void) { void LogReaderThread::cleanSkip_Locked(void) { Loading logd/LogReaderThread.h +4 −3 Original line number Original line Diff line number Diff line Loading @@ -30,7 +30,6 @@ #include <sysutils/SocketClient.h> #include <sysutils/SocketClient.h> #include "LogBuffer.h" #include "LogBuffer.h" #include "LogBufferElement.h" #include "LogWriter.h" #include "LogWriter.h" class LogReaderList; class LogReaderList; Loading Loading @@ -63,8 +62,10 @@ class LogReaderThread { private: private: void ThreadFunction(); void ThreadFunction(); // flushTo filter callbacks // flushTo filter callbacks FlushToResult FilterFirstPass(const LogBufferElement* element); FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, FlushToResult FilterSecondPass(const LogBufferElement* element); uint16_t dropped_count); FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count); std::condition_variable thread_triggered_condition_; std::condition_variable thread_triggered_condition_; LogBuffer* log_buffer_; LogBuffer* log_buffer_; Loading logd/SimpleLogBuffer.cpp +6 −4 Original line number Original line Diff line number Diff line Loading @@ -112,7 +112,8 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) { uint64_t SimpleLogBuffer::FlushTo( uint64_t SimpleLogBuffer::FlushTo( LogWriter* writer, uint64_t start, pid_t* last_tid, LogWriter* writer, uint64_t start, pid_t* last_tid, const std::function<FlushToResult(const LogBufferElement* element)>& filter) { const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count)>& filter) { auto shared_lock = SharedLock{lock_}; auto shared_lock = SharedLock{lock_}; std::list<LogBufferElement>::iterator it; std::list<LogBufferElement>::iterator it; Loading Loading @@ -146,11 +147,12 @@ uint64_t SimpleLogBuffer::FlushTo( } } if (filter) { if (filter) { FlushToResult ret = filter(&element); FilterResult ret = filter(element.getLogId(), element.getPid(), element.getSequence(), if (ret == FlushToResult::kSkip) { element.getRealTime(), element.getDropped()); if (ret == FilterResult::kSkip) { continue; continue; } } if (ret == FlushToResult::kStop) { if (ret == FilterResult::kStop) { break; break; } } } } Loading Loading
logd/LogBuffer.h +8 −9 Original line number Original line Diff line number Diff line Loading @@ -21,13 +21,11 @@ #include <functional> #include <functional> #include <log/log.h> #include <log/log.h> #include <sysutils/SocketClient.h> #include <log/log_read.h> #include "LogBufferElement.h" #include "LogWriter.h" class LogWriter; enum class FilterResult { enum class FlushToResult { kSkip, kSkip, kStop, kStop, kWrite, kWrite, Loading @@ -45,10 +43,11 @@ class LogBuffer { // valid message was from the same source so we can differentiate chatty // valid message was from the same source so we can differentiate chatty // filter types (identical or expired) // filter types (identical or expired) static const uint64_t FLUSH_ERROR = 0; static const uint64_t FLUSH_ERROR = 0; virtual uint64_t FlushTo( virtual uint64_t FlushTo(LogWriter* writer, uint64_t start, LogWriter* writer, uint64_t start, pid_t* last_tid, // nullable pid_t* last_tid, // nullable const std::function<FlushToResult(const LogBufferElement* element)>& filter) = 0; const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count)>& filter) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual unsigned long GetSize(log_id_t id) = 0; virtual unsigned long GetSize(log_id_t id) = 0; Loading
logd/LogReader.cpp +15 −13 Original line number Original line Diff line number Diff line Loading @@ -171,27 +171,29 @@ bool LogReader::onDataAvailable(SocketClient* cli) { if (start != log_time::EPOCH) { if (start != log_time::EPOCH) { bool start_time_set = false; bool start_time_set = false; uint64_t last = sequence; uint64_t last = sequence; auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last]( &last](const LogBufferElement* element) -> FlushToResult { log_id_t element_log_id, pid_t element_pid, if (pid && pid != element->getPid()) { uint64_t element_sequence, log_time element_realtime, return FlushToResult::kSkip; uint16_t) -> FilterResult { if (pid && pid != element_pid) { return FilterResult::kSkip; } } if ((logMask & (1 << element->getLogId())) == 0) { if ((logMask & (1 << element_log_id)) == 0) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (start == element->getRealTime()) { if (start == element_realtime) { sequence = element->getSequence(); sequence = element_sequence; start_time_set = true; start_time_set = true; return FlushToResult::kStop; return FilterResult::kStop; } else { } else { if (start < element->getRealTime()) { if (start < element_realtime) { sequence = last; sequence = last; start_time_set = true; start_time_set = true; return FlushToResult::kStop; return FilterResult::kStop; } } last = element->getSequence(); last = element_sequence; } } return FlushToResult::kSkip; return FilterResult::kSkip; }; }; log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); Loading
logd/LogReaderThread.cpp +38 −28 Original line number Original line Diff line number Diff line Loading @@ -75,13 +75,21 @@ void LogReaderThread::ThreadFunction() { if (tail_) { if (tail_) { log_buffer_->FlushTo(writer_.get(), start, nullptr, log_buffer_->FlushTo(writer_.get(), start, nullptr, std::bind(&LogReaderThread::FilterFirstPass, this, _1)); [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { return FilterFirstPass(log_id, pid, sequence, realtime, dropped_count); }); leading_dropped_ = leading_dropped_ = true; // TODO: Likely a bug, if leading_dropped_ was not true before calling true; // TODO: Likely a bug, if leading_dropped_ was not true before calling // flushTo(), then it should not be reset to true after. // flushTo(), then it should not be reset to true after. } } start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, std::bind(&LogReaderThread::FilterSecondPass, this, _1)); [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { return FilterSecondPass(log_id, pid, sequence, realtime, dropped_count); }); // We only ignore entries before the original start time for the first flushTo(), if we // We only ignore entries before the original start time for the first flushTo(), if we // get entries after this first flush before the original start time, then the client // get entries after this first flush before the original start time, then the client Loading Loading @@ -123,65 +131,67 @@ void LogReaderThread::ThreadFunction() { } } // A first pass to count the number of elements // A first pass to count the number of elements FlushToResult LogReaderThread::FilterFirstPass(const LogBufferElement* element) { FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; if (leading_dropped_) { if (leading_dropped_) { if (element->getDropped()) { if (dropped_count) { return FlushToResult::kSkip; return FilterResult::kSkip; } } leading_dropped_ = false; leading_dropped_ = false; } } if (count_ == 0) { if (count_ == 0) { start_ = element->getSequence(); start_ = sequence; } } if ((!pid_ || pid_ == element->getPid()) && IsWatching(element->getLogId()) && if ((!pid_ || pid_ == pid) && IsWatching(log_id) && (start_time_ == log_time::EPOCH || start_time_ <= element->getRealTime())) { (start_time_ == log_time::EPOCH || start_time_ <= realtime)) { ++count_; ++count_; } } return FlushToResult::kSkip; return FilterResult::kSkip; } } // A second pass to send the selected elements // A second pass to send the selected elements FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) { FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count) { auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; start_ = element->getSequence(); start_ = sequence; if (skip_ahead_[element->getLogId()]) { if (skip_ahead_[log_id]) { skip_ahead_[element->getLogId()]--; skip_ahead_[log_id]--; return FlushToResult::kSkip; return FilterResult::kSkip; } } if (leading_dropped_) { if (leading_dropped_) { if (element->getDropped()) { if (dropped_count) { return FlushToResult::kSkip; return FilterResult::kSkip; } } leading_dropped_ = false; leading_dropped_ = false; } } // Truncate to close race between first and second pass // Truncate to close race between first and second pass if (non_block_ && tail_ && index_ >= count_) { if (non_block_ && tail_ && index_ >= count_) { return FlushToResult::kStop; return FilterResult::kStop; } } if (!IsWatching(element->getLogId())) { if (!IsWatching(log_id)) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (pid_ && pid_ != element->getPid()) { if (pid_ && pid_ != pid) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (start_time_ != log_time::EPOCH && element->getRealTime() <= start_time_) { if (start_time_ != log_time::EPOCH && realtime <= start_time_) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (release_) { if (release_) { return FlushToResult::kStop; return FilterResult::kStop; } } if (!tail_) { if (!tail_) { Loading @@ -191,7 +201,7 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) ++index_; ++index_; if (count_ > tail_ && index_ <= (count_ - tail_)) { if (count_ > tail_ && index_ <= (count_ - tail_)) { return FlushToResult::kSkip; return FilterResult::kSkip; } } if (!non_block_) { if (!non_block_) { Loading @@ -199,10 +209,10 @@ FlushToResult LogReaderThread::FilterSecondPass(const LogBufferElement* element) } } ok: ok: if (!skip_ahead_[element->getLogId()]) { if (!skip_ahead_[log_id]) { return FlushToResult::kWrite; return FilterResult::kWrite; } } return FlushToResult::kSkip; return FilterResult::kSkip; } } void LogReaderThread::cleanSkip_Locked(void) { void LogReaderThread::cleanSkip_Locked(void) { Loading
logd/LogReaderThread.h +4 −3 Original line number Original line Diff line number Diff line Loading @@ -30,7 +30,6 @@ #include <sysutils/SocketClient.h> #include <sysutils/SocketClient.h> #include "LogBuffer.h" #include "LogBuffer.h" #include "LogBufferElement.h" #include "LogWriter.h" #include "LogWriter.h" class LogReaderList; class LogReaderList; Loading Loading @@ -63,8 +62,10 @@ class LogReaderThread { private: private: void ThreadFunction(); void ThreadFunction(); // flushTo filter callbacks // flushTo filter callbacks FlushToResult FilterFirstPass(const LogBufferElement* element); FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, FlushToResult FilterSecondPass(const LogBufferElement* element); uint16_t dropped_count); FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count); std::condition_variable thread_triggered_condition_; std::condition_variable thread_triggered_condition_; LogBuffer* log_buffer_; LogBuffer* log_buffer_; Loading
logd/SimpleLogBuffer.cpp +6 −4 Original line number Original line Diff line number Diff line Loading @@ -112,7 +112,8 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) { uint64_t SimpleLogBuffer::FlushTo( uint64_t SimpleLogBuffer::FlushTo( LogWriter* writer, uint64_t start, pid_t* last_tid, LogWriter* writer, uint64_t start, pid_t* last_tid, const std::function<FlushToResult(const LogBufferElement* element)>& filter) { const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime, uint16_t dropped_count)>& filter) { auto shared_lock = SharedLock{lock_}; auto shared_lock = SharedLock{lock_}; std::list<LogBufferElement>::iterator it; std::list<LogBufferElement>::iterator it; Loading Loading @@ -146,11 +147,12 @@ uint64_t SimpleLogBuffer::FlushTo( } } if (filter) { if (filter) { FlushToResult ret = filter(&element); FilterResult ret = filter(element.getLogId(), element.getPid(), element.getSequence(), if (ret == FlushToResult::kSkip) { element.getRealTime(), element.getDropped()); if (ret == FilterResult::kSkip) { continue; continue; } } if (ret == FlushToResult::kStop) { if (ret == FilterResult::kStop) { break; break; } } } } Loading