Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8a225796 authored by Zhuoyao Zhang's avatar Zhuoyao Zhang
Browse files

Performance optimization for edit monitor

Instead of logging every edit event immediately when it is received,
cache the events and log the cached events in batches periodically. In
case many edit events are received in a short time (probably due to a
non-human operation like repo sync), send an aggregated edit event
instead to prevent performance degradation.

Test: atest edit_monitor_test
Bug: 365617369
Change-Id: Ibe1613cf1e2eb37ebc5dfa5c029b990854fcf91e
parent 35bd3d27
Loading
Loading
Loading
Loading
+7 −2
Original line number Diff line number Diff line
@@ -133,7 +133,11 @@ class DaemonManager:

    logging.debug("in daemon manager cleanup.")
    try:
      if self.daemon_process and self.daemon_process.is_alive():
      if self.daemon_process:
        # The daemon process might already be in the termination process;
        # wait some time before killing it explicitly.
        self._wait_for_process_terminate(self.daemon_process.pid, 1)
        if self.daemon_process.is_alive():
          self._terminate_process(self.daemon_process.pid)
      self._remove_pidfile()
      logging.debug("Successfully stopped daemon manager.")
@@ -227,6 +231,7 @@ class DaemonManager:
    p = multiprocessing.Process(
        target=self.daemon_target, args=self.daemon_args
    )
    p.daemon = True
    p.start()

    logging.info("Start subprocess with PID %d", p.pid)
+69 −7
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ import multiprocessing.connection
import os
import pathlib
import platform
import threading
import time

from atest.metrics import clearcut_client
@@ -31,22 +32,34 @@ from watchdog.observers import Observer
# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524
DEFAULT_FLUSH_INTERVAL_SECONDS = 5
DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD = 100


class ClearcutEventHandler(PatternMatchingEventHandler):

  def __init__(
      self, path: str, cclient: clearcut_client.Clearcut | None = None
      self,
      path: str,
      flush_interval_sec: int,
      single_events_size_threshold: int,
      cclient: clearcut_client.Clearcut | None = None,
  ):

    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.flush_interval_sec = flush_interval_sec
    self.single_events_size_threshold = single_events_size_threshold
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

    self.pending_events = []
    self._scheduled_log_thread = None
    self._pending_events_lock = threading.Lock()

  def on_moved(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

@@ -61,6 +74,12 @@ class ClearcutEventHandler(PatternMatchingEventHandler):

  def flushall(self):
    """Flushes all pending edit events to clearcut.

    Cancels the scheduled batch-log timer (if any), logs every event still
    queued in pending_events, and forces the clearcut client to flush.
    """
    logging.info("flushing all pending events.")
    # Take ownership of the scheduled timer under _pending_events_lock so this
    # cannot race with _log_edit_event, which creates/assigns the timer while
    # holding the same lock.
    with self._pending_events_lock:
      scheduled_thread = self._scheduled_log_thread
      self._scheduled_log_thread = None

    if scheduled_thread:
      logging.info("canceling log thread")
      # cancel() is a no-op if the timer already fired; in that case the
      # timer callback has logged (or is logging) the queued events itself.
      scheduled_thread.cancel()

    # Drain anything still queued, then push everything out to clearcut.
    self._log_clearcut_events()
    self.cclient.flush_events()

  def _log_edit_event(
@@ -92,12 +111,17 @@ class ClearcutEventHandler(PatternMatchingEventHandler):
              file_path=event.src_path, edit_type=edit_type
          )
      )
      clearcut_log_event = clientanalytics_pb2.LogEvent(
          event_time_ms=int(event_time * 1000),
          source_extension=event_proto.SerializeToString(),
      with self._pending_events_lock:
        self.pending_events.append((event_proto, event_time))
        if not self._scheduled_log_thread:
          logging.debug(
              "Scheduling thread to run in %d seconds", self.flush_interval_sec
          )
          self._scheduled_log_thread = threading.Timer(
              self.flush_interval_sec, self._log_clearcut_events
          )
          self._scheduled_log_thread.start()

      self.cclient.log(clearcut_log_event)
    except Exception:
      logging.exception("Failed to log edit event.")

@@ -114,9 +138,46 @@ class ClearcutEventHandler(PatternMatchingEventHandler):
        for dir in file_path.relative_to(root_path).parents
    )

  def _log_clearcut_events(self):
    """Drains the pending-event queue and sends the events to clearcut.

    If more events than single_events_size_threshold accumulated during one
    flush interval (likely a non-human operation such as a repo sync), a
    single aggregated event carrying only the edit count is sent instead.
    """
    # Atomically take the queued events and drop the timer reference so the
    # next incoming edit schedules a fresh timer.
    with self._pending_events_lock:
      self._scheduled_log_thread = None
      drained_events, self.pending_events = self.pending_events, []

    num_drained = len(drained_events)
    if num_drained > self.single_events_size_threshold:
      logging.info(
          "got %d events in %d seconds, sending aggregated events instead",
          num_drained,
          self.flush_interval_sec,
      )
      # Collapse everything into one aggregated event, stamped with the
      # timestamp of the earliest queued event.
      first_event_time = drained_events[0][1]
      aggregated_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      aggregated_proto.aggregated_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.AggregatedEditEvent(num_edits=num_drained)
      )
      drained_events = [(aggregated_proto, first_event_time)]

    for proto, event_time in drained_events:
      self.cclient.log(
          clientanalytics_pb2.LogEvent(
              event_time_ms=int(event_time * 1000),
              source_extension=proto.SerializeToString(),
          )
      )

    logging.info("sent %d edit events", len(drained_events))


def start(
    path: str,
    flush_interval_sec: int = DEFAULT_FLUSH_INTERVAL_SECONDS,
    single_events_size_threshold: int = DEFAULT_SINGLE_EVENTS_SIZE_THRESHOLD,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
@@ -130,7 +191,8 @@ def start(
    cclient: The clearcut client to send the edit logs.
    conn: the sender of the pipe to communicate with the deamon manager.
  """
  event_handler = ClearcutEventHandler(path, cclient)
  event_handler = ClearcutEventHandler(
      path, flush_interval_sec, single_events_size_threshold, cclient)
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
+38 −2
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ class EditMonitorTest(unittest.TestCase):
    self.working_dir.cleanup()
    super().tearDown()

  def test_log_edit_event_success(self):
  def test_log_single_edit_event_success(self):
    # Create the .git file under the monitoring dir.
    self.root_monitoring_path.joinpath('.git').touch()
    fake_cclient = FakeClearcutClient(
@@ -127,6 +127,42 @@ class EditMonitorTest(unittest.TestCase):
        ).single_edit_event,
    )


  def test_log_aggregated_edit_event_success(self):
    """Many edits inside one flush interval produce one aggregated event."""
    # Mark the monitored dir as a git project root.
    self.root_monitoring_path.joinpath('.git').touch()
    fake_cclient = FakeClearcutClient(
        log_output_file=self.log_event_dir.joinpath('logs.output')
    )
    monitor_process = self._start_test_edit_monitor_process(fake_cclient)

    # Touch enough files to cross the aggregation threshold.
    for index in range(6):
      self.root_monitoring_path.joinpath('test_' + str(index)).touch()

    # Give some time for the edit monitor to receive the edit event.
    time.sleep(1)
    # Stop the edit monitor and flush all events.
    os.kill(monitor_process.pid, signal.SIGINT)
    monitor_process.join()

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 1)

    expected_aggregated_edit_event = edit_event_pb2.EditEvent.AggregatedEditEvent(
        num_edits=6,
    )
    self.assertEqual(
        expected_aggregated_edit_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[0].source_extension
        ).aggregated_edit_event,
    )

  def test_do_not_log_edit_event_for_directory_change(self):
    # Create the .git file under the monitoring dir.
    self.root_monitoring_path.joinpath('.git').touch()
@@ -217,7 +253,7 @@ class EditMonitorTest(unittest.TestCase):
    # Start edit monitor in a subprocess.
    p = multiprocessing.Process(
        target=edit_monitor.start,
        args=(str(self.root_monitoring_path.resolve()), cclient, sender),
        args=(str(self.root_monitoring_path.resolve()), 0.5, 5, cclient, sender),
    )
    p.daemon = True
    p.start()