Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f4ccc6e3 authored by Jack He's avatar Jack He
Browse files

Cert: Wait for root server to start using future

* Wait up to 15 seconds for gRPC root server to start
* Do not use signal port any more
* Also remove signal port usage from topshim tests

Bug: 197895596
Test: gd/cert/run
Tag: #gd-refactor
Change-Id: I97768a7f818f268ce6cc9942c6524ec5b5b93d2c
parent 6411ea06
Loading
Loading
Loading
Loading
+2 −3
Original line number Diff line number Diff line
@@ -130,15 +130,14 @@ class GdDeviceBase(GdDeviceBaseCore):
        :return:
        """
        GdDeviceBaseCore.setup(self)
        # Ensure signal port is available
        # signal port is the only port that always listen on the host machine
        asserts.assert_true(self.signal_port_available, "[%s] Failed to make signal port available" % self.label)

        # Ensure backing process is started and alive
        asserts.assert_true(self.backing_process, msg="Cannot start backing_process at " + " ".join(self.cmd))
        asserts.assert_true(
            self.is_backing_process_alive,
            msg="backing_process stopped immediately after running " + " ".join(self.cmd))
        asserts.assert_true(
            self.grpc_root_server_ready, msg="gRPC root server did not start after running " + " ".join(self.cmd))

    def get_crash_snippet_and_log_tail(self):
        GdDeviceBaseCore.get_crash_snippet_and_log_tail(self)
+2 −3
Original line number Diff line number Diff line
@@ -133,15 +133,14 @@ class GdDeviceBase(GdDeviceBaseCore):
        :return:
        """
        GdDeviceBaseCore.setup(self)
        # Ensure signal port is available
        # signal port is the only port that always listen on the host machine
        asserts.assert_true(self.signal_port_available, "[%s] Failed to make signal port available" % self.label)

        # Ensure backing process is started and alive
        asserts.assert_true(self.backing_process, msg="Cannot start backing_process at " + " ".join(self.cmd))
        asserts.assert_true(
            self.is_backing_process_alive,
            msg="backing_process stopped immediately after running " + " ".join(self.cmd))
        asserts.assert_true(
            self.grpc_root_server_ready, msg="gRPC root server did not start after running " + " ".join(self.cmd))

    def get_crash_snippet_and_log_tail(self):
        GdDeviceBaseCore.get_crash_snippet_and_log_tail(self)
+29 −68
Original line number Diff line number Diff line
@@ -18,10 +18,8 @@ from abc import ABC
import logging
import os
import pathlib
import selectors
import shutil
import signal
import socket
import subprocess

import grpc
@@ -33,7 +31,6 @@ from cert.logging_client_interceptor import LoggingClientInterceptor
from cert.os_utils import get_gd_root
from cert.os_utils import read_crash_snippet_and_log_tail
from cert.os_utils import is_subprocess_alive
from cert.os_utils import make_ports_available
from cert.os_utils import TerminalColor
from facade import rootservice_pb2_grpc as facade_rootservice_pb2_grpc
from hal import hal_facade_pb2_grpc
@@ -54,6 +51,8 @@ from shim.facade import facade_pb2_grpc as shim_facade_pb2_grpc
MOBLY_CONTROLLER_CONFIG_NAME = "GdDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "gd_devices"

GRPC_START_TIMEOUT_SEC = 15


def create_core(configs):
    if not configs:
@@ -141,20 +140,6 @@ class GdDeviceBaseCore(ABC):
        """Core method to set up device for test
        :return:
        """
        self.signal_port_available = make_ports_available([self.signal_port])
        if self.signal_port_available is not True:
            return
        # Start backing process
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as signal_socket:
            # Setup signaling socket
            signal_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            signal_socket.bind(("localhost", self.signal_port))
            # Allow only one incoming connection
            signal_socket.listen(1)
            # Set socket to non-blocking mode and use select() to block for timeout
            signal_socket.settimeout(0)
            signal_socket.setblocking(False)

        # Start backing process
        logging.debug("[%s] Running %s %s" % (self.type_identifier, self.label, " ".join(self.cmd)))
        self.backing_process = subprocess.Popen(
@@ -169,45 +154,9 @@ class GdDeviceBaseCore(ABC):
            return
        self.is_backing_process_alive = is_subprocess_alive(self.backing_process)
        if not self.is_backing_process_alive:
            logging.error("[%s] backing process for %s died after starting" % (self.type_identifier, self.label))
            return

            # Wait for process to be ready
            logging.debug("[%s] Waiting for %s backing_process accept at port %d" % (self.type_identifier, self.label,
                                                                                     self.signal_port))
            selector = selectors.DefaultSelector()
            selector.register(signal_socket, selectors.EVENT_READ)
            # 60 second timeout for backing process to connect to us
            timeout = 60
            ret = selector.select(timeout=timeout)
            selector.unregister(signal_socket)
            if not ret:
                logging.error("[{}] Failed to accept {} backing process connection in {} seconds".format(
                    self.type_identifier, self.label, timeout))
                signal_socket.shutdown(socket.SHUT_RDWR)
                return
            conn, addr = signal_socket.accept()
            conn.setblocking(False)
            with conn:
                logging.debug("[{}] Connected {} by {}".format(self.type_identifier, self.label, addr))
                selector.register(conn, selectors.EVENT_READ)
                # Give 10 seconds for remote to disconnect from us
                timeout = 10
                ret = selector.select(timeout=timeout)
                selector.unregister(conn)
                if not ret:
                    logging.warning("[{}] Failed to disconnect {} backing process in {} seconds".format(
                        self.type_identifier, self.label, timeout))
                else:
                    data = conn.recv(1024)
                    if data:
                        logging.warning("[{}] Received {} data {!r}, but not wanted".format(
                            self.type_identifier, self.label, data))
                    else:
                        logging.debug("[{}] Received EOF to disconnect from {}".format(
                            self.type_identifier, self.label))
                conn.shutdown(socket.SHUT_RDWR)
            signal_socket.shutdown(socket.SHUT_RDWR)

        self.backing_process_logger = AsyncSubprocessLogger(
            self.backing_process, [self.backing_process_log_path],
            log_to_stdout=self.verbose_mode,
@@ -216,6 +165,18 @@ class GdDeviceBaseCore(ABC):

        # Setup gRPC management channels
        self.grpc_root_server_channel = grpc.insecure_channel("localhost:%d" % self.grpc_root_server_port)

        self.grpc_root_server_ready = False
        try:
            logging.info("[%s] Waiting to connect to gRPC root server for %s, timeout is %d seconds" %
                         (self.type_identifier, self.label, GRPC_START_TIMEOUT_SEC))
            grpc.channel_ready_future(self.grpc_root_server_channel).result(timeout=GRPC_START_TIMEOUT_SEC)
            logging.info("[%s] Successfully connected to gRPC root server for %s" % (self.type_identifier, self.label))
            self.grpc_root_server_ready = True
        except grpc.FutureTimeoutError:
            logging.error("[%s] Failed to connect to gRPC root server for %s" % (self.type_identifier, self.label))
            return

        self.grpc_channel = grpc.insecure_channel("localhost:%d" % self.grpc_port)

        if self.verbose_mode:
+0 −16
Original line number Diff line number Diff line
@@ -14,8 +14,6 @@
 * limitations under the License.
 */

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

@@ -100,14 +98,12 @@ int main(int argc, const char** argv) {

  int root_server_port = 8897;
  int grpc_port = 8899;
  int signal_port = 8895;

  bluetooth::common::InitFlags::SetAllForTesting();

  const std::string arg_grpc_root_server_port = "--root-server-port=";
  const std::string arg_grpc_server_port = "--grpc-port=";
  const std::string arg_rootcanal_port = "--rootcanal-port=";
  const std::string arg_signal_port = "--signal-port=";
  const std::string arg_btsnoop_path = "--btsnoop=";
  const std::string arg_btsnooz_path = "--btsnooz=";
  const std::string arg_btconfig_path = "--btconfig=";
@@ -139,22 +135,10 @@ int main(int argc, const char** argv) {
      auto btconfig_path = arg.substr(arg_btconfig_path.size());
      ::bluetooth::os::ParameterProvider::OverrideConfigFilePath(btconfig_path);
    }
    if (arg.find(arg_signal_port) == 0) {
      auto port_number = arg.substr(arg_signal_port.size());
      signal_port = std::stoi(port_number);
    }
  }

  sigaction(SIGINT, &new_act, &old_act);
  grpc_root_server.StartServer("0.0.0.0", root_server_port, grpc_port);
  int tester_signal_socket = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(signal_port);
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  connect(tester_signal_socket, (sockaddr*)&addr, sizeof(addr));
  close(tester_signal_socket);
  auto wait_thread = std::thread([] { grpc_root_server.RunGrpcLoop(); });
  wait_thread.join();

+0 −9
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@ use futures::stream::StreamExt;
use grpcio::*;
use log::debug;
use nix::sys::signal;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;

@@ -59,7 +58,6 @@ async fn async_main(rt: Arc<Runtime>, mut sigint: mpsc::UnboundedReceiver<()>) {

    let grpc_port = value_t!(matches, "grpc-port", u16).unwrap();
    let _rootcanal_port = value_t!(matches, "rootcanal-port", u16).ok();
    let signal_port = value_t!(matches, "signal-port", u16).ok();
    let env = Arc::new(Environment::new(2));

    let btif_intf = Arc::new(Mutex::new(btif::get_btinterface().unwrap()));
@@ -83,13 +81,6 @@ async fn async_main(rt: Arc<Runtime>, mut sigint: mpsc::UnboundedReceiver<()>) {
        .build()
        .unwrap();
    server.start();
    match TcpStream::connect_timeout(
        &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), signal_port.unwrap()),
        std::time::Duration::from_secs(2),
    ) {
        Ok(_) => (),
        Err(e) => panic!("Error with connect: {}", e),
    }

    sigint.next().await;
    block_on(server.shutdown()).unwrap();