Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b390d8e7 authored by Henri Chataing's avatar Henri Chataing Committed by Gerrit Code Review
Browse files

Merge "Pandora: Refactor the helpers for disconnecting and reconnecting...

Merge "Pandora: Refactor the helpers for disconnecting and reconnecting rootcanal devices" into main
parents bb6c315e 54574b59
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -93,7 +93,7 @@ class IUT:
    def __enter__(self):
        """Resets the IUT when starting a PTS test."""
        self.rootcanal = RootCanal(port=self.rootcanal_control_port)
        self.rootcanal.reconnect_phone()
        self.rootcanal.move_in_range()

        self.modem = Modem(port=self.modem_simulator_port)

+49 −80
Original line number Diff line number Diff line
@@ -86,86 +86,55 @@ class RootCanal:
    def close(self):
        self.channel.close()

    @staticmethod
    def _parse_device_list(raw):
        # time for some cursed parsing!
        categories = {}
        curr_category = None
        for line in raw.split("\n"):
    def move_out_of_range(self):
        """Space out the connected devices to generate a supervision
        timeout for all existing connections."""
        # Disconnect all devices from all phys.
        (devices, phys) = self._read_device_list()
        for (device_id, _) in devices:
            for (phy_id, _, phy_devices) in phys:
                if device_id in phy_devices:
                    self.channel.send_command("del_device_from_phy", [device_id, phy_id])

    def move_in_range(self):
        """Move the connected devices to the same point to ensure
        the reconnection of previous links."""
        # Reconnect all devices to all phys.
        # Beacons are only added back to LE phys.
        (devices, phys) = self._read_device_list()
        for (device_id, device_name) in devices:
            target_phys = ["LOW_ENERGY"]
            if device_name.startswith("hci_device"):
                target_phys.append("BR_EDR")

            for (phy_id, phy_name, phy_devices) in phys:
                if phy_name in target_phys and not device_id in phy_devices:
                    self.channel.send_command("add_device_to_phy", [device_id, phy_id])

    def _read_device_list(self):
        """Query the list of connected devices."""
        response = self.channel.send_command("list", [])

        devices = []
        phys = []
        category = None

        for line in response.split("\n"):
            line = line.strip()
            if not line:
                continue
            if line[0].isdigit():
                # list entry
                if curr_category is None or ":" not in line:
                    raise Exception("Failed to parse rootcanal device list output")
                curr_category.append(line.split(":", 1)[1])
            else:
                if line.endswith(":"):
                    line = line[:-1]
                curr_category = []
                categories[line] = curr_category
        return categories

    @staticmethod
    def _parse_phy(raw):
        transport, idxs = raw.split(":")
        idxs = [int(x) for x in idxs.split(",") if x.strip()]
        return transport, idxs

    def reconnect_phone(self):
        raw_devices = None
        try:
            raw_devices = self.channel.send_command("list", [])
            devices = self._parse_device_list(raw_devices)

            for dev_i, name in enumerate(devices["Devices"]):
                # the default transports are always 0 and 1
                classic_phy = 0
                le_phy = 1
                if "beacon" in name:
                    target_phys = [le_phy]
                elif "hci_device" in name:
                    target_phys = [classic_phy, le_phy]
                else:
                    target_phys = []

                for phy in target_phys:
                    if dev_i not in self._parse_phy(devices["Phys"][phy])[1]:
                        self.channel.send_command("add_device_to_phy", [dev_i, phy])
        except Exception as e:
            print(raw_devices, e)

    def disconnect_phy(self):
        # first, list all devices
        devices = self.channel.send_command("list", [])
        devices = self._parse_device_list(devices)
        dev_phys = []

        for phy_i, phy in enumerate(devices["Phys"]):
            _, idxs = self._parse_phy(phy)

            for dev_i in idxs:
                dev_phys.append((dev_i, phy_i))

        # now, disconnect all pairs
        for dev_i, phy_i in dev_phys:
            self.channel.send_command("del_device_from_phy", [dev_i, phy_i])

        devices = self.channel.send_command("list", [])
        devices = self._parse_device_list(devices)

        self.disconnected_dev_phys = dev_phys

    def reconnect_phy_if_needed(self):
        if self.disconnected_dev_phys is not None:
            for dev_i, phy_i in self.disconnected_dev_phys:
                self.channel.send_command("add_device_to_phy", [dev_i, phy_i])

            self.disconnected_dev_phys = None

    def reconnect_phy(self):
        if self.disconnected_dev_phys is None:
            raise Exception("cannot reconnect_phy before disconnect_phy")

        self.reconnect_phy_if_needed()
            if line.startswith("Devices") or line.startswith("Phys"):
                category = line.split(":")[0]
            elif category == "Devices":
                parts = line.split(":")
                device_id = int(parts[0])
                device_name = parts[1]
                devices.append((device_id, device_name))
            elif category == "Phys":
                parts = line.split(":")
                phy_id = int(parts[0])
                phy_name = parts[1]
                phy_devices = [int(id.strip()) for id in parts[2].split(",") if id.strip()]
                phys.append((phy_id, phy_name, phy_devices))

        return (devices, phys)
+4 −3
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@ from grpc import RpcError
from mmi2grpc._audio import AudioSignal
from mmi2grpc._helpers import assert_description, match_description
from mmi2grpc._proxy import ProfileProxy
from mmi2grpc._rootcanal import RootCanal
from pandora.a2dp_grpc import A2DP
from pandora.a2dp_pb2 import Sink, Source, PlaybackAudioRequest
from pandora.host_grpc import Host
@@ -164,7 +163,8 @@ class A2DPProxy(ProfileProxy):
        Action: This
        can be also be done by placing the IUT or PTS in an RF shielded box.
         """
        self.rootcanal.disconnect_phy()

        self.rootcanal.move_out_of_range()

        return "OK"

@@ -396,7 +396,8 @@ class A2DPProxy(ProfileProxy):
        Action: Press OK when the
        IUT is ready to accept Bluetooth connections again.
        """
        self.rootcanal.reconnect_phy_if_needed()

        self.rootcanal.move_in_range()

        return "OK"

+2 −2
Original line number Diff line number Diff line
@@ -868,7 +868,7 @@ class HFPProxy(ProfileProxy):

        def shield_iut_or_pts():
            time.sleep(2)
            self.rootcanal.disconnect_phy()
            self.rootcanal.move_out_of_range()

        threading.Thread(target=shield_iut_or_pts).start()

@@ -884,7 +884,7 @@ class HFPProxy(ProfileProxy):

        def shield_open():
            time.sleep(2)
            self.rootcanal.reconnect_phy_if_needed()
            self.rootcanal.move_in_range()

        threading.Thread(target=shield_open).start()

+6 −7
Original line number Diff line number Diff line
@@ -6,7 +6,6 @@ from mmi2grpc._proxy import ProfileProxy
from pandora_experimental.hid_grpc import HID
from pandora.host_grpc import Host
from pandora_experimental.hid_pb2 import HID_REPORT_TYPE_OUTPUT
from mmi2grpc._rootcanal import RootCanal


class HIDProxy(ProfileProxy):
@@ -25,7 +24,7 @@ class HIDProxy(ProfileProxy):
        PTS.
        """

        self.rootcanal.reconnect_phy_if_needed()
        self.rootcanal.move_in_range()
        self.connection = self.host.Connect(address=pts_addr).connection

        return "OK"
@@ -46,7 +45,7 @@ class HIDProxy(ProfileProxy):
        # Performing out of range action
        def disconnect():
            sleep(2)
            self.rootcanal.disconnect_phy()
            self.rootcanal.move_out_of_range()

        Thread(target=disconnect).start()

@@ -74,7 +73,7 @@ class HIDProxy(ProfileProxy):
        Please prepare the IUT to accept connection from PTS and then click OK.
        """

        self.rootcanal.reconnect_phy_if_needed()
        self.rootcanal.move_in_range()

        return "OK"

@@ -84,7 +83,7 @@ class HIDProxy(ProfileProxy):
        Make the Implementation Under Test (IUT) connectable, then click Ok.
        """

        self.rootcanal.reconnect_phy_if_needed()
        self.rootcanal.move_in_range()

        return "OK"

@@ -167,7 +166,7 @@ class HIDProxy(ProfileProxy):

        def disconnect():
            sleep(2)
            self.rootcanal.disconnect_phy()
            self.rootcanal.move_out_of_range()

        Thread(target=disconnect).start()

@@ -182,7 +181,7 @@ class HIDProxy(ProfileProxy):

        def connect():
            sleep(1)
            self.rootcanal.reconnect_phy_if_needed()
            self.rootcanal.move_in_range()
            self.connection = self.host.Connect(address=pts_addr).connection

        Thread(target=connect).start()