Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bae5f72a authored by Zhuoyao Zhang's avatar Zhuoyao Zhang
Browse files

Add the basic edit monitor logic

The edit monitor uses the watchdog lib to montior any file changes
(create, modify, move, delete) under
a specific directory, create a edit log based on the file change event
and send the edit log to Sawmill through clearcut.

Test: Atest edit_monitor_test
Bug: 365617369
Change-Id: Ib6e04a43e4dd78ca58c7ccdef56fb4f6bda6e7a5
parent 4a1b00e0
Loading
Loading
Loading
Loading
+21 −0
Original line number Diff line number Diff line
@@ -35,6 +35,12 @@ python_library_host {
    pkg_path: "edit_monitor",
    srcs: [
        "daemon_manager.py",
        "edit_monitor.py",
    ],
    libs: [
        "asuite_cc_client",
        "edit_event_proto",
        "watchdog",
    ],
}

@@ -53,6 +59,21 @@ python_test_host {
    },
}

python_test_host {
    name: "edit_monitor_test",
    main: "edit_monitor_test.py",
    pkg_path: "edit_monitor",
    srcs: [
        "edit_monitor_test.py",
    ],
    libs: [
        "edit_monitor_lib",
    ],
    test_options: {
        unit_test: true,
    },
}

python_binary_host {
    name: "edit_monitor",
    pkg_path: "edit_monitor",
+125 −0
Original line number Diff line number Diff line
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import getpass
import logging
import multiprocessing.connection
import os
import platform
import time

from atest.metrics import clearcut_client
from atest.proto import clientanalytics_pb2
from proto import edit_event_pb2
from watchdog.events import FileSystemEvent
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer

# Enum of the Clearcut log source defined under
# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
LOG_SOURCE = 2524


class ClearcutEventHandler(PatternMatchingEventHandler):

  def __init__(
      self, path: str, cclient: clearcut_client.Clearcut | None = None
  ):

    super().__init__(patterns=["*"], ignore_directories=True)
    self.root_monitoring_path = path
    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)

    self.user_name = getpass.getuser()
    self.host_name = platform.node()
    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")

  def on_moved(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MOVE)

  def on_created(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.CREATE)

  def on_deleted(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.DELETE)

  def on_modified(self, event: FileSystemEvent):
    self._log_edit_event(event, edit_event_pb2.EditEvent.MODIFY)

  def flushall(self):
    logging.info("flushing all pending events.")
    self.cclient.flush_events()

  def _log_edit_event(
      self, event: FileSystemEvent, edit_type: edit_event_pb2.EditEvent.EditType
  ):
    event_time = time.time()

    logging.info("%s: %s", event.event_type, event.src_path)
    try:
      event_proto = edit_event_pb2.EditEvent(
          user_name=self.user_name,
          host_name=self.host_name,
          source_root=self.source_root,
      )
      event_proto.single_edit_event.CopyFrom(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=event.src_path, edit_type=edit_type
          )
      )
      clearcut_log_event = clientanalytics_pb2.LogEvent(
          event_time_ms=int(event_time * 1000),
          source_extension=event_proto.SerializeToString(),
      )

      self.cclient.log(clearcut_log_event)
    except Exception:
      logging.exception("Failed to log edit event.")


def start(
    path: str,
    cclient: clearcut_client.Clearcut | None = None,
    pipe_sender: multiprocessing.connection.Connection | None = None,
):
  """Method to start the edit monitor.

  This is the entry point to start the edit monitor as a subprocess of
  the daemon manager.

  params:
    path: The root path to monitor
    cclient: The clearcut client to send the edit logs.
    conn: the sender of the pipe to communicate with the deamon manager.
  """
  event_handler = ClearcutEventHandler(path, cclient)
  observer = Observer()

  logging.info("Starting observer on path %s.", path)
  observer.schedule(event_handler, path, recursive=True)
  observer.start()
  logging.info("Observer started.")
  if pipe_sender:
    pipe_sender.send("Observer started.")

  try:
    while True:
      time.sleep(1)
  finally:
    event_handler.flushall()
    observer.stop()
    observer.join()
    if pipe_sender:
      pipe_sender.close()
+198 −0
Original line number Diff line number Diff line
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for Edit Monitor."""

import logging
import multiprocessing
import os
import pathlib
import signal
import sys
import tempfile
import time
import unittest

from atest.proto import clientanalytics_pb2
from edit_monitor import edit_monitor
from proto import edit_event_pb2


class EditMonitorTest(unittest.TestCase):

  @classmethod
  def setUpClass(cls):
    super().setUpClass()
    # Configure to print logging to stdout.
    logging.basicConfig(filename=None, level=logging.DEBUG)
    console = logging.StreamHandler(sys.stdout)
    logging.getLogger('').addHandler(console)

  def setUp(self):
    super().setUp()
    self.working_dir = tempfile.TemporaryDirectory()
    self.root_monitoring_path = pathlib.Path(self.working_dir.name).joinpath(
        'files'
    )
    self.root_monitoring_path.mkdir()
    self.log_event_dir = pathlib.Path(self.working_dir.name).joinpath('logs')
    self.log_event_dir.mkdir()

  def tearDown(self):
    self.working_dir.cleanup()
    super().tearDown()

  def test_log_edit_event_success(self):
    fake_cclient = FakeClearcutClient(
        log_output_file=self.log_event_dir.joinpath('logs.output')
    )
    p = self._start_test_edit_monitor_process(fake_cclient)

    # Create and modify a file.
    test_file = self.root_monitoring_path.joinpath('test.txt')
    with open(test_file, 'w') as f:
      f.write('something')
    # Move the file.
    test_file_moved = self.root_monitoring_path.joinpath('new_test.txt')
    test_file.rename(test_file_moved)
    # Delete the file.
    test_file_moved.unlink()
    # Give some time for the edit monitor to receive the edit event.
    time.sleep(1)
    # Stop the edit monitor and flush all events.
    os.kill(p.pid, signal.SIGINT)
    p.join()

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 4)
    expected_create_event = edit_event_pb2.EditEvent.SingleEditEvent(
        file_path=str(
            self.root_monitoring_path.joinpath('test.txt').resolve()
        ),
        edit_type=edit_event_pb2.EditEvent.CREATE,
    )
    expected_modify_event = edit_event_pb2.EditEvent.SingleEditEvent(
        file_path=str(
            self.root_monitoring_path.joinpath('test.txt').resolve()
        ),
        edit_type=edit_event_pb2.EditEvent.MODIFY,
    )
    expected_move_event = edit_event_pb2.EditEvent.SingleEditEvent(
        file_path=str(
            self.root_monitoring_path.joinpath('test.txt').resolve()
        ),
        edit_type=edit_event_pb2.EditEvent.MOVE,
    )
    expected_delete_event = edit_event_pb2.EditEvent.SingleEditEvent(
        file_path=str(
            self.root_monitoring_path.joinpath('new_test.txt').resolve()
        ),
        edit_type=edit_event_pb2.EditEvent.DELETE,
    )
    self.assertEqual(
        expected_create_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[0].source_extension
        ).single_edit_event,
    )
    self.assertEqual(
        expected_modify_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[1].source_extension
        ).single_edit_event,
    )
    self.assertEqual(
        expected_move_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[2].source_extension
        ).single_edit_event,
    )
    self.assertEqual(
        expected_delete_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[3].source_extension
        ).single_edit_event,
    )

  def test_log_edit_event_fail(self):
    fake_cclient = FakeClearcutClient(
        log_output_file=self.log_event_dir.joinpath('logs.output'),
        raise_log_exception=True,
    )
    p = self._start_test_edit_monitor_process(fake_cclient)

    # Create a file.
    self.root_monitoring_path.joinpath('test.txt').touch()
    # Give some time for the edit monitor to receive the edit event.
    time.sleep(1)
    # Stop the edit monitor and flush all events.
    os.kill(p.pid, signal.SIGINT)
    p.join()

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 0)

  def _start_test_edit_monitor_process(
      self, cclient
  ) -> multiprocessing.Process:
    receiver, sender = multiprocessing.Pipe()
    # Start edit monitor in a subprocess.
    p = multiprocessing.Process(
        target=edit_monitor.start,
        args=(str(self.root_monitoring_path.resolve()), cclient, sender),
    )
    p.daemon = True
    p.start()

    # Wait until observer started.
    received_data = receiver.recv()
    self.assertEquals(received_data, 'Observer started.')

    receiver.close()
    return p

  def _get_logged_events(self):
    with open(self.log_event_dir.joinpath('logs.output'), 'rb') as f:
      data = f.read()

    return [
        clientanalytics_pb2.LogEvent.FromString(record)
        for record in data.split(b'\x00')
        if record
    ]


class FakeClearcutClient:

  def __init__(self, log_output_file, raise_log_exception=False):
    self.pending_log_events = []
    self.raise_log_exception = raise_log_exception
    self.log_output_file = log_output_file

  def log(self, log_event):
    if self.raise_log_exception:
      raise Exception('unknown exception')
    self.pending_log_events.append(log_event)

  def flush_events(self):
    delimiter = b'\x00'  # Use a null byte as the delimiter
    with open(self.log_output_file, 'wb') as f:
      for log_event in self.pending_log_events:
        f.write(log_event.SerializeToString() + delimiter)

    self.pending_log_events.clear()


if __name__ == '__main__':
  unittest.main()