Loading doh/connection/driver.rs +63 −10 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time; use crate::boot_time::BootTime; use log::{debug, warn}; use log::{debug, trace, warn}; use quiche::h3; use std::collections::HashMap; use std::default::Default; Loading Loading @@ -86,6 +86,7 @@ struct Driver { // off the stack and prevent it being copied during // moves of the driver. buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>, net_id: u32, } struct H3Driver { Loading @@ -103,7 +104,8 @@ struct H3Driver { streams: HashMap<u64, Stream>, } async fn optional_timeout(timeout: Option<boot_time::Duration>) { async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) { trace!("optional_timeout: timeout={:?}, network {}", timeout, net_id); match timeout { Some(timeout) => boot_time::sleep(timeout).await, None => future::pending().await, Loading @@ -117,8 +119,9 @@ pub async fn drive( status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Result<()> { Driver::new(request_rx, status_tx, quiche_conn, socket).drive().await Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await } impl Driver { Loading @@ -127,6 +130,7 @@ impl Driver { status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Self { Self { request_rx, Loading @@ -134,6 +138,7 @@ impl Driver { quiche_conn, socket, buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), net_id, } } Loading @@ -147,6 +152,12 @@ impl Driver { fn handle_closed(&self) -> Result<()> { if self.quiche_conn.is_closed() { // TODO: Also log local_error() once Quiche 0.10.0 is available. debug!( "Connection closed on network {}, peer_error={:?}", self.net_id, self.quiche_conn.peer_error() ); // We don't care if the receiver has hung up let _ = self.status_tx.send(Status::Dead); Err(Error::Closed) Loading @@ -156,15 +167,17 @@ impl Driver { } async fn drive_once(mut self) -> Result<Self> { let timer = optional_timeout(self.quiche_conn.timeout()); let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id); select! { // If a quiche timer would fire, call their callback _ = timer => { debug!("Driver: Timer expired on network {}", self.net_id); self.quiche_conn.on_timeout() } // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?; debug!("Received {} bytes on network {}", size, self.net_id); } }; // Any of the actions in the select could require us to send packets to the peer Loading Loading @@ -192,6 +205,7 @@ impl Driver { Err(e) => return Err(e.into()), Ok((valid_len, send_info)) => { self.socket.send_to(&send_buf[..valid_len], send_info.to).await?; debug!("Sent {} bytes on network {}", valid_len, self.net_id); } } } Loading Loading @@ -226,7 +240,7 @@ impl H3Driver { async fn drive_once(&mut self) -> Result<()> { // We can't call self.driver.drive_once at the same time as // self.driver.request_rx.recv() due to ownership let timer = optional_timeout(self.driver.quiche_conn.timeout()); let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id); // If we've buffered a request (due to the connection being full) // try to resend that first if let Some(request) = self.buffered_request.take() { Loading @@ -243,11 +257,14 @@ impl H3Driver { }, // If a quiche timer would fire, call their callback _ = timer => { debug!("H3Driver: Timer expired on network {}", self.driver.net_id); self.driver.quiche_conn.on_timeout() } // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?, Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => { self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?; debug!("Received {} bytes on network {}", size, self.driver.net_id); } }; // Any of the actions in the select could require us to send packets to the peer Loading @@ -261,6 +278,8 @@ impl H3Driver { } fn handle_request(&mut self, request: Request) -> Result<()> { debug!("Handling DNS request on network {}, stats={:?}, peer_streams_left_bidi={}, peer_streams_left_uni={}", self.driver.net_id, self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni()); // If the request has already timed out, don't issue it to the server. if let Some(expiry) = request.expiry { if BootTime::now() > expiry { Loading @@ -283,6 +302,12 @@ impl H3Driver { } result => result?, }; debug!( "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}", stream_id, self.driver.net_id, self.driver.quiche_conn.stream_capacity(stream_id) ); self.requests.insert(stream_id, request); Ok(()) } Loading @@ -303,10 +328,17 @@ impl H3Driver { return Ok(()); } Err(e) => { debug!("recv_body: Error={:?}", e); stream.data.truncate(base_len); return Err(e.into()); } Ok(recvd) => stream.data.truncate(base_len + recvd), Ok(recvd) => { stream.data.truncate(base_len + recvd); debug!( "Got {} bytes of response data from stream ID {} on network {}", recvd, stream_id, self.driver.net_id ); } } } } else { Loading Loading @@ -342,6 +374,10 @@ impl H3Driver { } match event { h3::Event::Headers { list, has_body } => { debug!( "process_h3_event: h3::Event::Headers on stream ID {}, network {}", stream_id, self.driver.net_id ); let stream = Stream::new(list); if self.streams.insert(stream_id, stream).is_some() { warn!("Re-using stream ID {} before it was completed.", stream_id) Loading @@ -350,8 +386,20 @@ impl H3Driver { self.respond(stream_id); } } h3::Event::Data => self.recv_body(stream_id).await?, h3::Event::Finished => self.respond(stream_id), h3::Event::Data => { debug!( "process_h3_event: h3::Event::Data on stream ID {}, network {}", stream_id, self.driver.net_id ); self.recv_body(stream_id).await?; } h3::Event::Finished => { debug!( "process_h3_event: h3::Event::Finished on stream ID {}, network {}", stream_id, self.driver.net_id ); self.respond(stream_id) } // This clause is for quiche 0.10.x, we're still on 0.9.x //h3::Event::Reset(e) => { // self.streams.get_mut(&stream_id).map(|stream| stream.error = Some(e)); Loading @@ -369,6 +417,7 @@ impl H3Driver { } async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> { debug!("Closing connection on network {} with msg {:?}", self.driver.net_id, msg); self.driver.request_rx.close(); while self.driver.request_rx.recv().await.is_some() {} self.closing = true; Loading @@ -384,6 +433,10 @@ impl H3Driver { fn respond(&mut self, stream_id: u64) { match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) { (Some(stream), Some(request)) => { debug!( "Sending answer back to resolv, stream ID: {}, network {}", stream_id, self.driver.net_id ); // We don't care about the error, because it means the requestor has left. let _ = request.response_tx.send(stream); } Loading doh/connection/mod.rs +5 −4 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time::BootTime; use crate::network::SocketTagger; use log::error; use log::{error, warn}; use quiche::h3; use std::future::Future; use std::io; Loading Loading @@ -129,6 +129,7 @@ impl Connection { server_name: Option<&str>, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, ) -> Result<Self> { Loading @@ -138,10 +139,10 @@ impl Connection { let quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async { let result = drive(request_rx, status_tx, quiche_conn, socket).await; let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; if let Err(ref e) = result { error!("Connection driver failed: {:?}", e); warn!("Connection driver returns some Err: {:?}", e); } result }; Loading doh/dispatcher/driver.rs +1 −1 Original line number Diff line number Diff line Loading @@ -107,7 +107,7 @@ impl Driver { // If we have a network registered to the provided net_id, but the server info doesn't // match, our API has been used incorrectly. Attempt to recover by deleting the old // network and recreating it according to the probe request. warn!("Probing net_id={} with mismatched server info", info.net_id); warn!("Probing net_id={} with mismatched server info {:?}", info.net_id, info); self.networks.remove(&info.net_id); } // Can't use or_insert_with because creating a network may fail Loading doh/network/driver.rs +2 −0 Original line number Diff line number Diff line Loading @@ -82,6 +82,7 @@ async fn build_connection( info.domain.as_deref(), info.peer_addr, info.sk_mark, info.net_id, tag_socket, config.take().await.deref_mut(), ) Loading Loading @@ -141,6 +142,7 @@ impl Driver { } async fn force_probe(&mut self, probe_timeout: Duration) -> Result<()> { debug!("Sending probe to server {} on Network {}", self.info.peer_addr, self.info.net_id); let probe = encoding::probe_query()?; let dns_request = encoding::dns_request(&probe, &self.info.url)?; let expiry = BootTime::now().checked_add(probe_timeout); Loading res_send.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1398,7 +1398,7 @@ ssize_t res_doh_send(ResState* statp, const Slice query, const Slice answer, int queryTimeout = 1000; } ssize_t result = privateDnsConfiguration.dohQuery(netId, query, answer, queryTimeout); LOG(INFO) << __func__ << ": Https query result: " << result; LOG(INFO) << __func__ << ": Https query result: " << result << ", netid=" << netId; if (result == DOH_RESULT_CAN_NOT_SEND) return DOH_RESULT_CAN_NOT_SEND; Loading Loading
doh/connection/driver.rs +63 −10 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time; use crate::boot_time::BootTime; use log::{debug, warn}; use log::{debug, trace, warn}; use quiche::h3; use std::collections::HashMap; use std::default::Default; Loading Loading @@ -86,6 +86,7 @@ struct Driver { // off the stack and prevent it being copied during // moves of the driver. buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>, net_id: u32, } struct H3Driver { Loading @@ -103,7 +104,8 @@ struct H3Driver { streams: HashMap<u64, Stream>, } async fn optional_timeout(timeout: Option<boot_time::Duration>) { async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) { trace!("optional_timeout: timeout={:?}, network {}", timeout, net_id); match timeout { Some(timeout) => boot_time::sleep(timeout).await, None => future::pending().await, Loading @@ -117,8 +119,9 @@ pub async fn drive( status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Result<()> { Driver::new(request_rx, status_tx, quiche_conn, socket).drive().await Driver::new(request_rx, status_tx, quiche_conn, socket, net_id).drive().await } impl Driver { Loading @@ -127,6 +130,7 @@ impl Driver { status_tx: watch::Sender<Status>, quiche_conn: Pin<Box<quiche::Connection>>, socket: UdpSocket, net_id: u32, ) -> Self { Self { request_rx, Loading @@ -134,6 +138,7 @@ impl Driver { quiche_conn, socket, buffer: Box::new([0; MAX_UDP_PACKET_SIZE]), net_id, } } Loading @@ -147,6 +152,12 @@ impl Driver { fn handle_closed(&self) -> Result<()> { if self.quiche_conn.is_closed() { // TODO: Also log local_error() once Quiche 0.10.0 is available. debug!( "Connection closed on network {}, peer_error={:?}", self.net_id, self.quiche_conn.peer_error() ); // We don't care if the receiver has hung up let _ = self.status_tx.send(Status::Dead); Err(Error::Closed) Loading @@ -156,15 +167,17 @@ impl Driver { } async fn drive_once(mut self) -> Result<Self> { let timer = optional_timeout(self.quiche_conn.timeout()); let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id); select! { // If a quiche timer would fire, call their callback _ = timer => { debug!("Driver: Timer expired on network {}", self.net_id); self.quiche_conn.on_timeout() } // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => { self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from })?; debug!("Received {} bytes on network {}", size, self.net_id); } }; // Any of the actions in the select could require us to send packets to the peer Loading Loading @@ -192,6 +205,7 @@ impl Driver { Err(e) => return Err(e.into()), Ok((valid_len, send_info)) => { self.socket.send_to(&send_buf[..valid_len], send_info.to).await?; debug!("Sent {} bytes on network {}", valid_len, self.net_id); } } } Loading Loading @@ -226,7 +240,7 @@ impl H3Driver { async fn drive_once(&mut self) -> Result<()> { // We can't call self.driver.drive_once at the same time as // self.driver.request_rx.recv() due to ownership let timer = optional_timeout(self.driver.quiche_conn.timeout()); let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id); // If we've buffered a request (due to the connection being full) // try to resend that first if let Some(request) = self.buffered_request.take() { Loading @@ -243,11 +257,14 @@ impl H3Driver { }, // If a quiche timer would fire, call their callback _ = timer => { debug!("H3Driver: Timer expired on network {}", self.driver.net_id); self.driver.quiche_conn.on_timeout() } // If we got packets from our peer, pass them to quiche Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?, Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => { self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from }).map(|_| ())?; debug!("Received {} bytes on network {}", size, self.driver.net_id); } }; // Any of the actions in the select could require us to send packets to the peer Loading @@ -261,6 +278,8 @@ impl H3Driver { } fn handle_request(&mut self, request: Request) -> Result<()> { debug!("Handling DNS request on network {}, stats={:?}, peer_streams_left_bidi={}, peer_streams_left_uni={}", self.driver.net_id, self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni()); // If the request has already timed out, don't issue it to the server. if let Some(expiry) = request.expiry { if BootTime::now() > expiry { Loading @@ -283,6 +302,12 @@ impl H3Driver { } result => result?, }; debug!( "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}", stream_id, self.driver.net_id, self.driver.quiche_conn.stream_capacity(stream_id) ); self.requests.insert(stream_id, request); Ok(()) } Loading @@ -303,10 +328,17 @@ impl H3Driver { return Ok(()); } Err(e) => { debug!("recv_body: Error={:?}", e); stream.data.truncate(base_len); return Err(e.into()); } Ok(recvd) => stream.data.truncate(base_len + recvd), Ok(recvd) => { stream.data.truncate(base_len + recvd); debug!( "Got {} bytes of response data from stream ID {} on network {}", recvd, stream_id, self.driver.net_id ); } } } } else { Loading Loading @@ -342,6 +374,10 @@ impl H3Driver { } match event { h3::Event::Headers { list, has_body } => { debug!( "process_h3_event: h3::Event::Headers on stream ID {}, network {}", stream_id, self.driver.net_id ); let stream = Stream::new(list); if self.streams.insert(stream_id, stream).is_some() { warn!("Re-using stream ID {} before it was completed.", stream_id) Loading @@ -350,8 +386,20 @@ impl H3Driver { self.respond(stream_id); } } h3::Event::Data => self.recv_body(stream_id).await?, h3::Event::Finished => self.respond(stream_id), h3::Event::Data => { debug!( "process_h3_event: h3::Event::Data on stream ID {}, network {}", stream_id, self.driver.net_id ); self.recv_body(stream_id).await?; } h3::Event::Finished => { debug!( "process_h3_event: h3::Event::Finished on stream ID {}, network {}", stream_id, self.driver.net_id ); self.respond(stream_id) } // This clause is for quiche 0.10.x, we're still on 0.9.x //h3::Event::Reset(e) => { // self.streams.get_mut(&stream_id).map(|stream| stream.error = Some(e)); Loading @@ -369,6 +417,7 @@ impl H3Driver { } async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> { debug!("Closing connection on network {} with msg {:?}", self.driver.net_id, msg); self.driver.request_rx.close(); while self.driver.request_rx.recv().await.is_some() {} self.closing = true; Loading @@ -384,6 +433,10 @@ impl H3Driver { fn respond(&mut self, stream_id: u64) { match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) { (Some(stream), Some(request)) => { debug!( "Sending answer back to resolv, stream ID: {}, network {}", stream_id, self.driver.net_id ); // We don't care about the error, because it means the requestor has left. let _ = request.response_tx.send(stream); } Loading
doh/connection/mod.rs +5 −4 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ use crate::boot_time::BootTime; use crate::network::SocketTagger; use log::error; use log::{error, warn}; use quiche::h3; use std::future::Future; use std::io; Loading Loading @@ -129,6 +129,7 @@ impl Connection { server_name: Option<&str>, to: SocketAddr, socket_mark: u32, net_id: u32, tag_socket: &SocketTagger, config: &mut quiche::Config, ) -> Result<Self> { Loading @@ -138,10 +139,10 @@ impl Connection { let quiche_conn = quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), to, config)?; let socket = build_socket(to, socket_mark, tag_socket).await?; let driver = async { let result = drive(request_rx, status_tx, quiche_conn, socket).await; let driver = async move { let result = drive(request_rx, status_tx, quiche_conn, socket, net_id).await; if let Err(ref e) = result { error!("Connection driver failed: {:?}", e); warn!("Connection driver returns some Err: {:?}", e); } result }; Loading
doh/dispatcher/driver.rs +1 −1 Original line number Diff line number Diff line Loading @@ -107,7 +107,7 @@ impl Driver { // If we have a network registered to the provided net_id, but the server info doesn't // match, our API has been used incorrectly. Attempt to recover by deleting the old // network and recreating it according to the probe request. warn!("Probing net_id={} with mismatched server info", info.net_id); warn!("Probing net_id={} with mismatched server info {:?}", info.net_id, info); self.networks.remove(&info.net_id); } // Can't use or_insert_with because creating a network may fail Loading
doh/network/driver.rs +2 −0 Original line number Diff line number Diff line Loading @@ -82,6 +82,7 @@ async fn build_connection( info.domain.as_deref(), info.peer_addr, info.sk_mark, info.net_id, tag_socket, config.take().await.deref_mut(), ) Loading Loading @@ -141,6 +142,7 @@ impl Driver { } async fn force_probe(&mut self, probe_timeout: Duration) -> Result<()> { debug!("Sending probe to server {} on Network {}", self.info.peer_addr, self.info.net_id); let probe = encoding::probe_query()?; let dns_request = encoding::dns_request(&probe, &self.info.url)?; let expiry = BootTime::now().checked_add(probe_timeout); Loading
res_send.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -1398,7 +1398,7 @@ ssize_t res_doh_send(ResState* statp, const Slice query, const Slice answer, int queryTimeout = 1000; } ssize_t result = privateDnsConfiguration.dohQuery(netId, query, answer, queryTimeout); LOG(INFO) << __func__ << ": Https query result: " << result; LOG(INFO) << __func__ << ": Https query result: " << result << ", netid=" << netId; if (result == DOH_RESULT_CAN_NOT_SEND) return DOH_RESULT_CAN_NOT_SEND; Loading