Loading tools/edit_monitor/daemon_manager.py +7 −2 Original line number Diff line number Diff line Loading @@ -133,7 +133,11 @@ class DaemonManager: logging.debug("in daemon manager cleanup.") try: if self.daemon_process and self.daemon_process.is_alive(): if self.daemon_process: # The daemon process might already in termination process, # wait some time before kill it explicitly. self._wait_for_process_terminate(self.daemon_process.pid, 1) if self.daemon_process.is_alive(): self._terminate_process(self.daemon_process.pid) self._remove_pidfile() logging.debug("Successfully stopped daemon manager.") Loading Loading @@ -227,6 +231,7 @@ class DaemonManager: p = multiprocessing.Process( target=self.daemon_target, args=self.daemon_args ) p.daemon = True p.start() logging.info("Start subprocess with PID %d", p.pid) Loading tools/edit_monitor/edit_monitor.py +69 −7 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ import multiprocessing.connection import os import pathlib import platform import threading import time from atest.metrics import clearcut_client Loading @@ -31,22 +32,34 @@ from watchdog.observers import Observer # Enum of the Clearcut log source defined under # /google3/wireless/android/play/playlog/proto/log_source_enum.proto LOG_SOURCE = 2524 DEFAULT_FLUSH_INTERVAL_SECONDS = 5 DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100 class ClearcutEventHandler(PatternMatchingEventHandler): def __init__( self, path: str, cclient: clearcut_client.Clearcut | None = None self, path: str, flush_interval_sec: int, single_events_size_threshold: int, cclient: clearcut_client.Clearcut | None = None, ): super().__init__(patterns=["*"], ignore_directories=True) self.root_monitoring_path = path self.flush_interval_sec = flush_interval_sec self.single_events_size_threshold = single_events_size_threshold self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE) self.user_name = getpass.getuser() self.host_name = platform.node() self.source_root = os.environ.get("ANDROID_BUILD_TOP", "") self.pending_events = [] self._scheduled_log_thread = None self._pending_events_lock = threading.Lock() def on_moved(self, event: FileSystemEvent): self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE) Loading @@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler): def flushall(self): logging.info("flushing all pending events.") if self._scheduled_log_thread: logging.info("canceling log thread") self._scheduled_log_thread.cancel() self._scheduled_log_thread = None self._log_clearcut_events() self.cclient.flush_events() def _log_edit_event( Loading Loading @@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler): file_path=event.src_path, edit_type=edit_type ) ) clearcut_log_event = clientanalytics_pb2.LogEvent( event_time_ms=int(event_time * 1000), source_extension=event_proto.SerializeToString(), with self._pending_events_lock: self.pending_events.append((event_proto, event_time)) if not self._scheduled_log_thread: logging.debug( "Scheduling thread to run in %d seconds", self.flush_interval_sec ) self._scheduled_log_thread = threading.Timer( self.flush_interval_sec, self._log_clearcut_events ) self._scheduled_log_thread.start() self.cclient.log(clearcut_log_event) except Exception: logging.exception("Failed to log edit event.") Loading @@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler): for dir in file_path.relative_to(root_path).parents ) def _log_clearcut_events(self): with self._pending_events_lock: self._scheduled_log_thread = None edit_events = self.pending_events self.pending_events = [] pending_events_size = len(edit_events) if pending_events_size > self.single_events_size_threshold: logging.info( "got %d events in %d seconds, sending aggregated events instead", pending_events_size, self.flush_interval_sec, ) aggregated_event_time = edit_events[0][1] aggregated_event_proto = edit_event_pb2.EditEvent( user_name=self.user_name, host_name=self.host_name, source_root=self.source_root, ) aggregated_event_proto.aggregated_edit_event.CopyFrom( edit_event_pb2.EditEvent.AggregatedEditEvent( num_edits=pending_events_size ) ) edit_events = [(aggregated_event_proto, aggregated_event_time)] for event_proto, event_time in edit_events: log_event = clientanalytics_pb2.LogEvent( event_time_ms=int(event_time * 1000), source_extension=event_proto.SerializeToString(), ) self.cclient.log(log_event) logging.info("sent %d edit events", len(edit_events)) def start( path: str, flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS, single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD, cclient: clearcut_client.Clearcut | None = None, pipe_sender: multiprocessing.connection.Connection | None = None, ): Loading @@ -130,7 +191,8 @@ def start( cclient: The clearcut client to send the edit logs. conn: the sender of the pipe to communicate with the deamon manager. """ event_handler = ClearcutEventHandler(path, cclient) event_handler = ClearcutEventHandler( path, flush_interval_sec, single_events_size_threshold, cclient) observer = Observer() logging.info("Starting observer on path %s.", path) Loading tools/edit_monitor/edit_monitor_test.py +38 −2 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase): self.working_dir.cleanup() super().tearDown() def test_log_edit_event_success(self): def test_log_single_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( Loading Loading @@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase): ).single_edit_event, ) def test_log_aggregated_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( log_output_file=self.log_event_dir.joinpath('logs.output') ) p = self._start_test_edit_monitor_process(fake_cclient) # Create 6 test files for i in range(6): test_file = self.root_monitoring_path.joinpath('test_' + str(i)) test_file.touch() # Give some time for the edit monitor to receive the edit event. time.sleep(1) # Stop the edit monitor and flush all events. os.kill(p.pid, signal.SIGINT) p.join() logged_events = self._get_logged_events() self.assertEqual(len(logged_events), 1) expected_aggregated_edit_event = ( edit_event_pb2.EditEvent.AggregatedEditEvent( num_edits=6, ) ) self.assertEqual( expected_aggregated_edit_event, edit_event_pb2.EditEvent.FromString( logged_events[0].source_extension ).aggregated_edit_event, ) def test_do_not_log_edit_event_for_directory_change(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() Loading Loading @@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase): # Start edit monitor in a subprocess. p = multiprocessing.Process( target=edit_monitor.start, args=(str(self.root_monitoring_path.resolve()), cclient, sender), args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender), ) p.daemon = True p.start() Loading Loading
tools/edit_monitor/daemon_manager.py +7 −2 Original line number Diff line number Diff line Loading @@ -133,7 +133,11 @@ class DaemonManager: logging.debug("in daemon manager cleanup.") try: if self.daemon_process and self.daemon_process.is_alive(): if self.daemon_process: # The daemon process might already in termination process, # wait some time before kill it explicitly. self._wait_for_process_terminate(self.daemon_process.pid, 1) if self.daemon_process.is_alive(): self._terminate_process(self.daemon_process.pid) self._remove_pidfile() logging.debug("Successfully stopped daemon manager.") Loading Loading @@ -227,6 +231,7 @@ class DaemonManager: p = multiprocessing.Process( target=self.daemon_target, args=self.daemon_args ) p.daemon = True p.start() logging.info("Start subprocess with PID %d", p.pid) Loading
tools/edit_monitor/edit_monitor.py +69 −7 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ import multiprocessing.connection import os import pathlib import platform import threading import time from atest.metrics import clearcut_client Loading @@ -31,22 +32,34 @@ from watchdog.observers import Observer # Enum of the Clearcut log source defined under # /google3/wireless/android/play/playlog/proto/log_source_enum.proto LOG_SOURCE = 2524 DEFAULT_FLUSH_INTERVAL_SECONDS = 5 DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100 class ClearcutEventHandler(PatternMatchingEventHandler): def __init__( self, path: str, cclient: clearcut_client.Clearcut | None = None self, path: str, flush_interval_sec: int, single_events_size_threshold: int, cclient: clearcut_client.Clearcut | None = None, ): super().__init__(patterns=["*"], ignore_directories=True) self.root_monitoring_path = path self.flush_interval_sec = flush_interval_sec self.single_events_size_threshold = single_events_size_threshold self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE) self.user_name = getpass.getuser() self.host_name = platform.node() self.source_root = os.environ.get("ANDROID_BUILD_TOP", "") self.pending_events = [] self._scheduled_log_thread = None self._pending_events_lock = threading.Lock() def on_moved(self, event: FileSystemEvent): self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE) Loading @@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler): def flushall(self): logging.info("flushing all pending events.") if self._scheduled_log_thread: logging.info("canceling log thread") self._scheduled_log_thread.cancel() self._scheduled_log_thread = None self._log_clearcut_events() self.cclient.flush_events() def _log_edit_event( Loading Loading @@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler): file_path=event.src_path, edit_type=edit_type ) ) clearcut_log_event = clientanalytics_pb2.LogEvent( event_time_ms=int(event_time * 1000), source_extension=event_proto.SerializeToString(), with self._pending_events_lock: self.pending_events.append((event_proto, event_time)) if not self._scheduled_log_thread: logging.debug( "Scheduling thread to run in %d seconds", self.flush_interval_sec ) self._scheduled_log_thread = threading.Timer( self.flush_interval_sec, self._log_clearcut_events ) self._scheduled_log_thread.start() self.cclient.log(clearcut_log_event) except Exception: logging.exception("Failed to log edit event.") Loading @@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler): for dir in file_path.relative_to(root_path).parents ) def _log_clearcut_events(self): with self._pending_events_lock: self._scheduled_log_thread = None edit_events = self.pending_events self.pending_events = [] pending_events_size = len(edit_events) if pending_events_size > self.single_events_size_threshold: logging.info( "got %d events in %d seconds, sending aggregated events instead", pending_events_size, self.flush_interval_sec, ) aggregated_event_time = edit_events[0][1] aggregated_event_proto = edit_event_pb2.EditEvent( user_name=self.user_name, host_name=self.host_name, source_root=self.source_root, ) aggregated_event_proto.aggregated_edit_event.CopyFrom( edit_event_pb2.EditEvent.AggregatedEditEvent( num_edits=pending_events_size ) ) edit_events = [(aggregated_event_proto, aggregated_event_time)] for event_proto, event_time in edit_events: log_event = clientanalytics_pb2.LogEvent( event_time_ms=int(event_time * 1000), source_extension=event_proto.SerializeToString(), ) self.cclient.log(log_event) logging.info("sent %d edit events", len(edit_events)) def start( path: str, flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS, single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD, cclient: clearcut_client.Clearcut | None = None, pipe_sender: multiprocessing.connection.Connection | None = None, ): Loading @@ -130,7 +191,8 @@ def start( cclient: The clearcut client to send the edit logs. conn: the sender of the pipe to communicate with the deamon manager. """ event_handler = ClearcutEventHandler(path, cclient) event_handler = ClearcutEventHandler( path, flush_interval_sec, single_events_size_threshold, cclient) observer = Observer() logging.info("Starting observer on path %s.", path) Loading
tools/edit_monitor/edit_monitor_test.py +38 −2 Original line number Diff line number Diff line Loading @@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase): self.working_dir.cleanup() super().tearDown() def test_log_edit_event_success(self): def test_log_single_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( Loading Loading @@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase): ).single_edit_event, ) def test_log_aggregated_edit_event_success(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() fake_cclient = FakeClearcutClient( log_output_file=self.log_event_dir.joinpath('logs.output') ) p = self._start_test_edit_monitor_process(fake_cclient) # Create 6 test files for i in range(6): test_file = self.root_monitoring_path.joinpath('test_' + str(i)) test_file.touch() # Give some time for the edit monitor to receive the edit event. time.sleep(1) # Stop the edit monitor and flush all events. os.kill(p.pid, signal.SIGINT) p.join() logged_events = self._get_logged_events() self.assertEqual(len(logged_events), 1) expected_aggregated_edit_event = ( edit_event_pb2.EditEvent.AggregatedEditEvent( num_edits=6, ) ) self.assertEqual( expected_aggregated_edit_event, edit_event_pb2.EditEvent.FromString( logged_events[0].source_extension ).aggregated_edit_event, ) def test_do_not_log_edit_event_for_directory_change(self): # Create the .git file under the monitoring dir. self.root_monitoring_path.joinpath('.git').touch() Loading Loading @@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase): # Start edit monitor in a subprocess. p = multiprocessing.Process( target=edit_monitor.start, args=(str(self.root_monitoring_path.resolve()), cclient, sender), args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender), ) p.daemon = True p.start() Loading