Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 32559741 authored by Luis Hector Chavez's avatar Luis Hector Chavez
Browse files

adb: Add a test for emulator connection

This should prevent regressions in the future.

Bug: 78991667
Test: python system/core/adb/test_adb.py
Change-Id: I4d6da40da82c6d79797cec82ffaf071d4b56ddc7
parent fbee0a91
Loading
Loading
Loading
Loading
+58 −0
Original line number Diff line number Diff line
@@ -119,6 +119,31 @@ def adb_connect(unittest, serial):
                         stderr=subprocess.PIPE).communicate()


@contextlib.contextmanager
def adb_server():
    """Context manager for an ADB server.

    This creates an ADB server and returns the port it's listening on.
    """

    port = 5038
    # Kill any existing server on this non-default port.
    subprocess.check_output(['adb', '-P', str(port), 'kill-server'],
                            stderr=subprocess.STDOUT)
    read_pipe, write_pipe = os.pipe()
    proc = subprocess.Popen(['adb', '-L', 'tcp:localhost:{}'.format(port),
                             'fork-server', 'server',
                             '--reply-fd', str(write_pipe)])
    try:
        os.close(write_pipe)
        greeting = os.read(read_pipe, 1024)
        assert greeting == 'OK\n', repr(greeting)
        yield port
    finally:
        proc.terminate()
        proc.wait()


class CommandlineTest(unittest.TestCase):
    """Tests for the ADB commandline."""

@@ -310,6 +335,39 @@ class EmulatorTest(unittest.TestCase):
            # reading the response from the adb emu kill command (on Windows).
            self.assertEqual(0, proc.returncode)

    def test_emulator_connect(self):
        """Ensure that the emulator can connect.

        Bug: http://b/78991667
        """
        with adb_server() as server_port:
            with fake_adbd() as port:
                serial = 'emulator-{}'.format(port - 1)
                # Ensure that the emulator is not there.
                try:
                    subprocess.check_output(['adb', '-P', str(server_port),
                                             '-s', serial, 'get-state'],
                                            stderr=subprocess.STDOUT)
                    self.fail('Device should not be available')
                except subprocess.CalledProcessError as err:
                    self.assertEqual(
                        err.output.strip(),
                        'error: device \'{}\' not found'.format(serial))

                # Let the ADB server know that the emulator has started.
                with contextlib.closing(
                    socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
                    sock.connect(('localhost', server_port))
                    command = 'host:emulator:{}'.format(port)
                    sock.sendall('%04x%s' % (len(command), command))

                # Ensure the emulator is there.
                subprocess.check_call(['adb', '-P', str(server_port),
                                       '-s', serial, 'wait-for-device'])
                output = subprocess.check_output(['adb', '-P', str(server_port),
                                                  '-s', serial, 'get-state'])
                self.assertEqual(output.strip(), 'device')


class ConnectionTest(unittest.TestCase):
    """Tests for adb connect."""