Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit de9f9776 authored by Matthew Maurer's avatar Matthew Maurer Committed by Automerger Merge Worker
Browse files

DoH: Automatically reconnect closed connections am: f974124e

Original change: https://android-review.googlesource.com/c/platform/packages/modules/DnsResolver/+/1870957

Change-Id: I0b7100697aed8e92ffa79aa50d2ada5c7ef32acc
parents 40357cc2 f974124e
Loading
Loading
Loading
Loading
+42 −17
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@

use crate::boot_time;
use crate::boot_time::BootTime;
use log::warn;
use log::{debug, warn};
use quiche::h3;
use std::collections::HashMap;
use std::default::Default;
@@ -27,7 +27,9 @@ use std::pin::Pin;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};

use super::Status;

#[derive(Error, Debug)]
pub enum Error {
@@ -77,6 +79,7 @@ const MAX_UDP_PACKET_SIZE: usize = 65536;

struct Driver {
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: Pin<Box<quiche::Connection>>,
    socket: UdpSocket,
    // This buffer is large, boxing it will keep it
@@ -111,19 +114,27 @@ async fn optional_timeout(timeout: Option<boot_time::Duration>) {
/// The returned error code will explain why the connection terminated.
pub async fn drive(
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: Pin<Box<quiche::Connection>>,
    socket: UdpSocket,
) -> Result<()> {
    Driver::new(request_rx, quiche_conn, socket).drive().await
    Driver::new(request_rx, status_tx, quiche_conn, socket).drive().await
}

impl Driver {
    fn new(
        request_rx: mpsc::Receiver<Request>,
        status_tx: watch::Sender<Status>,
        quiche_conn: Pin<Box<quiche::Connection>>,
        socket: UdpSocket,
    ) -> Self {
        Self { request_rx, quiche_conn, socket, buffer: Box::new([0; MAX_UDP_PACKET_SIZE]) }
        Self {
            request_rx,
            status_tx,
            quiche_conn,
            socket,
            buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
        }
    }

    async fn drive(mut self) -> Result<()> {
@@ -136,6 +147,8 @@ impl Driver {

    fn handle_closed(&self) -> Result<()> {
        if self.quiche_conn.is_closed() {
            // We don't care if the receiver has hung up
            let _ = self.status_tx.send(Status::Dead);
            Err(Error::Closed)
        } else {
            Ok(())
@@ -146,7 +159,9 @@ impl Driver {
        let timer = optional_timeout(self.quiche_conn.timeout());
        select! {
            // If a quiche timer would fire, call their callback
            _ = timer => self.quiche_conn.on_timeout(),
            _ = timer => {
                self.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
                self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?;
@@ -159,7 +174,8 @@ impl Driver {
        if self.quiche_conn.is_established() {
            let h3_config = h3::Config::new()?;
            let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
            return H3Driver::new(self, h3_conn).drive().await;
            self = H3Driver::new(self, h3_conn).drive().await?;
            let _ = self.status_tx.send(Status::QUIC);
        }

        // If the connection has closed, tear down
@@ -195,8 +211,15 @@ impl H3Driver {
    }

    async fn drive(mut self) -> Result<Driver> {
        let _ = self.driver.status_tx.send(Status::H3);
        loop {
            self.drive_once().await?;
            match self.drive_once().await {
                Err(e) => {
                    let _ = self.driver.status_tx.send(Status::Dead);
                    return Err(e);
                }
                Ok(()) => (),
            }
        }
    }

@@ -212,16 +235,19 @@ impl H3Driver {
        select! {
            // Only attempt to enqueue new requests if we have no buffered request and aren't
            // closing
            msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => match msg {
            msg = self.driver.request_rx.recv(), if !self.closing && self.buffered_request.is_none() => {
                match msg {
                    Some(request) => self.handle_request(request)?,
                    None => self.shutdown(true, b"DONE").await?,
                }
            },
            // If a quiche timer would fire, call their callback
            _ = timer => self.driver.quiche_conn.on_timeout(),
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from })?;
            _ = timer => {
                self.driver.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) =>
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?,
        };

        // Any of the actions in the select could require us to send packets to the peer
@@ -250,6 +276,7 @@ impl H3Driver {
                    // buffered_request, or when buffered_request is empty. This assert just
                    // validates that we don't break that assumption later, as it could result in
                    // requests being dropped on the floor under high load.
                    debug!("Stream has become blocked, buffering one request.");
                    assert!(self.buffered_request.is_none());
                    self.buffered_request = Some(request);
                    return Ok(())
@@ -323,9 +350,7 @@ impl H3Driver {
                    self.respond(stream_id);
                }
            }
            h3::Event::Data => {
                self.recv_body(stream_id).await?;
            }
            h3::Event::Data => self.recv_body(stream_id).await?,
            h3::Event::Finished => self.respond(stream_id),
            // This clause is for quiche 0.10.x, we're still on 0.9.x
            //h3::Event::Reset(e) => {
+43 −4
Original line number Diff line number Diff line
@@ -24,8 +24,7 @@ use std::io;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task;

mod driver;
@@ -33,9 +32,17 @@ mod driver;
pub use driver::Stream;
use driver::{drive, Request};

#[derive(Debug, Copy, Clone)]
pub enum Status {
    QUIC,
    H3,
    Dead,
}

/// Quiche HTTP/3 connection
pub struct Connection {
    request_tx: mpsc::Sender<Request>,
    status_rx: watch::Receiver<Status>,
}

fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
@@ -126,13 +133,45 @@ impl Connection {
        config: &mut quiche::Config,
    ) -> Result<Self> {
        let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
        let (status_tx, status_rx) = watch::channel(Status::QUIC);
        let scid = new_scid();
        let quiche_conn =
            quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?;
        let socket = build_socket(to, socket_mark, tag_socket).await?;
        let driver = drive(request_rx, quiche_conn, socket);
        let driver = async {
            let result = drive(request_rx, status_tx, quiche_conn, socket).await;
            if let Err(ref e) = result {
                error!("Connection driver failed: {:?}", e);
            }
            result
        };
        task::spawn(driver);
        Ok(Self { request_tx })
        Ok(Self { request_tx, status_rx })
    }

    /// Waits until we're either fully alive or dead
    pub async fn wait_for_live(&mut self) -> bool {
        // Once sc-mainline-prod updates to modern tokio, use
        // borrow_and_update here.
        match *self.status_rx.borrow() {
            Status::H3 => return true,
            Status::Dead => return false,
            Status::QUIC => (),
        }
        if self.status_rx.changed().await.is_err() {
            // status_tx is gone, we're dead
            return false;
        }
        if matches!(*self.status_rx.borrow(), Status::H3) {
            return true;
        }
        // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this.
        match self.status_rx.changed().await {
            // status_tx is gone, we're dead
            Err(_) => false,
            // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
            _ => matches!(*self.status_rx.borrow(), Status::H3),
        }
    }

    /// Send a query, produce a future which will provide a response.
+4 −1
Original line number Diff line number Diff line
@@ -286,7 +286,10 @@ pub unsafe extern "C" fn doh_query(
                        response.copy_from_slice(&answer);
                        answer.len() as ssize_t
                    }
                    _ => DOH_RESULT_CAN_NOT_SEND,
                    rsp => {
                        error!("Non-successful response: {:?}", rsp);
                        DOH_RESULT_CAN_NOT_SEND
                    }
                },
                Err(e) => {
                    error!("no result {}", e);
+18 −5
Original line number Diff line number Diff line
@@ -28,6 +28,8 @@ use tokio::task;

use super::{Query, ServerInfo, SocketTagger, ValidationReporter};

use log::debug;

pub struct Driver {
    info: ServerInfo,
    config: Config,
@@ -107,11 +109,15 @@ impl Driver {

    pub async fn drive(mut self) -> Result<()> {
        while let Some(cmd) = self.command_rx.recv().await {
            if let Err(e) = match cmd {
                Command::Probe(duration) => self.probe(duration).await,
                Command::Query(query) => self.send_query(query).await,
            } {
                self.status_tx.send(Status::Failed(Arc::new(e)))?
            match cmd {
                Command::Probe(duration) => match self.probe(duration).await {
                    Err(e) => self.status_tx.send(Status::Failed(Arc::new(e)))?,
                    Ok(()) => (),
                },
                Command::Query(query) => match self.send_query(query).await {
                    Err(e) => debug!("Unable to send query: {:?}", e),
                    Ok(()) => (),
                },
            };
        }
        Ok(())
@@ -119,6 +125,7 @@ impl Driver {

    async fn probe(&mut self, probe_timeout: Duration) -> Result<()> {
        if self.status_tx.borrow().is_failed() {
            debug!("Network is currently failed, reconnecting");
            // If our network is currently failed, it may be due to issues with the connection.
            // Re-establish before re-probing
            self.connection =
@@ -166,12 +173,18 @@ impl Driver {
    }

    async fn send_query(&mut self, query: Query) -> Result<()> {
        if !self.connection.wait_for_live().await {
            // Try reconnecting
            self.connection =
                build_connection(&self.info, &self.tag_socket, &mut self.config).await?;
        }
        let request = encoding::dns_request(&query.query, &self.info.url)?;
        let stream_fut = self.connection.query(request, Some(query.expiry)).await?;
        task::spawn(async move {
            let stream = match stream_fut.await {
                Some(stream) => stream,
                None => {
                    debug!("Connection died while processing request");
                    // We don't care if the response is gone
                    let _ =
                        query.response.send(Response::Error { error: QueryError::ConnectionError });