Loading doh.rs +44 −27 Original line number Original line Diff line number Diff line Loading @@ -53,7 +53,6 @@ const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000; const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; const MAX_DATAGRAM_SIZE: usize = 1350; const MAX_DATAGRAM_SIZE: usize = 1350; const MAX_DATAGRAM_SIZE_U64: u64 = 1350; const DOH_PORT: u16 = 443; const DOH_PORT: u16 = 443; const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; const SYSTEM_CERT_PATH: &str = "/system/etc/security/cacerts"; const SYSTEM_CERT_PATH: &str = "/system/etc/security/cacerts"; Loading @@ -78,19 +77,19 @@ pub struct DohDispatcher { join_handle: task::JoinHandle<Result<()>>, join_handle: task::JoinHandle<Result<()>>, } } fn make_doh_udp_socket(ip_addr: &str, mark: u32) -> Result<std::net::UdpSocket> { fn make_doh_udp_socket(ip_addr: &str, mark: u32) -> Result<(SocketAddr, std::net::UdpSocket)> { let sock_addr = SocketAddr::new(IpAddr::from_str(&ip_addr)?, DOH_PORT); let peer_addr = SocketAddr::new(IpAddr::from_str(&ip_addr)?, DOH_PORT); let bind_addr = match sock_addr { let bind_addr = match peer_addr { std::net::SocketAddr::V4(_) => "0.0.0.0:0", std::net::SocketAddr::V4(_) => "0.0.0.0:0", std::net::SocketAddr::V6(_) => "[::]:0", std::net::SocketAddr::V6(_) => "[::]:0", }; }; let udp_sk = std::net::UdpSocket::bind(bind_addr)?; let udp_sk = std::net::UdpSocket::bind(bind_addr)?; udp_sk.set_nonblocking(true)?; udp_sk.set_nonblocking(true)?; mark_socket(udp_sk.as_raw_fd(), mark)?; mark_socket(udp_sk.as_raw_fd(), mark)?; udp_sk.connect(sock_addr)?; udp_sk.connect(peer_addr)?; debug!("connecting to {:} from {:}", sock_addr, udp_sk.local_addr()?); debug!("connecting to {:} from {:}", peer_addr, udp_sk.local_addr()?); Ok(udp_sk) Ok((peer_addr, udp_sk)) } } // DoH dispatcher // DoH dispatcher Loading @@ -102,13 +101,14 @@ impl DohDispatcher { cert_path: Option<&str>, cert_path: Option<&str>, ) -> Result<Box<DohDispatcher>> { ) -> Result<Box<DohDispatcher>> { // Setup socket // Setup socket let udp_sk = make_doh_udp_socket(&ip_addr, mark)?; let (peer_addr, udp_sk) = make_doh_udp_socket(&ip_addr, mark)?; DohDispatcher::new_with_socket(url, ip_addr, mark, cert_path, udp_sk) DohDispatcher::new_with_socket(url, ip_addr, peer_addr, mark, cert_path, udp_sk) } } fn new_with_socket( fn new_with_socket( url: &str, url: &str, ip_addr: &str, ip_addr: &str, peer_addr: SocketAddr, mark: u32, mark: u32, cert_path: Option<&str>, cert_path: Option<&str>, udp_sk: std::net::UdpSocket, udp_sk: std::net::UdpSocket, Loading @@ -128,8 +128,15 @@ impl DohDispatcher { "Creating a doh handler task: url={}, ip_addr={}, mark={:#x}, scid {:x?}", "Creating a doh handler task: url={}, ip_addr={}, mark={:#x}, scid {:x?}", url, ip_addr, mark, &scid url, ip_addr, mark, &scid ); ); let join_handle = let join_handle = RUNTIME_STATIC.spawn(doh_handler( RUNTIME_STATIC.spawn(doh_handler(url, udp_sk, config, h3_config, scid, cmd_receiver)); url, peer_addr, udp_sk, config, h3_config, scid, cmd_receiver, )); Ok(Box::new(DohDispatcher { query_sender: cmd_sender, join_handle })) Ok(Box::new(DohDispatcher { query_sender: cmd_sender, join_handle })) } } Loading @@ -145,6 +152,7 @@ impl DohDispatcher { async fn doh_handler( async fn doh_handler( url: url::Url, url: url::Url, peer_addr: SocketAddr, udp_sk: std::net::UdpSocket, udp_sk: std::net::UdpSocket, mut config: quiche::Config, mut config: quiche::Config, h3_config: h3::Config, h3_config: h3::Config, Loading @@ -153,8 +161,10 @@ async fn doh_handler( ) -> Result<()> { ) -> Result<()> { debug!("doh_handler: url={:?}", url); debug!("doh_handler: url={:?}", url); let connid = quiche::ConnectionId::from_ref(&scid); let sk = UdpSocket::from_std(udp_sk)?; let sk = UdpSocket::from_std(udp_sk)?; let mut conn = quiche::connect(url.domain(), &scid, &mut config)?; let mut conn = quiche::connect(url.domain(), &connid, peer_addr, &mut config)?; let recv_info = quiche::RecvInfo { from: peer_addr }; let mut quic_conn_start = std::time::Instant::now(); let mut quic_conn_start = std::time::Instant::now(); let mut h3_conn: Option<h3::Connection> = None; let mut h3_conn: Option<h3::Connection> = None; let mut is_idle = false; let mut is_idle = false; Loading @@ -170,7 +180,7 @@ async fn doh_handler( debug!("recv {:?} ", size); debug!("recv {:?} ", size); match size { match size { Ok(size) => { Ok(size) => { let processed = match conn.recv(&mut buf[..size]) { let processed = match conn.recv(&mut buf[..size], recv_info) { Ok(l) => l, Ok(l) => l, Err(e) => { Err(e) => { error!("quic recv failed: {:?}", e); error!("quic recv failed: {:?}", e); Loading Loading @@ -207,7 +217,7 @@ async fn doh_handler( // If there is any pending query, resume the quic connection. // If there is any pending query, resume the quic connection. if !pending_cmds.is_empty() { if !pending_cmds.is_empty() { info!("still some pending queries but connection is not avaiable, resume it"); info!("still some pending queries but connection is not avaiable, resume it"); conn = quiche::connect(url.domain(), &scid, &mut config)?; conn = quiche::connect(url.domain(), &connid, peer_addr, &mut config)?; quic_conn_start = std::time::Instant::now(); quic_conn_start = std::time::Instant::now(); h3_conn = None; h3_conn = None; is_idle = false; is_idle = false; Loading Loading @@ -264,15 +274,15 @@ fn send_dns_query( path.push_str("?dns="); path.push_str("?dns="); path.push_str(std::str::from_utf8(&query)?); path.push_str(std::str::from_utf8(&query)?); let _req = vec![ let _req = vec![ quiche::h3::Header::new(":method", "GET"), quiche::h3::Header::new(b":method", b"GET"), quiche::h3::Header::new(":scheme", "https"), quiche::h3::Header::new(b":scheme", b"https"), quiche::h3::Header::new( quiche::h3::Header::new( ":authority", b":authority", url.host_str().ok_or_else(|| anyhow!("failed to get host"))?, url.host_str().ok_or_else(|| anyhow!("failed to get host"))?.as_bytes(), ), ), quiche::h3::Header::new(":path", &path), quiche::h3::Header::new(b":path", path.as_bytes()), quiche::h3::Header::new("user-agent", "quiche"), quiche::h3::Header::new(b"user-agent", b"quiche"), quiche::h3::Header::new("accept", "application/dns-message"), quiche::h3::Header::new(b"accept", b"application/dns-message"), // TODO: is content-length required? // TODO: is content-length required? ]; ]; Loading Loading @@ -334,7 +344,7 @@ async fn recv_query( async fn flush_tx(sk: &UdpSocket, conn: &mut quiche::Connection) -> Result<()> { async fn flush_tx(sk: &UdpSocket, conn: &mut quiche::Connection) -> Result<()> { let mut out = [0; MAX_DATAGRAM_SIZE]; let mut out = [0; MAX_DATAGRAM_SIZE]; loop { loop { let write = match conn.send(&mut out) { let (write, _) = match conn.send(&mut out) { Ok(v) => v, Ok(v) => v, Err(quiche::Error::Done) => { Err(quiche::Error::Done) => { debug!("done writing"); debug!("done writing"); Loading @@ -358,7 +368,7 @@ fn create_quiche_config(cert_path: Option<&str>) -> Result<quiche::Config> { config.load_verify_locations_from_directory(cert_path.unwrap_or(SYSTEM_CERT_PATH))?; config.load_verify_locations_from_directory(cert_path.unwrap_or(SYSTEM_CERT_PATH))?; // Some of these configs are necessary, or the server can't respond the HTTP/3 request. // Some of these configs are necessary, or the server can't respond the HTTP/3 request. config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); config.set_max_udp_payload_size(MAX_DATAGRAM_SIZE_U64); config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); Loading Loading @@ -516,7 +526,7 @@ mod tests { assert!(super::make_doh_udp_socket(ip, 0).is_err(), "udp socket should not be created"); assert!(super::make_doh_udp_socket(ip, 0).is_err(), "udp socket should not be created"); } } // Make a socket connecting to loopback with a test mark. // Make a socket connecting to loopback with a test mark. let sk = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let (_, sk) = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); // Check if the socket is connected to loopback. // Check if the socket is connected to loopback. assert_eq!( assert_eq!( sk.peer_addr().unwrap(), sk.peer_addr().unwrap(), Loading Loading @@ -568,9 +578,16 @@ mod tests { const SAMPLE_QUERY: &str = "q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"; const SAMPLE_QUERY: &str = "q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"; #[test] #[test] fn close_doh() { fn close_doh() { let udp_sk = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let (peer_socket, udp_sk) = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let doh = let doh = DohDispatcher::new_with_socket( DohDispatcher::new_with_socket(GOOGLE_DNS_URL, GOOGLE_DNS_IP, 0, None, udp_sk).unwrap(); GOOGLE_DNS_URL, GOOGLE_DNS_IP, peer_socket, 0, None, udp_sk, ) .unwrap(); let (resp_tx, resp_rx) = oneshot::channel(); let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::DohQuery { query: SAMPLE_QUERY.as_bytes().to_vec(), resp: resp_tx }; let cmd = Command::DohQuery { query: SAMPLE_QUERY.as_bytes().to_vec(), resp: resp_tx }; assert!(doh.query(cmd).is_ok(), "Send query failed"); assert!(doh.query(cmd).is_ok(), "Send query failed"); Loading Loading
doh.rs +44 −27 Original line number Original line Diff line number Diff line Loading @@ -53,7 +53,6 @@ const MAX_INCOMING_BUFFER_SIZE_WHOLE: u64 = 10000000; const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; const MAX_INCOMING_BUFFER_SIZE_EACH: u64 = 1000000; const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; const MAX_CONCURRENT_STREAM_SIZE: u64 = 100; const MAX_DATAGRAM_SIZE: usize = 1350; const MAX_DATAGRAM_SIZE: usize = 1350; const MAX_DATAGRAM_SIZE_U64: u64 = 1350; const DOH_PORT: u16 = 443; const DOH_PORT: u16 = 443; const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; const QUICHE_IDLE_TIMEOUT_MS: u64 = 180000; const SYSTEM_CERT_PATH: &str = "/system/etc/security/cacerts"; const SYSTEM_CERT_PATH: &str = "/system/etc/security/cacerts"; Loading @@ -78,19 +77,19 @@ pub struct DohDispatcher { join_handle: task::JoinHandle<Result<()>>, join_handle: task::JoinHandle<Result<()>>, } } fn make_doh_udp_socket(ip_addr: &str, mark: u32) -> Result<std::net::UdpSocket> { fn make_doh_udp_socket(ip_addr: &str, mark: u32) -> Result<(SocketAddr, std::net::UdpSocket)> { let sock_addr = SocketAddr::new(IpAddr::from_str(&ip_addr)?, DOH_PORT); let peer_addr = SocketAddr::new(IpAddr::from_str(&ip_addr)?, DOH_PORT); let bind_addr = match sock_addr { let bind_addr = match peer_addr { std::net::SocketAddr::V4(_) => "0.0.0.0:0", std::net::SocketAddr::V4(_) => "0.0.0.0:0", std::net::SocketAddr::V6(_) => "[::]:0", std::net::SocketAddr::V6(_) => "[::]:0", }; }; let udp_sk = std::net::UdpSocket::bind(bind_addr)?; let udp_sk = std::net::UdpSocket::bind(bind_addr)?; udp_sk.set_nonblocking(true)?; udp_sk.set_nonblocking(true)?; mark_socket(udp_sk.as_raw_fd(), mark)?; mark_socket(udp_sk.as_raw_fd(), mark)?; udp_sk.connect(sock_addr)?; udp_sk.connect(peer_addr)?; debug!("connecting to {:} from {:}", sock_addr, udp_sk.local_addr()?); debug!("connecting to {:} from {:}", peer_addr, udp_sk.local_addr()?); Ok(udp_sk) Ok((peer_addr, udp_sk)) } } // DoH dispatcher // DoH dispatcher Loading @@ -102,13 +101,14 @@ impl DohDispatcher { cert_path: Option<&str>, cert_path: Option<&str>, ) -> Result<Box<DohDispatcher>> { ) -> Result<Box<DohDispatcher>> { // Setup socket // Setup socket let udp_sk = make_doh_udp_socket(&ip_addr, mark)?; let (peer_addr, udp_sk) = make_doh_udp_socket(&ip_addr, mark)?; DohDispatcher::new_with_socket(url, ip_addr, mark, cert_path, udp_sk) DohDispatcher::new_with_socket(url, ip_addr, peer_addr, mark, cert_path, udp_sk) } } fn new_with_socket( fn new_with_socket( url: &str, url: &str, ip_addr: &str, ip_addr: &str, peer_addr: SocketAddr, mark: u32, mark: u32, cert_path: Option<&str>, cert_path: Option<&str>, udp_sk: std::net::UdpSocket, udp_sk: std::net::UdpSocket, Loading @@ -128,8 +128,15 @@ impl DohDispatcher { "Creating a doh handler task: url={}, ip_addr={}, mark={:#x}, scid {:x?}", "Creating a doh handler task: url={}, ip_addr={}, mark={:#x}, scid {:x?}", url, ip_addr, mark, &scid url, ip_addr, mark, &scid ); ); let join_handle = let join_handle = RUNTIME_STATIC.spawn(doh_handler( RUNTIME_STATIC.spawn(doh_handler(url, udp_sk, config, h3_config, scid, cmd_receiver)); url, peer_addr, udp_sk, config, h3_config, scid, cmd_receiver, )); Ok(Box::new(DohDispatcher { query_sender: cmd_sender, join_handle })) Ok(Box::new(DohDispatcher { query_sender: cmd_sender, join_handle })) } } Loading @@ -145,6 +152,7 @@ impl DohDispatcher { async fn doh_handler( async fn doh_handler( url: url::Url, url: url::Url, peer_addr: SocketAddr, udp_sk: std::net::UdpSocket, udp_sk: std::net::UdpSocket, mut config: quiche::Config, mut config: quiche::Config, h3_config: h3::Config, h3_config: h3::Config, Loading @@ -153,8 +161,10 @@ async fn doh_handler( ) -> Result<()> { ) -> Result<()> { debug!("doh_handler: url={:?}", url); debug!("doh_handler: url={:?}", url); let connid = quiche::ConnectionId::from_ref(&scid); let sk = UdpSocket::from_std(udp_sk)?; let sk = UdpSocket::from_std(udp_sk)?; let mut conn = quiche::connect(url.domain(), &scid, &mut config)?; let mut conn = quiche::connect(url.domain(), &connid, peer_addr, &mut config)?; let recv_info = quiche::RecvInfo { from: peer_addr }; let mut quic_conn_start = std::time::Instant::now(); let mut quic_conn_start = std::time::Instant::now(); let mut h3_conn: Option<h3::Connection> = None; let mut h3_conn: Option<h3::Connection> = None; let mut is_idle = false; let mut is_idle = false; Loading @@ -170,7 +180,7 @@ async fn doh_handler( debug!("recv {:?} ", size); debug!("recv {:?} ", size); match size { match size { Ok(size) => { Ok(size) => { let processed = match conn.recv(&mut buf[..size]) { let processed = match conn.recv(&mut buf[..size], recv_info) { Ok(l) => l, Ok(l) => l, Err(e) => { Err(e) => { error!("quic recv failed: {:?}", e); error!("quic recv failed: {:?}", e); Loading Loading @@ -207,7 +217,7 @@ async fn doh_handler( // If there is any pending query, resume the quic connection. // If there is any pending query, resume the quic connection. if !pending_cmds.is_empty() { if !pending_cmds.is_empty() { info!("still some pending queries but connection is not avaiable, resume it"); info!("still some pending queries but connection is not avaiable, resume it"); conn = quiche::connect(url.domain(), &scid, &mut config)?; conn = quiche::connect(url.domain(), &connid, peer_addr, &mut config)?; quic_conn_start = std::time::Instant::now(); quic_conn_start = std::time::Instant::now(); h3_conn = None; h3_conn = None; is_idle = false; is_idle = false; Loading Loading @@ -264,15 +274,15 @@ fn send_dns_query( path.push_str("?dns="); path.push_str("?dns="); path.push_str(std::str::from_utf8(&query)?); path.push_str(std::str::from_utf8(&query)?); let _req = vec![ let _req = vec![ quiche::h3::Header::new(":method", "GET"), quiche::h3::Header::new(b":method", b"GET"), quiche::h3::Header::new(":scheme", "https"), quiche::h3::Header::new(b":scheme", b"https"), quiche::h3::Header::new( quiche::h3::Header::new( ":authority", b":authority", url.host_str().ok_or_else(|| anyhow!("failed to get host"))?, url.host_str().ok_or_else(|| anyhow!("failed to get host"))?.as_bytes(), ), ), quiche::h3::Header::new(":path", &path), quiche::h3::Header::new(b":path", path.as_bytes()), quiche::h3::Header::new("user-agent", "quiche"), quiche::h3::Header::new(b"user-agent", b"quiche"), quiche::h3::Header::new("accept", "application/dns-message"), quiche::h3::Header::new(b"accept", b"application/dns-message"), // TODO: is content-length required? // TODO: is content-length required? ]; ]; Loading Loading @@ -334,7 +344,7 @@ async fn recv_query( async fn flush_tx(sk: &UdpSocket, conn: &mut quiche::Connection) -> Result<()> { async fn flush_tx(sk: &UdpSocket, conn: &mut quiche::Connection) -> Result<()> { let mut out = [0; MAX_DATAGRAM_SIZE]; let mut out = [0; MAX_DATAGRAM_SIZE]; loop { loop { let write = match conn.send(&mut out) { let (write, _) = match conn.send(&mut out) { Ok(v) => v, Ok(v) => v, Err(quiche::Error::Done) => { Err(quiche::Error::Done) => { debug!("done writing"); debug!("done writing"); Loading @@ -358,7 +368,7 @@ fn create_quiche_config(cert_path: Option<&str>) -> Result<quiche::Config> { config.load_verify_locations_from_directory(cert_path.unwrap_or(SYSTEM_CERT_PATH))?; config.load_verify_locations_from_directory(cert_path.unwrap_or(SYSTEM_CERT_PATH))?; // Some of these configs are necessary, or the server can't respond the HTTP/3 request. // Some of these configs are necessary, or the server can't respond the HTTP/3 request. config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); config.set_max_idle_timeout(QUICHE_IDLE_TIMEOUT_MS); config.set_max_udp_payload_size(MAX_DATAGRAM_SIZE_U64); config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); config.set_initial_max_data(MAX_INCOMING_BUFFER_SIZE_WHOLE); config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_local(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); config.set_initial_max_stream_data_bidi_remote(MAX_INCOMING_BUFFER_SIZE_EACH); Loading Loading @@ -516,7 +526,7 @@ mod tests { assert!(super::make_doh_udp_socket(ip, 0).is_err(), "udp socket should not be created"); assert!(super::make_doh_udp_socket(ip, 0).is_err(), "udp socket should not be created"); } } // Make a socket connecting to loopback with a test mark. // Make a socket connecting to loopback with a test mark. let sk = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let (_, sk) = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); // Check if the socket is connected to loopback. // Check if the socket is connected to loopback. assert_eq!( assert_eq!( sk.peer_addr().unwrap(), sk.peer_addr().unwrap(), Loading Loading @@ -568,9 +578,16 @@ mod tests { const SAMPLE_QUERY: &str = "q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"; const SAMPLE_QUERY: &str = "q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"; #[test] #[test] fn close_doh() { fn close_doh() { let udp_sk = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let (peer_socket, udp_sk) = super::make_doh_udp_socket(LOOPBACK_ADDR, TEST_MARK).unwrap(); let doh = let doh = DohDispatcher::new_with_socket( DohDispatcher::new_with_socket(GOOGLE_DNS_URL, GOOGLE_DNS_IP, 0, None, udp_sk).unwrap(); GOOGLE_DNS_URL, GOOGLE_DNS_IP, peer_socket, 0, None, udp_sk, ) .unwrap(); let (resp_tx, resp_rx) = oneshot::channel(); let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::DohQuery { query: SAMPLE_QUERY.as_bytes().to_vec(), resp: resp_tx }; let cmd = Command::DohQuery { query: SAMPLE_QUERY.as_bytes().to_vec(), resp: resp_tx }; assert!(doh.query(cmd).is_ok(), "Send query failed"); assert!(doh.query(cmd).is_ok(), "Send query failed"); Loading