Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c251ec55 authored by Josh Gao's avatar Josh Gao
Browse files

adb: don't abort when connecting to the same address twice.

When connecting to an address, we construct a transport first, and then
check whether we've already connected to that address. The consequent
destruction of the BlockingConnectionAdapter attempts to join threads
that haven't been started, which aborts.

Make it safe to destruct a BlockingConnectionAdapter without calling
Start on it first, to solve this.

Bug: http://b/69137547
Test: nc -l 12345 & (adb connect localhost:12345; adb connect localhost:12345)
Test: python test_adb.py
Change-Id: I6cb968a62dbac6332907e06575893d764905ee62
parent a10d40e2
Loading
Loading
Loading
Loading
+20 −3
Original line number Diff line number Diff line
@@ -162,15 +162,14 @@ class NonApiTest(unittest.TestCase):

        Bug: https://code.google.com/p/android/issues/detail?id=21021
        """
        port = 12345

        with contextlib.closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
            # Use SO_REUSEADDR so subsequent runs of the test can grab the port
            # even if it is in TIME_WAIT.
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(('127.0.0.1', port))
            listener.bind(('127.0.0.1', 0))
            listener.listen(4)
            port = listener.getsockname()[1]

            # Now that listening has started, start adb emu kill, telling it to
            # connect to our mock emulator.
@@ -233,6 +232,24 @@ class NonApiTest(unittest.TestCase):
                output.strip(), 'connected to localhost:{}'.format(port))
            s.close()

    def test_already_connected(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('127.0.0.1', 0))
        s.listen(2)

        port = s.getsockname()[1]
        output = subprocess.check_output(
            ['adb', 'connect', 'localhost:{}'.format(port)])

        self.assertEqual(
            output.strip(), 'connected to localhost:{}'.format(port))

        # b/31250450: this always returns 0 but probably shouldn't.
        output = subprocess.check_output(
            ['adb', 'connect', 'localhost:{}'.format(port)])

        self.assertEqual(
            output.strip(), 'already connected to localhost:{}'.format(port))

def main():
    random.seed(0)
+43 −12
Original line number Diff line number Diff line
@@ -77,7 +77,15 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() {
    Stop();
}

static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {}

void BlockingConnectionAdapter::Start() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (started_) {
        LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_
                   << "): started multiple times";
    }

    read_thread_ = std::thread([this]() {
        LOG(INFO) << this->transport_name_ << ": read thread spawning";
        while (true) {
@@ -95,7 +103,11 @@ void BlockingConnectionAdapter::Start() {
        LOG(INFO) << this->transport_name_ << ": write thread spawning";
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); });
            cv_.wait(lock, [this]() REQUIRES(mutex_) {
                return this->stopped_ || !this->write_queue_.empty();
            });

            AssumeLocked(mutex_);

            if (this->stopped_) {
                return;
@@ -111,25 +123,44 @@ void BlockingConnectionAdapter::Start() {
        }
        std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); });
    });

    started_ = true;
}

void BlockingConnectionAdapter::Stop() {
    std::unique_lock<std::mutex> lock(mutex_);
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!started_) {
            LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started";
            return;
        }

        if (stopped_) {
        LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped";
            LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_
                      << "): already stopped";
            return;
        }

        stopped_ = true;
    lock.unlock();
    }

    LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping";

    this->underlying_->Close();

    this->cv_.notify_one();
    read_thread_.join();
    write_thread_.join();

    // Move the threads out into locals with the lock taken, and then unlock to let them exit.
    std::thread read_thread;
    std::thread write_thread;

    {
        std::lock_guard<std::mutex> lock(mutex_);
        read_thread = std::move(read_thread_);
        write_thread = std::move(write_thread_);
    }

    read_thread.join();
    write_thread.join();

    LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped";
    std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); });
@@ -137,7 +168,7 @@ void BlockingConnectionAdapter::Stop() {

bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) {
    {
        std::unique_lock<std::mutex> lock(this->mutex_);
        std::lock_guard<std::mutex> lock(this->mutex_);
        write_queue_.emplace_back(std::move(packet));
    }

+6 −4
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
#include <thread>
#include <unordered_set>

#include <android-base/thread_annotations.h>
#include <openssl/rsa.h>

#include "adb.h"
@@ -121,13 +122,14 @@ struct BlockingConnectionAdapter : public Connection {
    virtual void Start() override final;
    virtual void Stop() override final;

    bool stopped_ = false;
    bool started_ GUARDED_BY(mutex_) = false;
    bool stopped_ GUARDED_BY(mutex_) = false;

    std::unique_ptr<BlockingConnection> underlying_;
    std::thread read_thread_;
    std::thread write_thread_;
    std::thread read_thread_ GUARDED_BY(mutex_);
    std::thread write_thread_ GUARDED_BY(mutex_);

    std::deque<std::unique_ptr<apacket>> write_queue_;
    std::deque<std::unique_ptr<apacket>> write_queue_ GUARDED_BY(mutex_);
    std::mutex mutex_;
    std::condition_variable cv_;