Loading adb/test_adb.py +20 −3 Original line number Diff line number Diff line Loading @@ -162,15 +162,14 @@ class NonApiTest(unittest.TestCase): Bug: https://code.google.com/p/android/issues/detail?id=21021 """ port = 12345 with contextlib.closing( socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: # Use SO_REUSEADDR so subsequent runs of the test can grab the port # even if it is in TIME_WAIT. listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', port)) listener.bind(('127.0.0.1', 0)) listener.listen(4) port = listener.getsockname()[1] # Now that listening has started, start adb emu kill, telling it to # connect to our mock emulator. Loading Loading @@ -233,6 +232,24 @@ class NonApiTest(unittest.TestCase): output.strip(), 'connected to localhost:{}'.format(port)) s.close() def test_already_connected(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('127.0.0.1', 0)) s.listen(2) port = s.getsockname()[1] output = subprocess.check_output( ['adb', 'connect', 'localhost:{}'.format(port)]) self.assertEqual( output.strip(), 'connected to localhost:{}'.format(port)) # b/31250450: this always returns 0 but probably shouldn't. output = subprocess.check_output( ['adb', 'connect', 'localhost:{}'.format(port)]) self.assertEqual( output.strip(), 'already connected to localhost:{}'.format(port)) def main(): random.seed(0) Loading adb/transport.cpp +43 −12 Original line number Diff line number Diff line Loading @@ -77,7 +77,15 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() { Stop(); } static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {} void BlockingConnectionAdapter::Start() { std::lock_guard<std::mutex> lock(mutex_); if (started_) { LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_ << "): started multiple times"; } read_thread_ = std::thread([this]() { LOG(INFO) << this->transport_name_ << ": read thread spawning"; while (true) { Loading @@ -95,7 +103,11 @@ void BlockingConnectionAdapter::Start() { LOG(INFO) << this->transport_name_ << ": write thread spawning"; while (true) { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); }); cv_.wait(lock, [this]() REQUIRES(mutex_) { return this->stopped_ || !this->write_queue_.empty(); }); AssumeLocked(mutex_); if (this->stopped_) { return; Loading @@ -111,25 +123,44 @@ void BlockingConnectionAdapter::Start() { } std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); }); }); started_ = true; } void BlockingConnectionAdapter::Stop() { std::unique_lock<std::mutex> lock(mutex_); { std::lock_guard<std::mutex> lock(mutex_); if (!started_) { LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started"; return; } if (stopped_) { LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; return; } stopped_ = true; lock.unlock(); } LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping"; this->underlying_->Close(); this->cv_.notify_one(); read_thread_.join(); write_thread_.join(); // Move the threads out into locals with the lock taken, and then unlock to let them exit. std::thread read_thread; std::thread write_thread; { std::lock_guard<std::mutex> lock(mutex_); read_thread = std::move(read_thread_); write_thread = std::move(write_thread_); } read_thread.join(); write_thread.join(); LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped"; std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); }); Loading @@ -137,7 +168,7 @@ void BlockingConnectionAdapter::Stop() { bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) { { std::unique_lock<std::mutex> lock(this->mutex_); std::lock_guard<std::mutex> lock(this->mutex_); write_queue_.emplace_back(std::move(packet)); } Loading adb/transport.h +6 −4 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ #include <thread> #include <unordered_set> #include <android-base/thread_annotations.h> #include <openssl/rsa.h> #include "adb.h" Loading Loading @@ -121,13 +122,14 @@ struct BlockingConnectionAdapter : public Connection { virtual void Start() override final; virtual void Stop() override final; bool stopped_ = false; bool started_ GUARDED_BY(mutex_) = false; bool stopped_ GUARDED_BY(mutex_) = false; std::unique_ptr<BlockingConnection> underlying_; std::thread read_thread_; std::thread write_thread_; std::thread read_thread_ GUARDED_BY(mutex_); std::thread write_thread_ GUARDED_BY(mutex_); std::deque<std::unique_ptr<apacket>> write_queue_; std::deque<std::unique_ptr<apacket>> write_queue_ GUARDED_BY(mutex_); std::mutex mutex_; std::condition_variable cv_; Loading Loading
adb/test_adb.py +20 −3 Original line number Diff line number Diff line Loading @@ -162,15 +162,14 @@ class NonApiTest(unittest.TestCase): Bug: https://code.google.com/p/android/issues/detail?id=21021 """ port = 12345 with contextlib.closing( socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: # Use SO_REUSEADDR so subsequent runs of the test can grab the port # even if it is in TIME_WAIT. listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(('127.0.0.1', port)) listener.bind(('127.0.0.1', 0)) listener.listen(4) port = listener.getsockname()[1] # Now that listening has started, start adb emu kill, telling it to # connect to our mock emulator. Loading Loading @@ -233,6 +232,24 @@ class NonApiTest(unittest.TestCase): output.strip(), 'connected to localhost:{}'.format(port)) s.close() def test_already_connected(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('127.0.0.1', 0)) s.listen(2) port = s.getsockname()[1] output = subprocess.check_output( ['adb', 'connect', 'localhost:{}'.format(port)]) self.assertEqual( output.strip(), 'connected to localhost:{}'.format(port)) # b/31250450: this always returns 0 but probably shouldn't. output = subprocess.check_output( ['adb', 'connect', 'localhost:{}'.format(port)]) self.assertEqual( output.strip(), 'already connected to localhost:{}'.format(port)) def main(): random.seed(0) Loading
adb/transport.cpp +43 −12 Original line number Diff line number Diff line Loading @@ -77,7 +77,15 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() { Stop(); } static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {} void BlockingConnectionAdapter::Start() { std::lock_guard<std::mutex> lock(mutex_); if (started_) { LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_ << "): started multiple times"; } read_thread_ = std::thread([this]() { LOG(INFO) << this->transport_name_ << ": read thread spawning"; while (true) { Loading @@ -95,7 +103,11 @@ void BlockingConnectionAdapter::Start() { LOG(INFO) << this->transport_name_ << ": write thread spawning"; while (true) { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); }); cv_.wait(lock, [this]() REQUIRES(mutex_) { return this->stopped_ || !this->write_queue_.empty(); }); AssumeLocked(mutex_); if (this->stopped_) { return; Loading @@ -111,25 +123,44 @@ void BlockingConnectionAdapter::Start() { } std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); }); }); started_ = true; } void BlockingConnectionAdapter::Stop() { std::unique_lock<std::mutex> lock(mutex_); { std::lock_guard<std::mutex> lock(mutex_); if (!started_) { LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started"; return; } if (stopped_) { LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; return; } stopped_ = true; lock.unlock(); } LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping"; this->underlying_->Close(); this->cv_.notify_one(); read_thread_.join(); write_thread_.join(); // Move the threads out into locals with the lock taken, and then unlock to let them exit. std::thread read_thread; std::thread write_thread; { std::lock_guard<std::mutex> lock(mutex_); read_thread = std::move(read_thread_); write_thread = std::move(write_thread_); } read_thread.join(); write_thread.join(); LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped"; std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); }); Loading @@ -137,7 +168,7 @@ void BlockingConnectionAdapter::Stop() { bool BlockingConnectionAdapter::Write(std::unique_ptr<apacket> packet) { { std::unique_lock<std::mutex> lock(this->mutex_); std::lock_guard<std::mutex> lock(this->mutex_); write_queue_.emplace_back(std::move(packet)); } Loading
adb/transport.h +6 −4 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ #include <thread> #include <unordered_set> #include <android-base/thread_annotations.h> #include <openssl/rsa.h> #include "adb.h" Loading Loading @@ -121,13 +122,14 @@ struct BlockingConnectionAdapter : public Connection { virtual void Start() override final; virtual void Stop() override final; bool stopped_ = false; bool started_ GUARDED_BY(mutex_) = false; bool stopped_ GUARDED_BY(mutex_) = false; std::unique_ptr<BlockingConnection> underlying_; std::thread read_thread_; std::thread write_thread_; std::thread read_thread_ GUARDED_BY(mutex_); std::thread write_thread_ GUARDED_BY(mutex_); std::deque<std::unique_ptr<apacket>> write_queue_; std::deque<std::unique_ptr<apacket>> write_queue_ GUARDED_BY(mutex_); std::mutex mutex_; std::condition_variable cv_; Loading