Loading doh/connection/driver.rs +5 −2 Original line number Diff line number Diff line Loading @@ -159,7 +159,7 @@ impl Driver { self.quiche_conn.peer_error() ); // We don't care if the receiver has hung up let _ = self.status_tx.send(Status::Dead); let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() }); Err(Error::Closed) } else { Ok(()) Loading Loading @@ -229,7 +229,10 @@ impl H3Driver { loop { match self.drive_once().await { Err(e) => { let _ = self.driver.status_tx.send(Status::Dead); let _ = self .driver .status_tx .send(Status::Dead { session: self.driver.quiche_conn.session() }); return Err(e); } Ok(()) => (), Loading doh/connection/mod.rs +22 −6 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time::BootTime; use crate::network::SocketTagger; use log::{error, warn}; use log::{debug, error, warn}; use quiche::h3; use std::future::Future; use std::io; Loading @@ -32,11 +32,14 @@ mod driver; pub use driver::Stream; use driver::{drive, Request}; #[derive(Debug, Copy, Clone)] #[derive(Debug, Clone)] pub enum Status { QUIC, H3, Dead, Dead { /// The session of the closed connection. session: Option<Vec<u8>>, }, } /// Quiche HTTP/3 connection Loading Loading @@ -132,12 +135,18 @@ impl Connection { net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, ) -> Result<Self> { let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (status_tx, status_rx) = watch::channel(Status::QUIC); let scid = new_scid(); let quiche_conn = let mut quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; if let Some(session) = session { debug!("Setting session"); quiche_conn.set_session(&session)?; } let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; Loading @@ -154,9 +163,9 @@ impl Connection { pub async fn wait_for_live(&mut self) -> bool { // Once sc-mainline-prod updates to modern tokio, use // borrow_and_update here. match *self.status_rx.borrow() { match &*self.status_rx.borrow() { Status::H3 => return true, Status::Dead => return false, Status::Dead { .. } => return false, Status::QUIC => (), } if self.status_rx.changed().await.is_err() { Loading @@ -175,6 +184,13 @@ impl Connection { } } pub fn session(&self) -> Option<Vec<u8>> { match &*self.status_rx.borrow() { Status::Dead { session } => session.clone(), _ => None, } } /// Send a query, produce a future which will provide a response. /// The future is separately returned rather than awaited to allow it to be waited on without /// keeping the `Connection` itself borrowed. Loading doh/network/driver.rs +6 −3 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ async fn build_connection( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut Config, session: Option<Vec<u8>>, ) -> Result<Connection> { use std::ops::DerefMut; Ok(Connection::new( Loading @@ -85,6 +86,7 @@ async fn build_connection( info.net_id, tag_socket, config.take().await.deref_mut(), session, ) .await?) } Loading @@ -100,7 +102,7 @@ impl Driver { ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)> { let (command_tx, command_rx) = mpsc::channel(Self::MAX_BUFFERED_COMMANDS); let (status_tx, status_rx) = watch::channel(Status::Unprobed); let connection = build_connection(&info, &tag_socket, &mut config).await?; let connection = build_connection(&info, &tag_socket, &mut config, None).await?; Ok(( Self { info, config, connection, status_tx, command_rx, validation, tag_socket }, command_tx, Loading Loading @@ -130,7 +132,7 @@ impl Driver { // If our network is currently failed, it may be due to issues with the connection. // Re-establish before re-probing self.connection = build_connection(&self.info, &self.tag_socket, &mut self.config).await?; build_connection(&self.info, &self.tag_socket, &mut self.config, None).await?; self.status_tx.send(Status::Unprobed)?; } if self.status_tx.borrow().is_live() { Loading Loading @@ -183,9 +185,10 @@ impl Driver { } if !self.connection.wait_for_live().await { let session = self.connection.session(); // Try reconnecting self.connection = build_connection(&self.info, &self.tag_socket, &mut self.config).await?; build_connection(&self.info, &self.tag_socket, &mut self.config, session).await?; } let request = encoding::dns_request(&query.query, &self.info.url)?; let stream_fut = self.connection.query(request, Some(query.expiry)).await?; Loading tests/Android.bp +6 −3 Original line number Diff line number Diff line Loading @@ -166,6 +166,7 @@ cc_test { "tun_forwarder.cpp", ], header_libs: [ "bpf_headers", "dnsproxyd_protocol_headers", "libnetd_resolv_headers", ], Loading @@ -177,7 +178,6 @@ cc_test { ], static_libs: [ "dnsresolver_aidl_interface-lateststable-ndk", "libbpf_android", "libcrypto_static", "libgmock", "libmodules-utils-build", Loading Loading @@ -275,9 +275,12 @@ cc_test { srcs: [ "resolv_stats_test_utils_test.cpp", ], defaults: ["netd_defaults", "resolv_test_defaults"], defaults: [ "netd_defaults", "resolv_test_defaults", "connectivity-mainline-presubmit-cc-defaults", ], test_suites: ["general-tests"], test_mainline_modules: ["CaptivePortalLoginGoogle.apk+NetworkStackGoogle.apk+com.google.android.resolv.apex+com.google.android.tethering.apex"], static_libs: [ "libgmock", "libprotobuf-cpp-lite", Loading tests/doh/include/lib.rs.h +2 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ struct Stats { uint32_t connections_accepted; /// The number of QUIC connections alive. uint32_t alive_connections; /// The number of QUIC connections using session resumption. uint32_t resumed_connections; }; extern "C" { Loading Loading
doh/connection/driver.rs +5 −2 Original line number Diff line number Diff line Loading @@ -159,7 +159,7 @@ impl Driver { self.quiche_conn.peer_error() ); // We don't care if the receiver has hung up let _ = self.status_tx.send(Status::Dead); let _ = self.status_tx.send(Status::Dead { session: self.quiche_conn.session() }); Err(Error::Closed) } else { Ok(()) Loading Loading @@ -229,7 +229,10 @@ impl H3Driver { loop { match self.drive_once().await { Err(e) => { let _ = self.driver.status_tx.send(Status::Dead); let _ = self .driver .status_tx .send(Status::Dead { session: self.driver.quiche_conn.session() }); return Err(e); } Ok(()) => (), Loading
doh/connection/mod.rs +22 −6 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time::BootTime; use crate::network::SocketTagger; use log::{error, warn}; use log::{debug, error, warn}; use quiche::h3; use std::future::Future; use std::io; Loading @@ -32,11 +32,14 @@ mod driver; pub use driver::Stream; use driver::{drive, Request}; #[derive(Debug, Copy, Clone)] #[derive(Debug, Clone)] pub enum Status { QUIC, H3, Dead, Dead { /// The session of the closed connection. session: Option<Vec<u8>>, }, } /// Quiche HTTP/3 connection Loading Loading @@ -132,12 +135,18 @@ impl Connection { net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, ) -> Result<Self> { let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS); let (status_tx, status_rx) = watch::channel(Status::QUIC); let scid = new_scid(); let quiche_conn = let mut quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; if let Some(session) = session { debug!("Setting session"); quiche_conn.set_session(&session)?; } let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; Loading @@ -154,9 +163,9 @@ impl Connection { pub async fn wait_for_live(&mut self) -> bool { // Once sc-mainline-prod updates to modern tokio, use // borrow_and_update here. match *self.status_rx.borrow() { match &*self.status_rx.borrow() { Status::H3 => return true, Status::Dead => return false, Status::Dead { .. } => return false, Status::QUIC => (), } if self.status_rx.changed().await.is_err() { Loading @@ -175,6 +184,13 @@ impl Connection { } } pub fn session(&self) -> Option<Vec<u8>> { match &*self.status_rx.borrow() { Status::Dead { session } => session.clone(), _ => None, } } /// Send a query, produce a future which will provide a response. /// The future is separately returned rather than awaited to allow it to be waited on without /// keeping the `Connection` itself borrowed. Loading
doh/network/driver.rs +6 −3 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ async fn build_connection( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut Config, session: Option<Vec<u8>>, ) -> Result<Connection> { use std::ops::DerefMut; Ok(Connection::new( Loading @@ -85,6 +86,7 @@ async fn build_connection( info.net_id, tag_socket, config.take().await.deref_mut(), session, ) .await?) } Loading @@ -100,7 +102,7 @@ impl Driver { ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)> { let (command_tx, command_rx) = mpsc::channel(Self::MAX_BUFFERED_COMMANDS); let (status_tx, status_rx) = watch::channel(Status::Unprobed); let connection = build_connection(&info, &tag_socket, &mut config).await?; let connection = build_connection(&info, &tag_socket, &mut config, None).await?; Ok(( Self { info, config, connection, status_tx, command_rx, validation, tag_socket }, command_tx, Loading Loading @@ -130,7 +132,7 @@ impl Driver { // If our network is currently failed, it may be due to issues with the connection. // Re-establish before re-probing self.connection = build_connection(&self.info, &self.tag_socket, &mut self.config).await?; build_connection(&self.info, &self.tag_socket, &mut self.config, None).await?; self.status_tx.send(Status::Unprobed)?; } if self.status_tx.borrow().is_live() { Loading Loading @@ -183,9 +185,10 @@ impl Driver { } if !self.connection.wait_for_live().await { let session = self.connection.session(); // Try reconnecting self.connection = build_connection(&self.info, &self.tag_socket, &mut self.config).await?; build_connection(&self.info, &self.tag_socket, &mut self.config, session).await?; } let request = encoding::dns_request(&query.query, &self.info.url)?; let stream_fut = self.connection.query(request, Some(query.expiry)).await?; Loading
tests/Android.bp +6 −3 Original line number Diff line number Diff line Loading @@ -166,6 +166,7 @@ cc_test { "tun_forwarder.cpp", ], header_libs: [ "bpf_headers", "dnsproxyd_protocol_headers", "libnetd_resolv_headers", ], Loading @@ -177,7 +178,6 @@ cc_test { ], static_libs: [ "dnsresolver_aidl_interface-lateststable-ndk", "libbpf_android", "libcrypto_static", "libgmock", "libmodules-utils-build", Loading Loading @@ -275,9 +275,12 @@ cc_test { srcs: [ "resolv_stats_test_utils_test.cpp", ], defaults: ["netd_defaults", "resolv_test_defaults"], defaults: [ "netd_defaults", "resolv_test_defaults", "connectivity-mainline-presubmit-cc-defaults", ], test_suites: ["general-tests"], test_mainline_modules: ["CaptivePortalLoginGoogle.apk+NetworkStackGoogle.apk+com.google.android.resolv.apex+com.google.android.tethering.apex"], static_libs: [ "libgmock", "libprotobuf-cpp-lite", Loading
tests/doh/include/lib.rs.h +2 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ struct Stats { uint32_t connections_accepted; /// The number of QUIC connections alive. uint32_t alive_connections; /// The number of QUIC connections using session resumption. uint32_t resumed_connections; }; extern "C" { Loading