Loading logd/FlushCommand.cpp +2 −21 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ void FlushCommand::runSocketCommand(SocketClient* client) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { entry = (*it); entry = it->get(); if (entry->mClient == client) { if (!entry->isWatchingMultiple(mLogMask)) { LogTimeEntry::unlock(); Loading @@ -63,31 +63,12 @@ void FlushCommand::runSocketCommand(SocketClient* client) { } } entry->triggerReader_Locked(); if (entry->runningReader_Locked()) { LogTimeEntry::unlock(); return; } entry->incRef_Locked(); break; } it++; } if (it == times.end()) { // Create LogTimeEntry in notifyNewLog() ? if (mTail == (unsigned long)-1) { LogTimeEntry::unlock(); return; } entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask, mPid, mStart, mTimeout); times.push_front(entry); } client->incRef(); // release client and entry reference counts once done entry->startReader_Locked(); LogTimeEntry::unlock(); } Loading logd/FlushCommand.h +1 −26 Original line number Diff line number Diff line Loading @@ -27,36 +27,11 @@ class LogReader; class FlushCommand : public SocketClientCommand { LogReader& mReader; bool mNonBlock; unsigned long mTail; log_mask_t mLogMask; pid_t mPid; log_time mStart; uint64_t mTimeout; public: // for opening a reader explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail, log_mask_t logMask, pid_t pid, log_time start, uint64_t timeout) : mReader(reader), mNonBlock(nonBlock), mTail(tail), mLogMask(logMask), mPid(pid), mStart(start), mTimeout((start != log_time::EPOCH) ? timeout : 0) { } // for notification of an update explicit FlushCommand(LogReader& reader, log_mask_t logMask) : mReader(reader), mNonBlock(false), mTail(-1), mLogMask(logMask), mPid(0), mStart(log_time::EPOCH), mTimeout(0) { : mReader(reader), mLogMask(logMask) { } virtual void runSocketCommand(SocketClient* client); Loading logd/LogBuffer.cpp +15 −19 Original line number Diff line number Diff line Loading @@ -105,10 +105,8 @@ void LogBuffer::init() { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked()) { LogTimeEntry* entry = times->get(); entry->triggerReader_Locked(); } times++; } Loading Loading @@ -409,8 +407,7 @@ void LogBuffer::log(LogBufferElement* elem) { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked()) { LogTimeEntry* entry = times->get(); if (!entry->mNonBlock) { end_always = true; break; Loading @@ -420,7 +417,6 @@ void LogBuffer::log(LogBufferElement* elem) { end = entry->mEnd; end_set = true; } } times++; } Loading Loading @@ -710,8 +706,8 @@ bool LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) { // Region locked? LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked() && entry->isWatching(id) && LogTimeEntry* entry = times->get(); if (entry->isWatching(id) && (!oldest || (oldest->mStart > entry->mStart) || ((oldest->mStart == entry->mStart) && (entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) { Loading Loading @@ -1052,9 +1048,9 @@ bool LogBuffer::clear(log_id_t id, uid_t uid) { LogTimeEntry::wrlock(); LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); LogTimeEntry* entry = times->get(); // Killer punch if (entry->owned_Locked() && entry->isWatching(id)) { if (entry->isWatching(id)) { entry->release_Locked(); } times++; Loading logd/LogReader.cpp +27 −4 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ void LogReader::notifyNewLog(log_mask_t logMask) { runOnEachSocket(&command); } // Note returning false will release the SocketClient instance. bool LogReader::onDataAvailable(SocketClient* cli) { static bool name_set; if (!name_set) { Loading @@ -57,6 +58,18 @@ bool LogReader::onDataAvailable(SocketClient* cli) { } buffer[len] = '\0'; // Clients are only allowed to send one command, disconnect them if they // send another. LogTimeEntry::wrlock(); for (const auto& entry : mLogbuf.mTimes) { if (entry->mClient == cli) { entry->release_Locked(); LogTimeEntry::unlock(); return false; } } LogTimeEntry::unlock(); unsigned long tail = 0; static const char _tail[] = " tail="; char* cp = strstr(buffer, _tail); Loading Loading @@ -199,14 +212,25 @@ bool LogReader::onDataAvailable(SocketClient* cli) { cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail, logMask, (int)pid, sequence.nsec(), timeout); FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout); LogTimeEntry::wrlock(); auto entry = std::make_unique<LogTimeEntry>( *this, cli, nonBlock, tail, logMask, pid, sequence, timeout); if (!entry->startReader_Locked()) { LogTimeEntry::unlock(); return false; } // release client and entry reference counts once done cli->incRef(); mLogbuf.mTimes.emplace_front(std::move(entry)); // Set acceptable upper limit to wait for slow reader processing b/27242723 struct timeval t = { LOGD_SNDTIMEO, 0 }; setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t, sizeof(t)); command.runSocketCommand(cli); LogTimeEntry::unlock(); return true; } Loading @@ -215,9 +239,8 @@ void LogReader::doSocketDelete(SocketClient* cli) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { LogTimeEntry* entry = (*it); LogTimeEntry* entry = it->get(); if (entry->mClient == cli) { times.erase(it); entry->release_Locked(); break; } Loading logd/LogTimes.cpp +21 −68 Original line number Diff line number Diff line Loading @@ -30,11 +30,7 @@ pthread_mutex_t LogTimeEntry::timesLock = PTHREAD_MUTEX_INITIALIZER; LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, bool nonBlock, unsigned long tail, log_mask_t logMask, pid_t pid, log_time start, uint64_t timeout) : mRefCount(1), mRelease(false), mError(false), threadRunning(false), leadingDropped(false), : leadingDropped(false), mReader(reader), mLogMask(logMask), mPid(pid), Loading @@ -52,65 +48,21 @@ LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, cleanSkip_Locked(); } void LogTimeEntry::startReader_Locked(void) { bool LogTimeEntry::startReader_Locked() { pthread_attr_t attr; threadRunning = true; if (!pthread_attr_init(&attr)) { if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) { if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart, this)) { pthread_attr_destroy(&attr); return; return true; } } pthread_attr_destroy(&attr); } threadRunning = false; if (mClient) { mClient->decRef(); } decRef_Locked(); } void LogTimeEntry::threadStop(void* obj) { LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj); wrlock(); if (me->mNonBlock) { me->error_Locked(); } SocketClient* client = me->mClient; if (me->isError_Locked()) { LogReader& reader = me->mReader; LastLogTimes& times = reader.logbuf().mTimes; LastLogTimes::iterator it = times.begin(); while (it != times.end()) { if (*it == me) { times.erase(it); me->release_nodelete_Locked(); break; } it++; } me->mClient = nullptr; reader.release(client); } if (client) { client->decRef(); } me->threadRunning = false; me->decRef_Locked(); unlock(); return false; } void* LogTimeEntry::threadStart(void* obj) { Loading @@ -118,13 +70,7 @@ void* LogTimeEntry::threadStart(void* obj) { LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj); pthread_cleanup_push(threadStop, obj); SocketClient* client = me->mClient; if (!client) { me->error(); return nullptr; } LogBuffer& logbuf = me->mReader.logbuf(); Loading @@ -137,14 +83,14 @@ void* LogTimeEntry::threadStart(void* obj) { log_time start = me->mStart; while (me->threadRunning && !me->isError_Locked()) { while (!me->mRelease) { if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) { if (pthread_cond_timedwait(&me->threadTriggeredCondition, ×Lock, &me->mTimeout) == ETIMEDOUT) { me->mTimeout.tv_sec = 0; me->mTimeout.tv_nsec = 0; } if (!me->threadRunning || me->isError_Locked()) { if (me->mRelease) { break; } } Loading @@ -162,13 +108,12 @@ void* LogTimeEntry::threadStart(void* obj) { wrlock(); if (start == LogBufferElement::FLUSH_ERROR) { me->error_Locked(); break; } me->mStart = start + log_time(0, 1); if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) { if (me->mNonBlock || me->mRelease) { break; } Loading @@ -179,9 +124,21 @@ void* LogTimeEntry::threadStart(void* obj) { } } unlock(); LogReader& reader = me->mReader; reader.release(client); pthread_cleanup_pop(true); client->decRef(); LastLogTimes& times = reader.logbuf().mTimes; auto it = std::find_if(times.begin(), times.end(), [&me](const auto& other) { return other.get() == me; }); if (it != times.end()) { times.erase(it); } unlock(); return nullptr; } Loading Loading @@ -247,10 +204,6 @@ int LogTimeEntry::FilterSecondPass(const LogBufferElement* element, void* obj) { goto skip; } if (me->isError_Locked()) { goto stop; } if (!me->mTail) { goto ok; } Loading Loading
logd/FlushCommand.cpp +2 −21 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ void FlushCommand::runSocketCommand(SocketClient* client) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { entry = (*it); entry = it->get(); if (entry->mClient == client) { if (!entry->isWatchingMultiple(mLogMask)) { LogTimeEntry::unlock(); Loading @@ -63,31 +63,12 @@ void FlushCommand::runSocketCommand(SocketClient* client) { } } entry->triggerReader_Locked(); if (entry->runningReader_Locked()) { LogTimeEntry::unlock(); return; } entry->incRef_Locked(); break; } it++; } if (it == times.end()) { // Create LogTimeEntry in notifyNewLog() ? if (mTail == (unsigned long)-1) { LogTimeEntry::unlock(); return; } entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask, mPid, mStart, mTimeout); times.push_front(entry); } client->incRef(); // release client and entry reference counts once done entry->startReader_Locked(); LogTimeEntry::unlock(); } Loading
logd/FlushCommand.h +1 −26 Original line number Diff line number Diff line Loading @@ -27,36 +27,11 @@ class LogReader; class FlushCommand : public SocketClientCommand { LogReader& mReader; bool mNonBlock; unsigned long mTail; log_mask_t mLogMask; pid_t mPid; log_time mStart; uint64_t mTimeout; public: // for opening a reader explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail, log_mask_t logMask, pid_t pid, log_time start, uint64_t timeout) : mReader(reader), mNonBlock(nonBlock), mTail(tail), mLogMask(logMask), mPid(pid), mStart(start), mTimeout((start != log_time::EPOCH) ? timeout : 0) { } // for notification of an update explicit FlushCommand(LogReader& reader, log_mask_t logMask) : mReader(reader), mNonBlock(false), mTail(-1), mLogMask(logMask), mPid(0), mStart(log_time::EPOCH), mTimeout(0) { : mReader(reader), mLogMask(logMask) { } virtual void runSocketCommand(SocketClient* client); Loading
logd/LogBuffer.cpp +15 −19 Original line number Diff line number Diff line Loading @@ -105,10 +105,8 @@ void LogBuffer::init() { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked()) { LogTimeEntry* entry = times->get(); entry->triggerReader_Locked(); } times++; } Loading Loading @@ -409,8 +407,7 @@ void LogBuffer::log(LogBufferElement* elem) { LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked()) { LogTimeEntry* entry = times->get(); if (!entry->mNonBlock) { end_always = true; break; Loading @@ -420,7 +417,6 @@ void LogBuffer::log(LogBufferElement* elem) { end = entry->mEnd; end_set = true; } } times++; } Loading Loading @@ -710,8 +706,8 @@ bool LogBuffer::prune(log_id_t id, unsigned long pruneRows, uid_t caller_uid) { // Region locked? LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); if (entry->owned_Locked() && entry->isWatching(id) && LogTimeEntry* entry = times->get(); if (entry->isWatching(id) && (!oldest || (oldest->mStart > entry->mStart) || ((oldest->mStart == entry->mStart) && (entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) { Loading Loading @@ -1052,9 +1048,9 @@ bool LogBuffer::clear(log_id_t id, uid_t uid) { LogTimeEntry::wrlock(); LastLogTimes::iterator times = mTimes.begin(); while (times != mTimes.end()) { LogTimeEntry* entry = (*times); LogTimeEntry* entry = times->get(); // Killer punch if (entry->owned_Locked() && entry->isWatching(id)) { if (entry->isWatching(id)) { entry->release_Locked(); } times++; Loading
logd/LogReader.cpp +27 −4 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ void LogReader::notifyNewLog(log_mask_t logMask) { runOnEachSocket(&command); } // Note returning false will release the SocketClient instance. bool LogReader::onDataAvailable(SocketClient* cli) { static bool name_set; if (!name_set) { Loading @@ -57,6 +58,18 @@ bool LogReader::onDataAvailable(SocketClient* cli) { } buffer[len] = '\0'; // Clients are only allowed to send one command, disconnect them if they // send another. LogTimeEntry::wrlock(); for (const auto& entry : mLogbuf.mTimes) { if (entry->mClient == cli) { entry->release_Locked(); LogTimeEntry::unlock(); return false; } } LogTimeEntry::unlock(); unsigned long tail = 0; static const char _tail[] = " tail="; char* cp = strstr(buffer, _tail); Loading Loading @@ -199,14 +212,25 @@ bool LogReader::onDataAvailable(SocketClient* cli) { cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail, logMask, (int)pid, sequence.nsec(), timeout); FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout); LogTimeEntry::wrlock(); auto entry = std::make_unique<LogTimeEntry>( *this, cli, nonBlock, tail, logMask, pid, sequence, timeout); if (!entry->startReader_Locked()) { LogTimeEntry::unlock(); return false; } // release client and entry reference counts once done cli->incRef(); mLogbuf.mTimes.emplace_front(std::move(entry)); // Set acceptable upper limit to wait for slow reader processing b/27242723 struct timeval t = { LOGD_SNDTIMEO, 0 }; setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t, sizeof(t)); command.runSocketCommand(cli); LogTimeEntry::unlock(); return true; } Loading @@ -215,9 +239,8 @@ void LogReader::doSocketDelete(SocketClient* cli) { LogTimeEntry::wrlock(); LastLogTimes::iterator it = times.begin(); while (it != times.end()) { LogTimeEntry* entry = (*it); LogTimeEntry* entry = it->get(); if (entry->mClient == cli) { times.erase(it); entry->release_Locked(); break; } Loading
logd/LogTimes.cpp +21 −68 Original line number Diff line number Diff line Loading @@ -30,11 +30,7 @@ pthread_mutex_t LogTimeEntry::timesLock = PTHREAD_MUTEX_INITIALIZER; LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, bool nonBlock, unsigned long tail, log_mask_t logMask, pid_t pid, log_time start, uint64_t timeout) : mRefCount(1), mRelease(false), mError(false), threadRunning(false), leadingDropped(false), : leadingDropped(false), mReader(reader), mLogMask(logMask), mPid(pid), Loading @@ -52,65 +48,21 @@ LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client, cleanSkip_Locked(); } void LogTimeEntry::startReader_Locked(void) { bool LogTimeEntry::startReader_Locked() { pthread_attr_t attr; threadRunning = true; if (!pthread_attr_init(&attr)) { if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) { if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart, this)) { pthread_attr_destroy(&attr); return; return true; } } pthread_attr_destroy(&attr); } threadRunning = false; if (mClient) { mClient->decRef(); } decRef_Locked(); } void LogTimeEntry::threadStop(void* obj) { LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj); wrlock(); if (me->mNonBlock) { me->error_Locked(); } SocketClient* client = me->mClient; if (me->isError_Locked()) { LogReader& reader = me->mReader; LastLogTimes& times = reader.logbuf().mTimes; LastLogTimes::iterator it = times.begin(); while (it != times.end()) { if (*it == me) { times.erase(it); me->release_nodelete_Locked(); break; } it++; } me->mClient = nullptr; reader.release(client); } if (client) { client->decRef(); } me->threadRunning = false; me->decRef_Locked(); unlock(); return false; } void* LogTimeEntry::threadStart(void* obj) { Loading @@ -118,13 +70,7 @@ void* LogTimeEntry::threadStart(void* obj) { LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj); pthread_cleanup_push(threadStop, obj); SocketClient* client = me->mClient; if (!client) { me->error(); return nullptr; } LogBuffer& logbuf = me->mReader.logbuf(); Loading @@ -137,14 +83,14 @@ void* LogTimeEntry::threadStart(void* obj) { log_time start = me->mStart; while (me->threadRunning && !me->isError_Locked()) { while (!me->mRelease) { if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) { if (pthread_cond_timedwait(&me->threadTriggeredCondition, ×Lock, &me->mTimeout) == ETIMEDOUT) { me->mTimeout.tv_sec = 0; me->mTimeout.tv_nsec = 0; } if (!me->threadRunning || me->isError_Locked()) { if (me->mRelease) { break; } } Loading @@ -162,13 +108,12 @@ void* LogTimeEntry::threadStart(void* obj) { wrlock(); if (start == LogBufferElement::FLUSH_ERROR) { me->error_Locked(); break; } me->mStart = start + log_time(0, 1); if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) { if (me->mNonBlock || me->mRelease) { break; } Loading @@ -179,9 +124,21 @@ void* LogTimeEntry::threadStart(void* obj) { } } unlock(); LogReader& reader = me->mReader; reader.release(client); pthread_cleanup_pop(true); client->decRef(); LastLogTimes& times = reader.logbuf().mTimes; auto it = std::find_if(times.begin(), times.end(), [&me](const auto& other) { return other.get() == me; }); if (it != times.end()) { times.erase(it); } unlock(); return nullptr; } Loading Loading @@ -247,10 +204,6 @@ int LogTimeEntry::FilterSecondPass(const LogBufferElement* element, void* obj) { goto skip; } if (me->isError_Locked()) { goto stop; } if (!me->mTail) { goto ok; } Loading