Loading Android.bp +8 −5 Original line number Original line Diff line number Diff line Loading @@ -248,15 +248,11 @@ cc_library { "libcrypto", "libcrypto", "liblog", //Used by libstatslog_resolv "liblog", //Used by libstatslog_resolv "libssl", "libssl", "libstatssocket", ], ], header_libs: [ header_libs: [ "libnetdbinder_utils_headers", "libnetdbinder_utils_headers", ], ], runtime_libs: [ // Causes the linkerconfig to create a namespace link from resolv to the // libstatssocket library within the statsd apex "libstatssocket", ], export_include_dirs: ["include"], export_include_dirs: ["include"], product_variables: { product_variables: { Loading Loading @@ -361,6 +357,7 @@ doh_rust_deps = [ "liblibc", "liblibc", "liblog_rust", "liblog_rust", "libring", "libring", "libstatslog_rust", "libthiserror", "libthiserror", "libtokio", "libtokio", "liburl", "liburl", Loading Loading @@ -407,6 +404,11 @@ rust_ffi_static { rlibs: doh_rust_deps + ["libquiche_static"], rlibs: doh_rust_deps + ["libquiche_static"], prefer_rlib: true, prefer_rlib: true, shared_libs: [ "libstatssocket", ], // TODO(b/194022174), for unit tests to run on the Android 10 platform, // TODO(b/194022174), for unit tests to run on the Android 10 platform, // libunwind must be statically linked. // libunwind must be statically linked. whole_static_libs: ["libunwind"], whole_static_libs: ["libunwind"], Loading Loading @@ -437,6 +439,7 @@ rust_ffi_static { "liblog_rust", "liblog_rust", "libquiche_static", "libquiche_static", "libring", "libring", "libstatslog_rust", "libthiserror", "libthiserror", "libtokio", "libtokio", "liburl", "liburl", Loading PrivateDnsConfiguration.cpp +6 −1 Original line number Original line Diff line number Diff line Loading @@ -568,6 +568,9 @@ int PrivateDnsConfiguration::setDoh(int32_t netId, uint32_t mark, return 0; return 0; } } const NetworkType networkType = resolv_get_network_types_for_net(netId); const PrivateDnsStatus status = getStatusLocked(netId); const auto getTimeoutFromFlag = [&](const std::string_view key, int defaultValue) -> uint64_t { const auto getTimeoutFromFlag = [&](const std::string_view key, int defaultValue) -> uint64_t { static constexpr int kMinTimeoutMs = 1000; static constexpr int kMinTimeoutMs = 1000; uint64_t timeout = Experiments::getInstance()->getFlag(key, defaultValue); uint64_t timeout = Experiments::getInstance()->getFlag(key, defaultValue); Loading Loading @@ -628,8 +631,10 @@ int PrivateDnsConfiguration::setDoh(int32_t netId, uint32_t mark, << ", use_session_resumption=" << flags.use_session_resumption << ", use_session_resumption=" << flags.use_session_resumption << ", enable_early_data=" << flags.enable_early_data; << ", enable_early_data=" << flags.enable_early_data; const PrivateDnsModes privateDnsMode = convertEnumType(status.mode); return doh_net_new(mDohDispatcher, netId, dohId.httpsTemplate.c_str(), dohId.host.c_str(), return doh_net_new(mDohDispatcher, netId, dohId.httpsTemplate.c_str(), dohId.host.c_str(), dohId.ipAddr.c_str(), mark, caCert.c_str(), &flags); dohId.ipAddr.c_str(), mark, caCert.c_str(), &flags, networkType, privateDnsMode); } } LOG(INFO) << __func__ << ": No suitable DoH server found"; LOG(INFO) << __func__ << ": No suitable DoH server found"; Loading doh.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -92,7 +92,7 @@ void doh_dispatcher_delete(DohDispatcher* doh); /// `url`, `domain`, `ip_addr`, `cert_path` are null terminated strings. /// `url`, `domain`, `ip_addr`, `cert_path` are null terminated strings. int32_t doh_net_new(DohDispatcher* doh, uint32_t net_id, const char* url, const char* domain, int32_t doh_net_new(DohDispatcher* doh, uint32_t net_id, const char* url, const char* domain, const char* ip_addr, uint32_t sk_mark, const char* cert_path, const char* ip_addr, uint32_t sk_mark, const char* cert_path, const FeatureFlags* flags); const FeatureFlags* flags, uint32_t network_type, uint32_t private_dns_mode); /// Sends a DNS query via the network associated to the given |net_id| and waits for the response. /// Sends a DNS query via the network associated to the given |net_id| and waits for the response. /// The return code should be either one of the public constant RESULT_* to indicate the error or /// The return code should be either one of the public constant RESULT_* to indicate the error or Loading doh/connection/driver.rs +76 −2 Original line number Original line Diff line number Diff line Loading @@ -17,12 +17,14 @@ use crate::boot_time; use crate::boot_time; use crate::boot_time::BootTime; use crate::boot_time::BootTime; use crate::metrics::log_handshake_event_stats; use log::{debug, info, warn}; use log::{debug, info, warn}; use quiche::h3; use quiche::h3; use std::collections::HashMap; use std::collections::HashMap; use std::default::Default; use std::default::Default; use std::future; use std::future; use std::io; use std::io; use std::time::Instant; use thiserror::Error; use thiserror::Error; use tokio::net::UdpSocket; use tokio::net::UdpSocket; use tokio::select; use tokio::select; Loading @@ -30,6 +32,50 @@ use tokio::sync::{mpsc, oneshot, watch}; use super::Status; use super::Status; #[derive(Copy, Clone, Debug)] pub enum Cause { Probe, Reconnect, Retry, } #[derive(Clone)] #[allow(dead_code)] pub enum HandshakeResult { Unknown, Success, Timeout, TlsFail, ServerUnreachable, } #[derive(Copy, Clone, Debug)] pub struct HandshakeInfo { pub cause: Cause, pub sent_bytes: u64, pub recv_bytes: u64, pub elapsed: u128, pub quic_version: u32, pub network_type: u32, pub private_dns_mode: u32, pub session_hit_checker: bool, } impl std::fmt::Display for HandshakeInfo { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}", self.cause, self.sent_bytes, self.recv_bytes, self.quic_version, self.session_hit_checker ) } } #[derive(Error, Debug)] #[derive(Error, Debug)] pub enum Error { pub enum Error { #[error("network IO error: {0}")] #[error("network IO error: {0}")] Loading Loading @@ -92,6 +138,8 @@ struct Driver { // if we poll on a dead receiver in a select! it will immediately return None. As a result, we // if we poll on a dead receiver in a select! it will immediately return None. As a result, we // need this to gate whether or not to include .recv() in our select! // need this to gate whether or not to include .recv() in our select! closing: bool, closing: bool, handshake_info: HandshakeInfo, connection_start: Instant, } } struct H3Driver { struct H3Driver { Loading Loading @@ -121,8 +169,9 @@ pub async fn drive( quiche_conn: quiche::Connection, quiche_conn: quiche::Connection, socket: UdpSocket, socket: UdpSocket, net_id: u32, net_id: u32, handshake_info: HandshakeInfo, ) -> Result<()> { ) -> Result<()> { Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await } } impl Driver { impl Driver { Loading @@ -132,6 +181,7 @@ impl Driver { quiche_conn: quiche::Connection, quiche_conn: quiche::Connection, socket: UdpSocket, socket: UdpSocket, net_id: u32, net_id: u32, handshake_info: HandshakeInfo, ) -> Self { ) -> Self { Self { Self { request_rx, request_rx, Loading @@ -141,10 +191,13 @@ impl Driver { buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), net_id, net_id, closing: false, closing: false, handshake_info, connection_start: Instant::now(), } } } } async fn drive(mut self) -> Result<()> { async fn drive(mut self) -> Result<()> { self.connection_start = Instant::now(); // Prime connection // Prime connection self.flush_tx().await?; self.flush_tx().await?; loop { loop { Loading Loading @@ -202,6 +255,13 @@ impl Driver { self.quiche_conn.trace_id(), self.quiche_conn.trace_id(), self.net_id self.net_id ); ); self.handshake_info.elapsed = self.connection_start.elapsed().as_micros(); // In Stats, sent_bytes implements the way that omits the length of padding data // append to the datagram. self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes; self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes; self.handshake_info.quic_version = quiche::PROTOCOL_VERSION; log_handshake_event_stats(HandshakeResult::Success, self.handshake_info); let h3_config = h3::Config::new()?; let h3_config = h3::Config::new()?; let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?; let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?; self = H3Driver::new(self, h3_conn).drive().await?; self = H3Driver::new(self, h3_conn).drive().await?; Loading @@ -213,7 +273,20 @@ impl Driver { // If a quiche timer would fire, call their callback // If a quiche timer would fire, call their callback _ = timer => { _ = timer => { info!("Driver: Timer expired on network {}", self.net_id); info!("Driver: Timer expired on network {}", self.net_id); self.quiche_conn.on_timeout() self.quiche_conn.on_timeout(); if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() { info!( "Connection {} timeouted on network {}", self.quiche_conn.trace_id(), self.net_id ); self.handshake_info.elapsed = self.connection_start.elapsed().as_micros(); log_handshake_event_stats( HandshakeResult::Timeout, self.handshake_info, ); } } } // If we got packets from our peer, pass them to quiche // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { Loading @@ -221,6 +294,7 @@ impl Driver { debug!("Received {} bytes on network {}", size, self.net_id); debug!("Received {} bytes on network {}", size, self.net_id); } } }; }; // Any of the actions in the select could require us to send packets to the peer // Any of the actions in the select could require us to send packets to the peer self.flush_tx().await?; self.flush_tx().await?; Loading doh/connection/mod.rs +22 −8 Original line number Original line Diff line number Diff line Loading @@ -16,6 +16,9 @@ //! Module providing an async abstraction around a quiche HTTP/3 connection //! Module providing an async abstraction around a quiche HTTP/3 connection use crate::boot_time::BootTime; use crate::boot_time::BootTime; use crate::connection::driver::Cause; use crate::connection::driver::HandshakeInfo; use crate::network::ServerInfo; use crate::network::SocketTagger; use crate::network::SocketTagger; use log::{debug, error, warn}; use log::{debug, error, warn}; use quiche::h3; use quiche::h3; Loading @@ -27,7 +30,7 @@ use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task; use tokio::task; mod driver; pub mod driver; pub use driver::Stream; pub use driver::Stream; use driver::{drive, Request}; use driver::{drive, Request}; Loading Loading @@ -129,29 +132,40 @@ impl Connection { const MAX_PENDING_REQUESTS: usize = 10; const MAX_PENDING_REQUESTS: usize = 10; /// Create a new connection with a background task handling IO. /// Create a new connection with a background task handling IO. pub async fn new( pub async fn new( server_name: Option<&str>, info: &ServerInfo, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, tag_socket: &SocketTagger, config: &mut quiche::Config, config: &mut quiche::Config, session: Option<Vec<u8>>, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Self> { ) -> Result<Self> { let server_name = info.domain.as_deref(); let to = info.peer_addr; let socket_mark = info.sk_mark; let net_id = info.net_id; let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (status_tx, status_rx) = watch::channel(Status::QUIC); let (status_tx, status_rx) = watch::channel(Status::QUIC); let scid = new_scid(); let scid = new_scid(); let mut quiche_conn = let mut quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; // We will fall back to a full handshake if the session is expired. // We will fall back to a full handshake if the session is expired. if let Some(session) = session { if let Some(session) = session { debug!("Setting session"); debug!("Setting session"); quiche_conn.set_session(&session)?; quiche_conn.set_session(&session)?; } } let handshake_info = HandshakeInfo { cause, sent_bytes: 0, recv_bytes: 0, elapsed: 0, quic_version: 0, network_type: info.network_type, private_dns_mode: info.private_dns_mode, session_hit_checker: quiche_conn.session().is_some(), }; let socket = build_socket(to, socket_mark, tag_socket).await?; let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async move { let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; let result = drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).await; if let Err(ref e) = result { if let Err(ref e) = result { warn!("Connection driver returns some Err: {:?}", e); warn!("Connection driver returns some Err: {:?}", e); } } Loading Loading
Android.bp +8 −5 Original line number Original line Diff line number Diff line Loading @@ -248,15 +248,11 @@ cc_library { "libcrypto", "libcrypto", "liblog", //Used by libstatslog_resolv "liblog", //Used by libstatslog_resolv "libssl", "libssl", "libstatssocket", ], ], header_libs: [ header_libs: [ "libnetdbinder_utils_headers", "libnetdbinder_utils_headers", ], ], runtime_libs: [ // Causes the linkerconfig to create a namespace link from resolv to the // libstatssocket library within the statsd apex "libstatssocket", ], export_include_dirs: ["include"], export_include_dirs: ["include"], product_variables: { product_variables: { Loading Loading @@ -361,6 +357,7 @@ doh_rust_deps = [ "liblibc", "liblibc", "liblog_rust", "liblog_rust", "libring", "libring", "libstatslog_rust", "libthiserror", "libthiserror", "libtokio", "libtokio", "liburl", "liburl", Loading Loading @@ -407,6 +404,11 @@ rust_ffi_static { rlibs: doh_rust_deps + ["libquiche_static"], rlibs: doh_rust_deps + ["libquiche_static"], prefer_rlib: true, prefer_rlib: true, shared_libs: [ "libstatssocket", ], // TODO(b/194022174), for unit tests to run on the Android 10 platform, // TODO(b/194022174), for unit tests to run on the Android 10 platform, // libunwind must be statically linked. // libunwind must be statically linked. whole_static_libs: ["libunwind"], whole_static_libs: ["libunwind"], Loading Loading @@ -437,6 +439,7 @@ rust_ffi_static { "liblog_rust", "liblog_rust", "libquiche_static", "libquiche_static", "libring", "libring", "libstatslog_rust", "libthiserror", "libthiserror", "libtokio", "libtokio", "liburl", "liburl", Loading
PrivateDnsConfiguration.cpp +6 −1 Original line number Original line Diff line number Diff line Loading @@ -568,6 +568,9 @@ int PrivateDnsConfiguration::setDoh(int32_t netId, uint32_t mark, return 0; return 0; } } const NetworkType networkType = resolv_get_network_types_for_net(netId); const PrivateDnsStatus status = getStatusLocked(netId); const auto getTimeoutFromFlag = [&](const std::string_view key, int defaultValue) -> uint64_t { const auto getTimeoutFromFlag = [&](const std::string_view key, int defaultValue) -> uint64_t { static constexpr int kMinTimeoutMs = 1000; static constexpr int kMinTimeoutMs = 1000; uint64_t timeout = Experiments::getInstance()->getFlag(key, defaultValue); uint64_t timeout = Experiments::getInstance()->getFlag(key, defaultValue); Loading Loading @@ -628,8 +631,10 @@ int PrivateDnsConfiguration::setDoh(int32_t netId, uint32_t mark, << ", use_session_resumption=" << flags.use_session_resumption << ", use_session_resumption=" << flags.use_session_resumption << ", enable_early_data=" << flags.enable_early_data; << ", enable_early_data=" << flags.enable_early_data; const PrivateDnsModes privateDnsMode = convertEnumType(status.mode); return doh_net_new(mDohDispatcher, netId, dohId.httpsTemplate.c_str(), dohId.host.c_str(), return doh_net_new(mDohDispatcher, netId, dohId.httpsTemplate.c_str(), dohId.host.c_str(), dohId.ipAddr.c_str(), mark, caCert.c_str(), &flags); dohId.ipAddr.c_str(), mark, caCert.c_str(), &flags, networkType, privateDnsMode); } } LOG(INFO) << __func__ << ": No suitable DoH server found"; LOG(INFO) << __func__ << ": No suitable DoH server found"; Loading
doh.h +1 −1 Original line number Original line Diff line number Diff line Loading @@ -92,7 +92,7 @@ void doh_dispatcher_delete(DohDispatcher* doh); /// `url`, `domain`, `ip_addr`, `cert_path` are null terminated strings. /// `url`, `domain`, `ip_addr`, `cert_path` are null terminated strings. int32_t doh_net_new(DohDispatcher* doh, uint32_t net_id, const char* url, const char* domain, int32_t doh_net_new(DohDispatcher* doh, uint32_t net_id, const char* url, const char* domain, const char* ip_addr, uint32_t sk_mark, const char* cert_path, const char* ip_addr, uint32_t sk_mark, const char* cert_path, const FeatureFlags* flags); const FeatureFlags* flags, uint32_t network_type, uint32_t private_dns_mode); /// Sends a DNS query via the network associated to the given |net_id| and waits for the response. /// Sends a DNS query via the network associated to the given |net_id| and waits for the response. /// The return code should be either one of the public constant RESULT_* to indicate the error or /// The return code should be either one of the public constant RESULT_* to indicate the error or Loading
doh/connection/driver.rs +76 −2 Original line number Original line Diff line number Diff line Loading @@ -17,12 +17,14 @@ use crate::boot_time; use crate::boot_time; use crate::boot_time::BootTime; use crate::boot_time::BootTime; use crate::metrics::log_handshake_event_stats; use log::{debug, info, warn}; use log::{debug, info, warn}; use quiche::h3; use quiche::h3; use std::collections::HashMap; use std::collections::HashMap; use std::default::Default; use std::default::Default; use std::future; use std::future; use std::io; use std::io; use std::time::Instant; use thiserror::Error; use thiserror::Error; use tokio::net::UdpSocket; use tokio::net::UdpSocket; use tokio::select; use tokio::select; Loading @@ -30,6 +32,50 @@ use tokio::sync::{mpsc, oneshot, watch}; use super::Status; use super::Status; #[derive(Copy, Clone, Debug)] pub enum Cause { Probe, Reconnect, Retry, } #[derive(Clone)] #[allow(dead_code)] pub enum HandshakeResult { Unknown, Success, Timeout, TlsFail, ServerUnreachable, } #[derive(Copy, Clone, Debug)] pub struct HandshakeInfo { pub cause: Cause, pub sent_bytes: u64, pub recv_bytes: u64, pub elapsed: u128, pub quic_version: u32, pub network_type: u32, pub private_dns_mode: u32, pub session_hit_checker: bool, } impl std::fmt::Display for HandshakeInfo { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}", self.cause, self.sent_bytes, self.recv_bytes, self.quic_version, self.session_hit_checker ) } } #[derive(Error, Debug)] #[derive(Error, Debug)] pub enum Error { pub enum Error { #[error("network IO error: {0}")] #[error("network IO error: {0}")] Loading Loading @@ -92,6 +138,8 @@ struct Driver { // if we poll on a dead receiver in a select! it will immediately return None. As a result, we // if we poll on a dead receiver in a select! it will immediately return None. As a result, we // need this to gate whether or not to include .recv() in our select! // need this to gate whether or not to include .recv() in our select! closing: bool, closing: bool, handshake_info: HandshakeInfo, connection_start: Instant, } } struct H3Driver { struct H3Driver { Loading Loading @@ -121,8 +169,9 @@ pub async fn drive( quiche_conn: quiche::Connection, quiche_conn: quiche::Connection, socket: UdpSocket, socket: UdpSocket, net_id: u32, net_id: u32, handshake_info: HandshakeInfo, ) -> Result<()> { ) -> Result<()> { Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await } } impl Driver { impl Driver { Loading @@ -132,6 +181,7 @@ impl Driver { quiche_conn: quiche::Connection, quiche_conn: quiche::Connection, socket: UdpSocket, socket: UdpSocket, net_id: u32, net_id: u32, handshake_info: HandshakeInfo, ) -> Self { ) -> Self { Self { Self { request_rx, request_rx, Loading @@ -141,10 +191,13 @@ impl Driver { buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), net_id, net_id, closing: false, closing: false, handshake_info, connection_start: Instant::now(), } } } } async fn drive(mut self) -> Result<()> { async fn drive(mut self) -> Result<()> { self.connection_start = Instant::now(); // Prime connection // Prime connection self.flush_tx().await?; self.flush_tx().await?; loop { loop { Loading Loading @@ -202,6 +255,13 @@ impl Driver { self.quiche_conn.trace_id(), self.quiche_conn.trace_id(), self.net_id self.net_id ); ); self.handshake_info.elapsed = self.connection_start.elapsed().as_micros(); // In Stats, sent_bytes implements the way that omits the length of padding data // append to the datagram. self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes; self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes; self.handshake_info.quic_version = quiche::PROTOCOL_VERSION; log_handshake_event_stats(HandshakeResult::Success, self.handshake_info); let h3_config = h3::Config::new()?; let h3_config = h3::Config::new()?; let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?; let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?; self = H3Driver::new(self, h3_conn).drive().await?; self = H3Driver::new(self, h3_conn).drive().await?; Loading @@ -213,7 +273,20 @@ impl Driver { // If a quiche timer would fire, call their callback // If a quiche timer would fire, call their callback _ = timer => { _ = timer => { info!("Driver: Timer expired on network {}", self.net_id); info!("Driver: Timer expired on network {}", self.net_id); self.quiche_conn.on_timeout() self.quiche_conn.on_timeout(); if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() { info!( "Connection {} timeouted on network {}", self.quiche_conn.trace_id(), self.net_id ); self.handshake_info.elapsed = self.connection_start.elapsed().as_micros(); log_handshake_event_stats( HandshakeResult::Timeout, self.handshake_info, ); } } } // If we got packets from our peer, pass them to quiche // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { Loading @@ -221,6 +294,7 @@ impl Driver { debug!("Received {} bytes on network {}", size, self.net_id); debug!("Received {} bytes on network {}", size, self.net_id); } } }; }; // Any of the actions in the select could require us to send packets to the peer // Any of the actions in the select could require us to send packets to the peer self.flush_tx().await?; self.flush_tx().await?; Loading
doh/connection/mod.rs +22 −8 Original line number Original line Diff line number Diff line Loading @@ -16,6 +16,9 @@ //! Module providing an async abstraction around a quiche HTTP/3 connection //! Module providing an async abstraction around a quiche HTTP/3 connection use crate::boot_time::BootTime; use crate::boot_time::BootTime; use crate::connection::driver::Cause; use crate::connection::driver::HandshakeInfo; use crate::network::ServerInfo; use crate::network::SocketTagger; use crate::network::SocketTagger; use log::{debug, error, warn}; use log::{debug, error, warn}; use quiche::h3; use quiche::h3; Loading @@ -27,7 +30,7 @@ use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task; use tokio::task; mod driver; pub mod driver; pub use driver::Stream; pub use driver::Stream; use driver::{drive, Request}; use driver::{drive, Request}; Loading Loading @@ -129,29 +132,40 @@ impl Connection { const MAX_PENDING_REQUESTS: usize = 10; const MAX_PENDING_REQUESTS: usize = 10; /// Create a new connection with a background task handling IO. /// Create a new connection with a background task handling IO. pub async fn new( pub async fn new( server_name: Option<&str>, info: &ServerInfo, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, tag_socket: &SocketTagger, config: &mut quiche::Config, config: &mut quiche::Config, session: Option<Vec<u8>>, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Self> { ) -> Result<Self> { let server_name = info.domain.as_deref(); let to = info.peer_addr; let socket_mark = info.sk_mark; let net_id = info.net_id; let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (status_tx, status_rx) = watch::channel(Status::QUIC); let (status_tx, status_rx) = watch::channel(Status::QUIC); let scid = new_scid(); let scid = new_scid(); let mut quiche_conn = let mut quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; // We will fall back to a full handshake if the session is expired. // We will fall back to a full handshake if the session is expired. if let Some(session) = session { if let Some(session) = session { debug!("Setting session"); debug!("Setting session"); quiche_conn.set_session(&session)?; quiche_conn.set_session(&session)?; } } let handshake_info = HandshakeInfo { cause, sent_bytes: 0, recv_bytes: 0, elapsed: 0, quic_version: 0, network_type: info.network_type, private_dns_mode: info.private_dns_mode, session_hit_checker: quiche_conn.session().is_some(), }; let socket = build_socket(to, socket_mark, tag_socket).await?; let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async move { let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; let result = drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).await; if let Err(ref e) = result { if let Err(ref e) = result { warn!("Connection driver returns some Err: {:?}", e); warn!("Connection driver returns some Err: {:?}", e); } } Loading