Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 231d2b2e authored by Matthew Maurer's avatar Matthew Maurer
Browse files

DoH: Factor out Quiche Config Cache

* Add tests for the cache
* Remove option deref hack

Bug: 202081046
Change-Id: I6d4c296fafd03e4d9e462fbf6c09bac2c748f9e4
parent 020120f5
Loading
Loading
Loading
Loading

doh/config.rs

0 → 100644
+239 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Quiche Config support
//!
//! Quiche config objects are needed mutably for constructing a Quiche
//! connection object, but not when they are actually being used. As these
//! objects include a `SSL_CTX` which can be somewhat expensive and large when
//! using a certificate path, it can be beneficial to cache them.
//!
//! This module provides a caching layer for loading and constructing
//! these configurations.

use quiche::{h3, Result};
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock, Weak};

type WeakConfig = Weak<Mutex<quiche::Config>>;

/// A cheaply clonable `quiche::Config`
#[derive(Clone)]
pub struct Config(Arc<Mutex<quiche::Config>>);

const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000;
const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000;
const MAX_CONCURRENT_STREAM_SIZE: u64 = 100;
/// Maximum datagram size we will accept.
pub const MAX_DATAGRAM_SIZE: usize = 1350;
/// How long with no packets before we assume a connection is dead, in milliseconds.
pub const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000;

impl Config {
    fn from_weak(weak: &WeakConfig) -> Option<Self> {
        weak.upgrade().map(Self)
    }

    fn to_weak(&self) -> WeakConfig {
        Arc::downgrade(&self.0)
    }

    /// Construct a `Config` object from certificate path. If no path
    /// is provided, peers will not be verified.
    pub fn from_cert_path(cert_path: Option<&str>) -> Result<Self> {
        let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
        config.set_application_protos(h3::APPLICATION_PROTOCOL)?;
        match cert_path {
            Some(path) => {
                config.verify_peer(true);
                config.load_verify_locations_from_directory(path)?;
            }
            None => config.verify_peer(false),
        }

        // Some of these configs are necessary, or the server can't respond the HTTP/3 request.
        config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS);
        config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
        config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE);
        config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH);
        config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH);
        config.set_initial_max_stream_data_uni(MAX_INCOMING_BUFFER_SIZE_EACH);
        config.set_initial_max_streams_bidi(MAX_CONCURRENT_STREAM_SIZE);
        config.set_initial_max_streams_uni(MAX_CONCURRENT_STREAM_SIZE);
        config.set_disable_active_migration(true);
        Ok(Self(Arc::new(Mutex::new(config))))
    }

    /// Take the underlying config, usable as `&mut quiche::Config` for use
    /// with `quiche::connect`.
    pub fn take(&mut self) -> impl DerefMut<Target = quiche::Config> + '_ {
        self.0.lock().unwrap()
    }
}

#[derive(Clone, Default)]
struct State {
    // Mapping from cert_path to configs
    path_to_config: HashMap<Option<String>, WeakConfig>,
    // Keep latest config alive to minimize reparsing when flapping
    // If more keep-alive is needed, replace with a LRU LinkedList
    latest: Option<Config>,
}

impl State {
    fn get_config(&self, cert_path: &Option<String>) -> Option<Config> {
        self.path_to_config.get(cert_path).and_then(Config::from_weak)
    }

    fn keep_alive(&mut self, config: Config) {
        self.latest = Some(config);
    }

    fn garbage_collect(&mut self) {
        self.path_to_config.retain(|_, config| config.strong_count() != 0)
    }
}

/// Cache of Quiche Config objects
///
/// Cloning this cache will create another handle to the same cache.
///
/// Loading a config object through this caching layer will only keep the
/// latest config loaded alive directly, but will still act as a cache
/// for any configurations still in use - if the returned `Config` is still
/// live, queries to `Cache` will not reconstruct it.
#[derive(Clone, Default)]
pub struct Cache {
    // Shared state amongst cache handles
    state: Arc<RwLock<State>>,
}

impl Cache {
    /// Creates a fresh empty cache
    pub fn new() -> Self {
        Default::default()
    }

    /// Behaves as `Config::from_cert_path`, but with a cache.
    /// If any object previously given out by this cache is still live,
    /// a duplicate will not be made.
    pub fn from_cert_path(&self, cert_path: &Option<String>) -> Result<Config> {
        // Fast path - read-only access to state retrieves config
        if let Some(config) = self.state.read().unwrap().get_config(cert_path) {
            return Ok(config);
        }

        // Unlocked, calculate config. If we have two racing attempts to load
        // the cert path, we'll arbitrate that in the next step, but this
        // makes sure loading a new cert path doesn't block other loads to
        // refresh connections.
        let config = Config::from_cert_path(cert_path.as_deref())?;

        let mut state = self.state.write().unwrap();
        // We now have exclusive access to the state.
        // If someone else calculated a config at the same time as us, we
        // want to discard ours and use theirs, since it will result in
        // less total memory used.
        if let Some(config) = state.get_config(cert_path) {
            return Ok(config);
        }

        // We have exclusive access and a fresh config. Install it into
        // the cache.
        state.keep_alive(config.clone());
        state.path_to_config.insert(cert_path.to_owned(), config.to_weak());
        Ok(config)
    }

    /// Purges any config paths which no longer point to a config entry.
    pub fn garbage_collect(&self) {
        self.state.write().unwrap().garbage_collect();
    }
}

#[test]
fn create_quiche_config() {
    assert!(Config::from_cert_path(None).is_ok(), "quiche config without cert creating failed");
    assert!(
        Config::from_cert_path(Some("data/local/tmp/")).is_ok(),
        "quiche config with cert creating failed"
    );
}

#[test]
fn shared_cache() {
    let cache_a = Cache::new();
    let cache_b = cache_a.clone();
    let config_a = cache_a.from_cert_path(&None).unwrap();
    assert_eq!(Arc::strong_count(&config_a.0), 2);
    let _config_b = cache_b.from_cert_path(&None).unwrap();
    assert_eq!(Arc::strong_count(&config_a.0), 3);
}

#[test]
fn lifetimes() {
    let cache = Cache::new();
    let config_none = cache.from_cert_path(&None).unwrap();
    let config_a = cache.from_cert_path(&Some("a".to_string())).unwrap();
    let config_b = cache.from_cert_path(&Some("b".to_string())).unwrap();
    // The first two we created should have a strong count of one - those handles are the only
    // thing keeping them alive.
    assert_eq!(Arc::strong_count(&config_none.0), 1);
    assert_eq!(Arc::strong_count(&config_a.0), 1);

    // If we try to get another handle we already have, it should be the same one.
    let _config_a2 = cache.from_cert_path(&Some("a".to_string())).unwrap();
    assert_eq!(Arc::strong_count(&config_a.0), 2);

    // config_b was most recently created, so it should have a keep-alive
    // inside the cache.
    assert_eq!(Arc::strong_count(&config_b.0), 2);

    // If we weaken one of the first handles, then drop it, the weak handle should break
    let config_none_weak = Config::to_weak(&config_none);
    assert_eq!(config_none_weak.strong_count(), 1);
    drop(config_none);
    assert_eq!(config_none_weak.strong_count(), 0);
    assert!(Config::from_weak(&config_none_weak).is_none());

    // If we weaken the most *recent* handle, it should keep working
    let config_b_weak = Config::to_weak(&config_b);
    assert_eq!(config_b_weak.strong_count(), 2);
    drop(config_b);
    assert_eq!(config_b_weak.strong_count(), 1);
    assert!(Config::from_weak(&config_b_weak).is_some());
    assert_eq!(config_b_weak.strong_count(), 1);

    // If we try to get a config which is still kept alive by the cache, we should get the same
    // one.
    let _config_b2 = cache.from_cert_path(&Some("b".to_string())).unwrap();
    assert_eq!(config_b_weak.strong_count(), 2);

    // We broke None, but "a" and "b" should still both be alive. Check that
    // this is still the case in the mapping after garbage collection.
    cache.garbage_collect();
    assert_eq!(cache.state.read().unwrap().path_to_config.len(), 2);
}

#[test]
fn quiche_connect() {
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    let mut config = Config::from_cert_path(None).unwrap();
    let socket_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 42));
    let conn_id = quiche::ConnectionId::from_ref(&[]);
    quiche::connect(None, &conn_id, socket_addr, &mut config.take()).unwrap();
}
+29 −102
Original line number Diff line number Diff line
@@ -27,10 +27,8 @@ use ring::rand::SecureRandom;
use std::collections::HashMap;
use std::ffi::CString;
use std::net::SocketAddr;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
use tokio::runtime::{Builder, Runtime};
use tokio::sync::{mpsc, oneshot};
@@ -38,17 +36,14 @@ use tokio::task;
use url::Url;

pub mod boot_time;
mod config;
mod ffi;

use boot_time::{timeout, BootTime, Duration};
use config::Config;

const MAX_BUFFERED_CMD_SIZE: usize = 400;
const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000;
const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000;
const MAX_CONCURRENT_STREAM_SIZE: u64 = 100;
const MAX_DATAGRAM_SIZE: usize = 1350;
const DOH_PORT: u16 = 443;
const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000;
const NS_T_AAAA: u8 = 28;
const NS_C_IN: u8 = 1;
// Used to randomly generate query prefix and query id.
@@ -124,16 +119,6 @@ enum H3Result {
    Ignore,
}

trait OptionDeref<T: Deref> {
    fn as_deref(&self) -> Option<&T::Target>;
}

impl<T: Deref> OptionDeref<T> for Option<T> {
    fn as_deref(&self) -> Option<&T::Target> {
        self.as_ref().map(Deref::deref)
    }
}

/// Context for a running DoH engine.
pub struct DohDispatcher {
    /// Used to submit cmds to the I/O task.
@@ -174,7 +159,7 @@ impl DohDispatcher {

struct DohConnection {
    info: ServerInfo,
    shared_config: Arc<Mutex<QuicheConfigCache>>,
    config: Config,
    scid: SCID,
    state: ConnectionState,
    pending_queries: Vec<(DnsRequest, QueryResponder, BootTime)>,
@@ -185,14 +170,14 @@ struct DohConnection {
impl DohConnection {
    fn new(
        info: &ServerInfo,
        shared_config: Arc<Mutex<QuicheConfigCache>>,
        config: Config,
        tag_socket_fn: TagSocketCallback,
    ) -> Result<DohConnection> {
        let mut scid = [0; quiche::MAX_CONN_ID_LEN];
        ring::rand::SystemRandom::new().fill(&mut scid).context("failed to generate scid")?;
        Ok(DohConnection {
            info: info.clone(),
            shared_config,
            config,
            scid,
            state: ConnectionState::Idle,
            pending_queries: Vec::new(),
@@ -208,15 +193,12 @@ impl DohConnection {
                (self.tag_socket_fn)(udp_sk_std.as_raw_fd());
                let udp_sk = UdpSocket::from_std(udp_sk_std)?;
                let connid = quiche::ConnectionId::from_ref(&self.scid);
                let mut cache = self.shared_config.lock().unwrap();
                let config =
                    cache.get(&self.info.cert_path)?.ok_or_else(|| anyhow!("no quiche config"))?;
                debug!("init the connection for Network {}", self.info.net_id);
                let mut quic_conn = quiche::connect(
                    self.info.domain.as_deref(),
                    &connid,
                    self.info.peer_addr,
                    config,
                    &mut self.config.take(),
                )?;
                if let Some(session) = &self.cached_session {
                    if quic_conn.set_session(session).is_err() {
@@ -516,7 +498,7 @@ fn recv_h3(
        // Process HTTP/3 events.
        Ok((stream_id, quiche::h3::Event::Data)) => {
            debug!("quiche::h3::Event::Data");
            let mut buf = vec![0; MAX_DATAGRAM_SIZE];
            let mut buf = vec![0; config::MAX_DATAGRAM_SIZE];
            match h3_conn.recv_body(quic_conn, stream_id, &mut buf) {
                Ok(read) => {
                    trace!(
@@ -603,7 +585,7 @@ async fn recv_rx(
) -> Result<()> {
    // TODO: Evaluate if we could make the buffer smaller.
    let mut buf = [0; 65535];
    let quic_idle_timeout_ms = Duration::from_millis(QUICHE_IDLE_TIMEOUT_MS);
    let quic_idle_timeout_ms = Duration::from_millis(config::QUICHE_IDLE_TIMEOUT_MS);
    let ts = quic_conn.timeout().unwrap_or(quic_idle_timeout_ms);

    if let Some(next_expired) = BootTime::now().checked_add(quic_idle_timeout_ms) {
@@ -640,7 +622,7 @@ async fn flush_tx(
    quic_conn: &mut Pin<Box<quiche::Connection>>,
    udp_sk: &mut UdpSocket,
) -> Result<()> {
    let mut out = [0; MAX_DATAGRAM_SIZE];
    let mut out = [0; config::MAX_DATAGRAM_SIZE];
    loop {
        let (write, _) = match quic_conn.send(&mut out) {
            Ok(v) => v,
@@ -734,7 +716,7 @@ async fn probe_task(
fn make_connection_if_needed(
    info: &ServerInfo,
    doh_conn_map: &mut HashMap<u32, (ServerInfo, Option<DohConnection>)>,
    shared_config: Arc<Mutex<QuicheConfigCache>>,
    config_cache: &config::Cache,
    tag_socket_fn: TagSocketCallback,
) -> Result<Option<DohConnection>> {
    // Check if connection exists.
@@ -751,28 +733,12 @@ fn make_connection_if_needed(
        // TODO: change the inner connection instead of removing?
        _ => doh_conn_map.remove(&info.net_id),
    };
    let doh = DohConnection::new(info, shared_config, tag_socket_fn)?;
    let config = config_cache.from_cert_path(&info.cert_path)?;
    let doh = DohConnection::new(info, config, tag_socket_fn)?;
    doh_conn_map.insert(info.net_id, (info.clone(), None));
    Ok(Some(doh))
}

struct QuicheConfigCache {
    cert_path: Option<String>,
    config: Option<quiche::Config>,
}

impl QuicheConfigCache {
    fn get(&mut self, cert_path: &Option<String>) -> Result<Option<&mut quiche::Config>> {
        // No config is cached or the cached config isn't matched with the input cert_path
        // Create it with the input cert_path.
        if self.config.is_none() || self.cert_path != *cert_path {
            self.config = Some(create_quiche_config(cert_path.as_deref())?);
            self.cert_path = cert_path.clone();
        }
        return Ok(self.config.as_mut());
    }
}

async fn handle_query_cmd(
    net_id: u32,
    base64_query: Base64Query,
@@ -824,7 +790,7 @@ async fn doh_handler(
    tag_socket_fn: TagSocketCallback,
) -> Result<()> {
    info!("doh_dispatcher entry");
    let config_cache = Arc::new(Mutex::new(QuicheConfigCache { cert_path: None, config: None }));
    let config_cache = config::Cache::new();

    // Currently, only support 1 server per network.
    let mut doh_conn_map: HashMap<u32, (ServerInfo, Option<DohConnection>)> = HashMap::new();
@@ -848,7 +814,7 @@ async fn doh_handler(
                trace!("recv {:?}", cmd);
                match cmd {
                    DohCommand::Probe { info, timeout: t } => {
                        match make_connection_if_needed(&info, &mut doh_conn_map, config_cache.clone(), tag_socket_fn) {
                        match make_connection_if_needed(&info, &mut doh_conn_map, &config_cache, tag_socket_fn) {
                            Ok(Some(doh)) => {
                                // Create a new async task associated to the DoH connection.
                                probe_futures.push(probe_task(info, doh, t));
@@ -871,6 +837,7 @@ async fn doh_handler(
                    DohCommand::Clear { net_id } => {
                        doh_conn_map.remove(&net_id);
                        info!("Doh Clear server for netid: {}", net_id);
                        config_cache.garbage_collect();
                    },
                    DohCommand::Exit => return Ok(()),
                }
@@ -915,30 +882,6 @@ fn make_doh_udp_socket(peer_addr: SocketAddr, mark: u32) -> Result<std::net::Udp
    Ok(udp_sk)
}

fn create_quiche_config(cert_path: Option<&str>) -> Result<quiche::Config> {
    let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
    config.set_application_protos(h3::APPLICATION_PROTOCOL)?;
    match cert_path {
        Some(path) => {
            config.verify_peer(true);
            config.load_verify_locations_from_directory(path)?;
        }
        None => config.verify_peer(false),
    }

    // Some of these configs are necessary, or the server can't respond the HTTP/3 request.
    config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS);
    config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
    config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE);
    config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH);
    config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH);
    config.set_initial_max_stream_data_uni(MAX_INCOMING_BUFFER_SIZE_EACH);
    config.set_initial_max_streams_bidi(MAX_CONCURRENT_STREAM_SIZE);
    config.set_initial_max_streams_uni(MAX_CONCURRENT_STREAM_SIZE);
    config.set_disable_active_migration(true);
    Ok(config)
}

fn mark_socket(fd: RawFd, mark: u32) -> Result<()> {
    // libc::setsockopt is a wrapper function calling into bionic setsockopt.
    // Both fd and mark are valid, which makes the function call mostly safe.
@@ -994,14 +937,11 @@ mod tests {
    const LOOPBACK_ADDR: &str = "127.0.0.1:443";
    const LOCALHOST_URL: &str = "https://mylocal.com/dns-query";

    // TODO: Make some tests for DohConnection and QuicheConfigCache.
    // TODO: Make some tests for DohConnection.

    fn make_testing_variables() -> (
        ServerInfo,
        HashMap<u32, (ServerInfo, Option<DohConnection>)>,
        Arc<Mutex<QuicheConfigCache>>,
        Runtime,
    ) {
    fn make_testing_variables(
    ) -> (ServerInfo, HashMap<u32, (ServerInfo, Option<DohConnection>)>, config::Cache, Runtime)
    {
        let test_map: HashMap<u32, (ServerInfo, Option<DohConnection>)> = HashMap::new();
        let info = ServerInfo {
            net_id: TEST_NET_ID,
@@ -1011,9 +951,7 @@ mod tests {
            sk_mark: 0,
            cert_path: None,
        };
        let config_cache =
            Arc::new(Mutex::new(QuicheConfigCache { cert_path: None, config: None }));

        let config_cache = config::Cache::new();
        let rt = Builder::new_current_thread()
            .thread_name("test-runtime")
            .enable_all()
@@ -1028,13 +966,13 @@ mod tests {

    #[test]
    fn make_connection_if_needed() {
        let (info, mut test_map, config, rt) = make_testing_variables();
        let (info, mut test_map, config_cache, rt) = make_testing_variables();
        rt.block_on(async {
            // Expect to make a new connection.
            let mut doh = super::make_connection_if_needed(
                &info,
                &mut test_map,
                config.clone(),
                &config_cache,
                tag_socket_cb,
            )
            .unwrap()
@@ -1047,7 +985,7 @@ mod tests {
            let mut doh = super::make_connection_if_needed(
                &info,
                &mut test_map,
                config.clone(),
                &config_cache,
                tag_socket_cb,
            )
            .unwrap()
@@ -1060,7 +998,7 @@ mod tests {
            assert!(super::make_connection_if_needed(
                &info,
                &mut test_map,
                config.clone(),
                &config_cache,
                tag_socket_cb
            )
            .unwrap()
@@ -1070,7 +1008,7 @@ mod tests {

    #[test]
    fn handle_query_cmd() {
        let (info, mut test_map, config, rt) = make_testing_variables();
        let (info, mut test_map, config_cache, rt) = make_testing_variables();
        let t = Duration::from_millis(100);

        rt.block_on(async {
@@ -1111,7 +1049,7 @@ mod tests {
            let mut doh = super::make_connection_if_needed(
                &info,
                &mut test_map,
                config.clone(),
                &config_cache,
                tag_socket_cb,
            )
            .unwrap()
@@ -1178,9 +1116,10 @@ mod tests {
        let mut scid = [0; quiche::MAX_CONN_ID_LEN];
        ring::rand::SystemRandom::new().fill(&mut scid).context("failed to generate scid").unwrap();
        let connid = quiche::ConnectionId::from_ref(&scid);
        let mut config = super::create_quiche_config(None).unwrap();
        let mut config = Config::from_cert_path(None).unwrap();
        let quic_conn =
            quiche::connect(None, &connid, LOOPBACK_ADDR.parse().unwrap(), &mut config).unwrap();
            quiche::connect(None, &connid, LOOPBACK_ADDR.parse().unwrap(), &mut config.take())
                .unwrap();
        (quic_conn, udp_sk)
    }

@@ -1262,18 +1201,6 @@ mod tests {
        // TODO: Parse the result to ensure it's a valid DNS packet.
    }

    #[test]
    fn create_quiche_config() {
        assert!(
            super::create_quiche_config(None).is_ok(),
            "quiche config without cert creating failed"
        );
        assert!(
            super::create_quiche_config(Some("data/local/tmp/")).is_ok(),
            "quiche config with cert creating failed"
        );
    }

    #[test]
    fn make_doh_udp_socket() {
        // Make a socket connecting to loopback with a test mark.