
Commit ba53ef24 authored by Mike Yu, committed by Automerger Merge Worker

DoH: Handle connection draining state am: dfbcdde5 am: 9e6b08ae

Original change: https://android-review.googlesource.com/c/platform/packages/modules/DnsResolver/+/1978403

Change-Id: I1e7dc21d15ad3a2816d97a065f275ded93667645
parents 3dc25cec 9e6b08ae
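
For context, this change teaches the DoH driver to recognize quiche's draining state, which sits between an open connection and a fully closed one. A minimal, illustrative sketch of how the two states are usually told apart with quiche (the names `conn` and `net_id` are hypothetical, not taken from this change):

// Illustrative sketch only: distinguish quiche's draining vs. closed states.
fn log_connection_state(conn: &quiche::Connection, net_id: u32) {
    if conn.is_draining() && !conn.is_closed() {
        // The peer has started closing (e.g. sent CONNECTION_CLOSE); no new data
        // may be sent, but the connection object must be kept alive until
        // is_closed() returns true.
        log::debug!("net {}: draining, peer_error={:x?}", net_id, conn.peer_error());
    } else if conn.is_closed() {
        // The draining period is over; the connection can be dropped.
        log::debug!("net {}: closed", net_id);
    }
}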
+35 −1
@@ -154,7 +154,7 @@ impl Driver {
        if self.quiche_conn.is_closed() {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            debug!(
                "Connection closed on network {}, peer_error={:?}",
                "Connection closed on network {}, peer_error={:x?}",
                self.net_id,
                self.quiche_conn.peer_error()
            );
@@ -166,6 +166,28 @@ impl Driver {
        }
    }

    fn handle_draining(&mut self) {
        if self.quiche_conn.is_draining() {
            // TODO: avoid running the code below more than once.
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            debug!(
                "Connection is draining on network {}, peer_error={:x?}",
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() });

            self.request_rx.close();
            // Drain the pending DNS requests from the queue so that their corresponding
            // future tasks return an error quickly rather than timing out. However, DNS
            // requests that have already been sent will still time out.
            // TODO: re-issue the outstanding DNS requests, for example by passing
            // H3Driver.requests along with Status::Dead to the `Network` so it can
            // re-issue them.
            while self.request_rx.try_recv().is_ok() {}
        }
    }

    async fn drive_once(mut self) -> Result<Self> {
        let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
        select! {
@@ -191,6 +213,12 @@ impl Driver {
            let _ = self.status_tx.send(Status::QUIC);
        }

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Per the Quiche
        // documentation, the connection must not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.handle_draining();

        // If the connection has closed, tear down
        self.handle_closed()?;

@@ -276,6 +304,12 @@ impl H3Driver {
        // Process any incoming HTTP/3 events
        self.flush_h3().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Per the Quiche
        // documentation, the connection must not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.driver.handle_draining();

        // If the connection has closed, tear down
        self.driver.handle_closed()
    }
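
The drain-the-queue step in handle_draining() relies on tokio's mpsc semantics: once the receiver is closed, pending and future send() calls fail immediately instead of waiting for a timeout. A self-contained sketch of that pattern (a hypothetical channel of u32 values, not code from this change):

// Sketch of the close-then-drain pattern used by handle_draining() above.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(16);
    tx.send(1).await.unwrap();           // a request queued before the connection started draining
    rx.close();                          // no further requests can be enqueued
    while rx.try_recv().is_ok() {}       // drop whatever is already buffered
    assert!(tx.send(2).await.is_err());  // later callers get an error right away
}

In the driver this is what lets queued DNS requests fail fast, while requests already written to the wire still run out their own timers.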
+4 −0
@@ -230,6 +230,10 @@ impl Client {
    pub fn is_resumed(&self) -> bool {
        self.conn.is_resumed()
    }

    pub fn close(&mut self) {
        let _ = self.conn.close(false, 0, b"Graceful shutdown");
    }
}

impl std::fmt::Debug for Client {
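
The result of conn.close() is deliberately ignored: quiche's close() only records the intent to close (the CONNECTION_CLOSE frame is emitted by a later send pass), and it returns Error::Done if the connection is already closing or closed. A hedged sketch of that contract (the helper name is illustrative, not part of the change):

// Illustrative helper showing the close() contract assumed above.
fn request_close(conn: &mut quiche::Connection) {
    match conn.close(false, 0x0, b"Graceful shutdown") {
        // The frame is only queued here; it goes on the wire with the next send() pass.
        Ok(()) => log::debug!("close queued"),
        // Already closed or draining; nothing to do.
        Err(quiche::Error::Done) => (),
        Err(e) => log::warn!("close failed: {:?}", e),
    }
}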
+28 −1
@@ -21,7 +21,7 @@ use crate::config::{Config, QUICHE_IDLE_TIMEOUT_MS};
use crate::stats::Stats;
use anyhow::{bail, ensure, Result};
use lazy_static::lazy_static;
use log::{debug, error};
use log::{debug, error, warn};
use std::fs::File;
use std::io::Write;
use std::os::unix::io::{AsRawFd, FromRawFd};
@@ -55,6 +55,7 @@ enum InternalCommand {
enum ControlCommand {
    Stats { resp: oneshot::Sender<Stats> },
    StatsClearQueries,
    CloseConnection,
}

/// Frontend object.
@@ -132,10 +133,16 @@ impl DohFrontend {
    }

    pub fn stop(&mut self) -> Result<()> {
        debug!("DohFrontend: stopping: {:?}", self);
        if let Some(worker_thread) = self.worker_thread.take() {
            // Update latest_stats before stopping worker_thread.
            let _ = self.request_stats();

            self.command_tx.as_ref().unwrap().send(ControlCommand::CloseConnection)?;
            if let Err(e) = self.wait_for_connections_closed() {
                warn!("wait_for_connections_closed failed: {}", e);
            }

            worker_thread.abort();
        }

@@ -246,6 +253,20 @@ impl DohFrontend {
            command_rx,
        })
    }

    fn wait_for_connections_closed(&mut self) -> Result<()> {
        for _ in 0..3 {
            std::thread::sleep(Duration::from_millis(50));
            match self.request_stats() {
                Ok(stats) if stats.alive_connections == 0 => return Ok(()),
                Ok(_) => (),

                // The worker thread is down. No connection is alive.
                Err(_) => return Ok(()),
            }
        }
        bail!("Some connections still alive")
    }
}

async fn worker_thread(params: WorkerParams) -> Result<()> {
@@ -380,6 +401,12 @@ async fn worker_thread(params: WorkerParams) -> Result<()> {
                        }
                    }
                    ControlCommand::StatsClearQueries => queries_received = 0,
                    ControlCommand::CloseConnection => {
                        for (_, client) in clients.iter_mut() {
                            client.close();
                            event_tx.send(InternalCommand::MaybeWrite { connection_id: client.connection_id().clone() })?;
                        }
                    }
                }
            }
        }
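
wait_for_connections_closed() is a small bounded poll: give the worker up to three 50 ms intervals for alive_connections to reach zero, and treat a dead worker as success. The same shape as a standalone helper (hypothetical, for illustration only):

// Illustrative, generic version of the bounded polling used above.
fn poll_until(mut done: impl FnMut() -> bool, attempts: u32, interval: std::time::Duration) -> bool {
    for _ in 0..attempts {
        std::thread::sleep(interval);
        if done() {
            return true;
        }
    }
    false
}

// e.g. poll_until(|| alive_connections() == 0, 3, std::time::Duration::from_millis(50))

Presumably the three-attempt cap bounds how long stop() can block resolver teardown if a peer never acknowledges the close.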
+27 −0
@@ -992,3 +992,30 @@ TEST_F(PrivateDnsDohTest, SessionResumption) {
        EXPECT_EQ(doh.resumedConnections(), (flag == "1" ? 2 : 0));
    }
}

// Tests that after the server closes the connection (signaled by a CONNECTION_CLOSE
// frame), the DnsResolver can establish a new connection for subsequent DNS requests.
TEST_F(PrivateDnsDohTest, RemoteConnectionClosed) {
    const auto parcel = DnsResponderClient::GetDefaultResolverParamsParcel();
    ASSERT_TRUE(mDnsClient.SetResolversFromParcel(parcel));
    EXPECT_TRUE(WaitForDohValidation(test::kDefaultListenAddr, true));
    EXPECT_TRUE(WaitForDotValidation(test::kDefaultListenAddr, true));
    EXPECT_TRUE(dot.waitForQueries(1));
    dot.clearQueries();
    doh.clearQueries();
    dns.clearQueries();

    EXPECT_NO_FAILURE(sendQueryAndCheckResult());
    EXPECT_NO_FAILURE(expectQueries(0 /* dns */, 0 /* dot */, 2 /* doh */));
    flushCache();
    EXPECT_EQ(doh.connections(), 1);

    // Make the server close the connection. This will also reset the stats, so the doh query
    // count below is still 2 rather than 4.
    ASSERT_TRUE(doh.stopServer());
    ASSERT_TRUE(doh.startServer());

    EXPECT_NO_FAILURE(sendQueryAndCheckResult());
    EXPECT_NO_FAILURE(expectQueries(0 /* dns */, 0 /* dot */, 2 /* doh */));
    EXPECT_EQ(doh.connections(), 1);
}