Loading system/gd/os/linux_generic/alarm.cc +6 −6 Original line number Diff line number Diff line Loading @@ -35,7 +35,7 @@ namespace os { Alarm::Alarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { LOG_FATAL_WHEN(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); ASSERT_LOG(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); } Loading @@ -45,7 +45,7 @@ Alarm::~Alarm() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void Alarm::Schedule(Closure task, std::chrono::milliseconds delay) { Loading @@ -56,7 +56,7 @@ void Alarm::Schedule(Closure task, std::chrono::milliseconds delay) { {delay_ms / 1000, delay_ms % 1000 * 1000000} }; int result = timerfd_settime(fd_, 0, &timer_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); task_ = std::move(task); } Loading @@ -65,7 +65,7 @@ void Alarm::Cancel() { std::lock_guard<std::mutex> lock(mutex_); itimerspec disarm_itimerspec{/* disarm timer */}; int result = timerfd_settime(fd_, 0, &disarm_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); } void Alarm::on_fire() { Loading @@ -75,8 +75,8 @@ void Alarm::on_fire() { auto bytes_read = read(fd_, ×_invoked, sizeof(uint64_t)); lock.unlock(); task(); FATAL_WHEN(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))) FATAL_WHEN(times_invoked == static_cast<uint64_t>(1)); ASSERT(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); ASSERT(times_invoked == static_cast<uint64_t>(1)); } } // namespace os Loading system/gd/os/linux_generic/handler.cc +4 −4 Original line number Diff line number Diff line Loading @@ -34,7 +34,7 @@ namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { FATAL_WHEN(fd_ != -1); ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Loading @@ -45,7 +45,7 @@ Handler::~Handler() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void Handler::Post(Closure closure) { Loading @@ -55,7 +55,7 @@ void Handler::Post(Closure closure) { } uint64_t val = 1; auto write_result = eventfd_write(fd_, val); FATAL_WHEN(write_result != -1); ASSERT(write_result != -1); } void Handler::Clear() { Loading @@ -79,7 +79,7 @@ void Handler::handle_next_event() { return; } FATAL_WHEN(read_result != -1); ASSERT(read_result != -1); { std::lock_guard<std::mutex> lock(mutex_); Loading system/gd/os/linux_generic/reactor.cc +15 −15 Original line number Diff line number Diff line Loading @@ -56,43 +56,43 @@ Reactor::Reactor() is_running_(false), reactable_removed_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); LOG_FATAL_WHEN(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); control_fd_ = eventfd(0, EFD_NONBLOCK); FATAL_WHEN(control_fd_ != -1); ASSERT(control_fd_ != -1); epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}}; int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event)); FATAL_WHEN(result != -1); ASSERT(result != -1); } Reactor::~Reactor() { int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr)); FATAL_WHEN(result != -1); ASSERT(result != -1); RUN_NO_INTR(result = close(control_fd_)); FATAL_WHEN(result != -1); ASSERT(result != -1); RUN_NO_INTR(result = close(epoll_fd_)); FATAL_WHEN(result != -1); ASSERT(result != -1); } void Reactor::Run() { bool previously_running = is_running_.exchange(true); FATAL_WHEN(!previously_running); ASSERT(!previously_running); for (;;) { invalidation_list_.clear(); epoll_event events[kEpollMaxEvents]; int count; RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, -1)); FATAL_WHEN(count != -1); ASSERT(count != -1); for (int i = 0; i < count; ++i) { auto event = events[i]; FATAL_WHEN(event.events != 0u); ASSERT(event.events != 0u); // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered if (event.data.ptr == nullptr) { Loading Loading @@ -133,7 +133,7 @@ void Reactor::Stop() { LOG_WARN("not running, will stop once it's started"); } auto control = eventfd_write(control_fd_, 1); FATAL_WHEN(control != -1) ASSERT(control != -1) } Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) { Loading @@ -151,12 +151,12 @@ Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_ }; int register_fd; RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event)); FATAL_WHEN(register_fd != -1) ASSERT(register_fd != -1) return reactable; } void Reactor::Unregister(Reactor::Reactable* reactable) { FATAL_WHEN(reactable != nullptr); ASSERT(reactable != nullptr); { std::lock_guard<std::mutex> lock(mutex_); invalidation_list_.push_back(reactable); Loading @@ -168,7 +168,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { if (result == -1 && errno == ENOENT) { LOG_INFO("reactable is invalid or unregistered"); } else { FATAL_WHEN(result != -1); ASSERT(result != -1); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed. // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe. Loading @@ -183,7 +183,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { FATAL_WHEN(reactable != nullptr); ASSERT(reactable != nullptr); uint32_t poll_event_type = 0; if (on_read_ready != nullptr) { Loading @@ -203,7 +203,7 @@ void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ }; int modify_fd; RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event)); FATAL_WHEN(modify_fd != -1); ASSERT(modify_fd != -1); } } // namespace os Loading system/gd/os/linux_generic/repeating_alarm.cc +5 −5 Original line number Diff line number Diff line Loading @@ -35,7 +35,7 @@ namespace os { RepeatingAlarm::RepeatingAlarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { FATAL_WHEN(fd_ != -1); ASSERT(fd_ != -1); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); } Loading @@ -45,7 +45,7 @@ RepeatingAlarm::~RepeatingAlarm() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void RepeatingAlarm::Schedule(Closure task, std::chrono::milliseconds period) { Loading @@ -56,7 +56,7 @@ void RepeatingAlarm::Schedule(Closure task, std::chrono::milliseconds period) { {period_ms / 1000, period_ms % 1000 * 1000000} }; int result = timerfd_settime(fd_, 0, &timer_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); task_ = std::move(task); } Loading @@ -65,7 +65,7 @@ void RepeatingAlarm::Cancel() { std::lock_guard<std::mutex> lock(mutex_); itimerspec disarm_itimerspec{/* disarm timer */}; int result = timerfd_settime(fd_, 0, &disarm_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); } void RepeatingAlarm::on_fire() { Loading @@ -75,7 +75,7 @@ void RepeatingAlarm::on_fire() { auto bytes_read = read(fd_, ×_invoked, sizeof(uint64_t)); lock.unlock(); task(); FATAL_WHEN(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); ASSERT(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); } } // namespace os Loading system/gd/os/linux_generic/thread.cc +1 −1 Original line number Diff line number Diff line Loading @@ -55,7 +55,7 @@ Thread::~Thread() { bool Thread::Stop() { std::lock_guard<std::mutex> lock(mutex_); FATAL_WHEN(std::this_thread::get_id() != running_thread_.get_id()); ASSERT(std::this_thread::get_id() != running_thread_.get_id()); if (!running_thread_.joinable()) { return false; Loading Loading
system/gd/os/linux_generic/alarm.cc +6 −6 Original line number Diff line number Diff line Loading @@ -35,7 +35,7 @@ namespace os { Alarm::Alarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { LOG_FATAL_WHEN(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); ASSERT_LOG(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); } Loading @@ -45,7 +45,7 @@ Alarm::~Alarm() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void Alarm::Schedule(Closure task, std::chrono::milliseconds delay) { Loading @@ -56,7 +56,7 @@ void Alarm::Schedule(Closure task, std::chrono::milliseconds delay) { {delay_ms / 1000, delay_ms % 1000 * 1000000} }; int result = timerfd_settime(fd_, 0, &timer_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); task_ = std::move(task); } Loading @@ -65,7 +65,7 @@ void Alarm::Cancel() { std::lock_guard<std::mutex> lock(mutex_); itimerspec disarm_itimerspec{/* disarm timer */}; int result = timerfd_settime(fd_, 0, &disarm_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); } void Alarm::on_fire() { Loading @@ -75,8 +75,8 @@ void Alarm::on_fire() { auto bytes_read = read(fd_, ×_invoked, sizeof(uint64_t)); lock.unlock(); task(); FATAL_WHEN(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))) FATAL_WHEN(times_invoked == static_cast<uint64_t>(1)); ASSERT(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); ASSERT(times_invoked == static_cast<uint64_t>(1)); } } // namespace os Loading
system/gd/os/linux_generic/handler.cc +4 −4 Original line number Diff line number Diff line Loading @@ -34,7 +34,7 @@ namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { FATAL_WHEN(fd_ != -1); ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Loading @@ -45,7 +45,7 @@ Handler::~Handler() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void Handler::Post(Closure closure) { Loading @@ -55,7 +55,7 @@ void Handler::Post(Closure closure) { } uint64_t val = 1; auto write_result = eventfd_write(fd_, val); FATAL_WHEN(write_result != -1); ASSERT(write_result != -1); } void Handler::Clear() { Loading @@ -79,7 +79,7 @@ void Handler::handle_next_event() { return; } FATAL_WHEN(read_result != -1); ASSERT(read_result != -1); { std::lock_guard<std::mutex> lock(mutex_); Loading
system/gd/os/linux_generic/reactor.cc +15 −15 Original line number Diff line number Diff line Loading @@ -56,43 +56,43 @@ Reactor::Reactor() is_running_(false), reactable_removed_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); LOG_FATAL_WHEN(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); control_fd_ = eventfd(0, EFD_NONBLOCK); FATAL_WHEN(control_fd_ != -1); ASSERT(control_fd_ != -1); epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}}; int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event)); FATAL_WHEN(result != -1); ASSERT(result != -1); } Reactor::~Reactor() { int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr)); FATAL_WHEN(result != -1); ASSERT(result != -1); RUN_NO_INTR(result = close(control_fd_)); FATAL_WHEN(result != -1); ASSERT(result != -1); RUN_NO_INTR(result = close(epoll_fd_)); FATAL_WHEN(result != -1); ASSERT(result != -1); } void Reactor::Run() { bool previously_running = is_running_.exchange(true); FATAL_WHEN(!previously_running); ASSERT(!previously_running); for (;;) { invalidation_list_.clear(); epoll_event events[kEpollMaxEvents]; int count; RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, -1)); FATAL_WHEN(count != -1); ASSERT(count != -1); for (int i = 0; i < count; ++i) { auto event = events[i]; FATAL_WHEN(event.events != 0u); ASSERT(event.events != 0u); // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered if (event.data.ptr == nullptr) { Loading Loading @@ -133,7 +133,7 @@ void Reactor::Stop() { LOG_WARN("not running, will stop once it's started"); } auto control = eventfd_write(control_fd_, 1); FATAL_WHEN(control != -1) ASSERT(control != -1) } Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) { Loading @@ -151,12 +151,12 @@ Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_ }; int register_fd; RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event)); FATAL_WHEN(register_fd != -1) ASSERT(register_fd != -1) return reactable; } void Reactor::Unregister(Reactor::Reactable* reactable) { FATAL_WHEN(reactable != nullptr); ASSERT(reactable != nullptr); { std::lock_guard<std::mutex> lock(mutex_); invalidation_list_.push_back(reactable); Loading @@ -168,7 +168,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { if (result == -1 && errno == ENOENT) { LOG_INFO("reactable is invalid or unregistered"); } else { FATAL_WHEN(result != -1); ASSERT(result != -1); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed. // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe. Loading @@ -183,7 +183,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { FATAL_WHEN(reactable != nullptr); ASSERT(reactable != nullptr); uint32_t poll_event_type = 0; if (on_read_ready != nullptr) { Loading @@ -203,7 +203,7 @@ void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ }; int modify_fd; RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event)); FATAL_WHEN(modify_fd != -1); ASSERT(modify_fd != -1); } } // namespace os Loading
system/gd/os/linux_generic/repeating_alarm.cc +5 −5 Original line number Diff line number Diff line Loading @@ -35,7 +35,7 @@ namespace os { RepeatingAlarm::RepeatingAlarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { FATAL_WHEN(fd_ != -1); ASSERT(fd_ != -1); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); } Loading @@ -45,7 +45,7 @@ RepeatingAlarm::~RepeatingAlarm() { int close_status; RUN_NO_INTR(close_status = close(fd_)); FATAL_WHEN(close_status != -1); ASSERT(close_status != -1); } void RepeatingAlarm::Schedule(Closure task, std::chrono::milliseconds period) { Loading @@ -56,7 +56,7 @@ void RepeatingAlarm::Schedule(Closure task, std::chrono::milliseconds period) { {period_ms / 1000, period_ms % 1000 * 1000000} }; int result = timerfd_settime(fd_, 0, &timer_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); task_ = std::move(task); } Loading @@ -65,7 +65,7 @@ void RepeatingAlarm::Cancel() { std::lock_guard<std::mutex> lock(mutex_); itimerspec disarm_itimerspec{/* disarm timer */}; int result = timerfd_settime(fd_, 0, &disarm_itimerspec, nullptr); FATAL_WHEN(result == 0); ASSERT(result == 0); } void RepeatingAlarm::on_fire() { Loading @@ -75,7 +75,7 @@ void RepeatingAlarm::on_fire() { auto bytes_read = read(fd_, ×_invoked, sizeof(uint64_t)); lock.unlock(); task(); FATAL_WHEN(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); ASSERT(bytes_read == static_cast<ssize_t>(sizeof(uint64_t))); } } // namespace os Loading
system/gd/os/linux_generic/thread.cc +1 −1 Original line number Diff line number Diff line Loading @@ -55,7 +55,7 @@ Thread::~Thread() { bool Thread::Stop() { std::lock_guard<std::mutex> lock(mutex_); FATAL_WHEN(std::this_thread::get_id() != running_thread_.get_id()); ASSERT(std::this_thread::get_id() != running_thread_.get_id()); if (!running_thread_.joinable()) { return false; Loading