Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f3fcc9ea authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "[Pandora] Utils to control rootcanal from PTSbot MMIs"

parents 24ce5aa1 d5a38c8d
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ from mmi2grpc.hfp import HFPProxy
from mmi2grpc.hogp import HOGPProxy
from mmi2grpc.sdp import SDPProxy
from mmi2grpc.sm import SMProxy
from mmi2grpc._rootcanal import RootCanal
from mmi2grpc._helpers import format_proxy

from pandora.host_grpc import Host
@@ -69,6 +70,7 @@ class IUT:
        """Resets the IUT when starting a PTS test."""
        # Note: we don't keep a single gRPC channel instance in the IUT class
        # because reset is allowed to close the gRPC server.
        RootCanal().reconnect_phone()
        with grpc.insecure_channel(f'localhost:{self.port}') as channel:
            self._retry(Host(channel).HardReset)(wait_for_ready=True)

+163 −0
Original line number Diff line number Diff line
"""
Copied from tools/rootcanal/scripts/test_channel.py
"""

import socket
from time import sleep


class Connection:

    def __init__(self, port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect(("localhost", port))

    def close(self):
        self._socket.close()

    def send(self, data):
        self._socket.sendall(data.encode())

    def receive(self, size):
        return self._socket.recv(size)


class TestChannel:

    def __init__(self, port):
        self._connection = Connection(port)
        self._closed = False

    def close(self):
        self._connection.close()
        self._closed = True

    def send_command(self, name, args):
        args = [str(arg) for arg in args]
        name_size = len(name)
        args_size = len(args)
        self.lint_command(name, args, name_size, args_size)
        encoded_name = chr(name_size) + name
        encoded_args = chr(args_size) + "".join(chr(len(arg)) + arg for arg in args)
        command = encoded_name + encoded_args
        if self._closed:
            return
        self._connection.send(command)
        if name != "CLOSE_TEST_CHANNEL":
            return self.receive_response().decode()

    def receive_response(self):
        if self._closed:
            return b"Closed"
        size_chars = self._connection.receive(4)
        if not size_chars:
            return b"No response, assuming that the connection is broken"
        response_size = 0
        for i in range(0, len(size_chars) - 1):
            response_size |= size_chars[i] << (8 * i)
        response = self._connection.receive(response_size)
        return response

    def lint_command(self, name, args, name_size, args_size):
        assert name_size == len(name) and args_size == len(args)
        try:
            name.encode()
            for arg in args:
                arg.encode()
        except UnicodeError:
            print("Unrecognized characters.")
            raise
        if name_size > 255 or args_size > 255:
            raise ValueError  # Size must be encodable in one octet.
        for arg in args:
            if len(arg) > 255:
                raise ValueError  # Size must be encodable in one octet.


class RootCanal:

    def __init__(self):
        # port is CONTROL_ROOTCANAL_PORT defined in tradefed
        self.channel = TestChannel(port=6212)
        self.disconnected_dev_phys = None

        # discard initialization messages
        self.channel.receive_response()

    @staticmethod
    def _parse_device_list(raw):
        # time for some cursed parsing!
        categories = {}
        curr_category = None
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            if line[0].isdigit():
                # list entry
                if curr_category is None or ":" not in line:
                    raise Exception("Failed to parse rootcanal device list output")
                curr_category.append(line.split(":", 1)[1])
            else:
                if line.endswith(":"):
                    line = line[:-1]
                curr_category = []
                categories[line] = curr_category
        return categories

    @staticmethod
    def _parse_phy(raw):
        transport, idxs = raw.split(":")
        idxs = [int(x) for x in idxs.split(",") if x.strip()]
        return transport, idxs

    def reconnect_phone(self):
        devices = self.channel.send_command("list", [])
        devices = self._parse_device_list(devices)

        for dev_i, name in enumerate(devices["Devices"]):
            # the default transports are always 0 and 1
            classic_phy = 0
            le_phy = 1
            if "beacon" in name:
                target_phys = [le_phy]
            elif "hci_device" in name:
                target_phys = [classic_phy, le_phy]

            for phy in target_phys:
                if dev_i not in self._parse_phy(devices["Phys"][phy])[1]:
                    self.channel.send_command("add_device_to_phy", [dev_i, phy])

    def disconnect_phy(self):
        # first, list all devices
        devices = self.channel.send_command("list", [])
        devices = self._parse_device_list(devices)
        dev_phys = []

        for phy_i, phy in enumerate(devices["Phys"]):
            _, idxs = self._parse_phy(phy)

            for dev_i in idxs:
                dev_phys.append((dev_i, phy_i))

        # now, disconnect all pairs
        for dev_i, phy_i in dev_phys:
            self.channel.send_command("del_device_from_phy", [dev_i, phy_i])

        devices = self.channel.send_command("list", [])
        devices = self._parse_device_list(devices)

        self.disconnected_dev_phys = dev_phys

    def reconnect_phy_if_needed(self):
        if self.disconnected_dev_phys is not None:
            for dev_i, phy_i in self.disconnected_dev_phys:
                self.channel.send_command("add_device_to_phy", [dev_i, phy_i])

            self.disconnected_dev_phys = None

    def reconnect_phy(self):
        if self.disconnected_dev_phys is None:
            raise Exception("cannot reconnect_phy before disconnect_phy")

        self.reconnect_phy_if_needed()