Loading system/gd/rust/linux/stack/src/socket_manager.rs +133 −72 Original line number Diff line number Diff line Loading @@ -2,7 +2,6 @@ use bt_topshim::btif::{BluetoothInterface, BtStatus, RawAddress, Uuid}; use bt_topshim::profiles::socket; use bt_topshim::topstack; use log; use nix::sys::socket::{recvmsg, ControlMessageOwned}; use nix::sys::uio::IoVec; Loading Loading @@ -86,10 +85,10 @@ impl BluetoothServerSocket { } } fn make_l2cap_le_channel(flags: i32) -> Self { fn make_l2cap_channel(flags: i32) -> Self { BluetoothServerSocket { id: 0, sock_type: SocketType::L2capLe, sock_type: SocketType::L2cap, flags: flags | socket::SOCK_FLAG_NO_SDP, psm: Some(DYNAMIC_PSM_NO_SDP), channel: None, Loading Loading @@ -519,28 +518,12 @@ impl BluetoothSocketManager { // Push new socket into listeners. let id = self.next_socket_id(); socket_info.id = id; let (forwarded_socket, forwarded_status) = (socket_info.clone(), status.clone()); let (runner_tx, runner_rx) = channel::<SocketRunnerActions>(10); // Keep track of active listener sockets. let listener = InternalListeningSocket::new(cbid, id, runner_tx); self.listening.entry(cbid).or_default().push(listener); // Notify via callbacks that this socket is ready to be listened to. // This is done via message passing so that the current method call // returns first before the server socket is marked ready to be // listened to. let tx = self.tx.clone(); topstack::get_runtime().spawn(async move { let _ = tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, forwarded_socket, forwarded_status, ))) .await; }); // Push a listening task to local runtime to wait for device to // start accepting or get closed. let rpc_tx = self.tx.clone(); Loading @@ -554,9 +537,12 @@ impl BluetoothSocketManager { } }; // We only send socket ready after we've read the channel out. let listen_status = status.clone(); let joinhandle = self.runtime.spawn(async move { BluetoothSocketManager::listening_task( cbid, listen_status, runner_rx, socket_info, stream, Loading Loading @@ -661,15 +647,67 @@ impl BluetoothSocketManager { async fn listening_task( cbid: CallbackId, listen_status: BtStatus, mut runner_rx: Receiver<SocketRunnerActions>, socket_info: BluetoothServerSocket, mut socket_info: BluetoothServerSocket, stream: UnixStream, rpc_tx: Sender<Message>, ) { let mut accepting: Option<JoinHandle<()>> = None; let stream = Arc::new(stream); let connection_timeout = Duration::from_millis(CONNECT_COMPLETE_TIMEOUT_MS); // Wait for stream to be readable, then read channel. This is the first thing that must // happen in the listening channel. If this fails, close the channel. let mut channel_bytes = [0 as u8; 4]; let mut status = Self::wait_and_read_stream(connection_timeout, &stream, &mut channel_bytes).await; let channel = i32::from_ne_bytes(channel_bytes); if channel <= 0 { status = BtStatus::Fail; } // If we don't get a valid channel, consider the socket as closed. if status != BtStatus::Success { // First send the incoming socket ready signal and then closed. If we // are unable to read the channel, the client needs to consider the // socket as closed. let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, socket_info.clone(), status, ))) .await; let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketClosed( cbid, socket_info.id, BtStatus::Success, ))) .await; return; } match socket_info.sock_type { SocketType::Rfcomm => socket_info.channel = Some(channel), SocketType::L2cap | SocketType::L2capLe => socket_info.psm = Some(channel), // Don't care about other types. We don't support them in this path. _ => (), }; // Notify via callbacks that this socket is ready to be listened to since we have the // channel available now. let (forwarded_socket, forwarded_status) = (socket_info.clone(), listen_status.clone()); let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, forwarded_socket, forwarded_status, ))) .await; loop { let m = match runner_rx.recv().await { Some(v) => v, Loading Loading @@ -727,15 +765,18 @@ impl BluetoothSocketManager { // Read the accepted socket information and use // CMSG to grab the sockets also transferred over // this socket. let sock = { let mut data = [0; socket::CONNECT_COMPLETE_SIZE + 1]; let rawfd = cstream.as_raw_fd(); let socket_info_inner = cloned_socket_info.clone(); let sock: std::io::Result<Option<BluetoothSocket>> = cstream.try_io(tokio::io::Interest::READABLE, || { let mut data = [0; socket::CONNECT_COMPLETE_SIZE]; let iov = [IoVec::from_mut_slice(&mut data)]; let mut cspace = nix::cmsg_space!(RawFd); let maybe_sock = match recvmsg( cstream.as_raw_fd(), rawfd, &iov, Some(&mut cspace), nix::sys::socket::MsgFlags::empty(), nix::sys::socket::MsgFlags::MSG_DONTWAIT, ) { Ok(recv) => { let fd = match recv.cmsgs().next() { Loading @@ -758,32 +799,48 @@ impl BluetoothSocketManager { } }; match socket::ConnectionComplete::try_from( &data[0..recv.bytes], return match socket::ConnectionComplete::try_from( &data[0..socket::CONNECT_COMPLETE_SIZE], ) { Ok(cc) => { let status = BtStatus::from(cc.status as u32); let sock = cloned_socket_info.to_connecting_socket(cc, fd); let sock = socket_info_inner .to_connecting_socket(cc, fd); if status == BtStatus::Success && sock.fd.is_some() if status == BtStatus::Success && sock.fd.is_some() { Some(sock) Ok(Some(sock)) } else { None Ok(None) } } Err(_) => None, } Err(e) => Ok(None), }; } Err(_) => None, Err(e) => { if e == nix::Error::Sys(nix::errno::Errno::EAGAIN) { Err(std::io::Error::new( std::io::ErrorKind::WouldBlock, "Recvfrom is readable but would block on read", )) } else { Ok(None) } } }; maybe_sock }; }); // If we returned an error for the above socket, then the recv failed. // Just continue this loop. if !sock.is_ok() { continue; } match sock { match sock.unwrap_or(None) { Some(s) => { let _ = tx .send(Message::SocketManagerActions( Loading Loading @@ -910,7 +967,7 @@ impl BluetoothSocketManager { let mut channel_bytes = [0 as u8; 4]; let mut status = Self::wait_and_read_stream(connection_timeout, &stream, &mut channel_bytes).await; if i32::from_be_bytes(channel_bytes) <= 0 { if i32::from_ne_bytes(channel_bytes) <= 0 { status = BtStatus::Fail; } if status != BtStatus::Success { Loading Loading @@ -999,6 +1056,11 @@ impl BluetoothSocketManager { SocketActions::OnIncomingSocketClosed(cbid, socket_id, status) => { if let Some(callback) = self.callbacks.get_by_id(cbid) { callback.on_incoming_socket_closed(socket_id, status); // Also make sure to remove the socket from listening list. self.listening .entry(cbid) .and_modify(|v| v.retain(|s| s.socket_id != socket_id)); } } Loading Loading @@ -1037,7 +1099,7 @@ impl IBluetoothSocketManager for BluetoothSocketManager { return SocketResult::new(BtStatus::NotReady, INVALID_SOCKET_ID); } let socket_info = BluetoothServerSocket::make_l2cap_le_channel(socket::SOCK_FLAG_NONE); let socket_info = BluetoothServerSocket::make_l2cap_channel(socket::SOCK_FLAG_NONE); self.socket_listen(socket_info, callback) } Loading @@ -1046,8 +1108,7 @@ impl IBluetoothSocketManager for BluetoothSocketManager { return SocketResult::new(BtStatus::NotReady, INVALID_SOCKET_ID); } let socket_info = BluetoothServerSocket::make_l2cap_le_channel(socket::SOCK_META_FLAG_SECURE); let socket_info = BluetoothServerSocket::make_l2cap_channel(socket::SOCK_META_FLAG_SECURE); self.socket_listen(socket_info, callback) } Loading Loading
system/gd/rust/linux/stack/src/socket_manager.rs +133 −72 Original line number Diff line number Diff line Loading @@ -2,7 +2,6 @@ use bt_topshim::btif::{BluetoothInterface, BtStatus, RawAddress, Uuid}; use bt_topshim::profiles::socket; use bt_topshim::topstack; use log; use nix::sys::socket::{recvmsg, ControlMessageOwned}; use nix::sys::uio::IoVec; Loading Loading @@ -86,10 +85,10 @@ impl BluetoothServerSocket { } } fn make_l2cap_le_channel(flags: i32) -> Self { fn make_l2cap_channel(flags: i32) -> Self { BluetoothServerSocket { id: 0, sock_type: SocketType::L2capLe, sock_type: SocketType::L2cap, flags: flags | socket::SOCK_FLAG_NO_SDP, psm: Some(DYNAMIC_PSM_NO_SDP), channel: None, Loading Loading @@ -519,28 +518,12 @@ impl BluetoothSocketManager { // Push new socket into listeners. let id = self.next_socket_id(); socket_info.id = id; let (forwarded_socket, forwarded_status) = (socket_info.clone(), status.clone()); let (runner_tx, runner_rx) = channel::<SocketRunnerActions>(10); // Keep track of active listener sockets. let listener = InternalListeningSocket::new(cbid, id, runner_tx); self.listening.entry(cbid).or_default().push(listener); // Notify via callbacks that this socket is ready to be listened to. // This is done via message passing so that the current method call // returns first before the server socket is marked ready to be // listened to. let tx = self.tx.clone(); topstack::get_runtime().spawn(async move { let _ = tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, forwarded_socket, forwarded_status, ))) .await; }); // Push a listening task to local runtime to wait for device to // start accepting or get closed. let rpc_tx = self.tx.clone(); Loading @@ -554,9 +537,12 @@ impl BluetoothSocketManager { } }; // We only send socket ready after we've read the channel out. let listen_status = status.clone(); let joinhandle = self.runtime.spawn(async move { BluetoothSocketManager::listening_task( cbid, listen_status, runner_rx, socket_info, stream, Loading Loading @@ -661,15 +647,67 @@ impl BluetoothSocketManager { async fn listening_task( cbid: CallbackId, listen_status: BtStatus, mut runner_rx: Receiver<SocketRunnerActions>, socket_info: BluetoothServerSocket, mut socket_info: BluetoothServerSocket, stream: UnixStream, rpc_tx: Sender<Message>, ) { let mut accepting: Option<JoinHandle<()>> = None; let stream = Arc::new(stream); let connection_timeout = Duration::from_millis(CONNECT_COMPLETE_TIMEOUT_MS); // Wait for stream to be readable, then read channel. This is the first thing that must // happen in the listening channel. If this fails, close the channel. let mut channel_bytes = [0 as u8; 4]; let mut status = Self::wait_and_read_stream(connection_timeout, &stream, &mut channel_bytes).await; let channel = i32::from_ne_bytes(channel_bytes); if channel <= 0 { status = BtStatus::Fail; } // If we don't get a valid channel, consider the socket as closed. if status != BtStatus::Success { // First send the incoming socket ready signal and then closed. If we // are unable to read the channel, the client needs to consider the // socket as closed. let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, socket_info.clone(), status, ))) .await; let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketClosed( cbid, socket_info.id, BtStatus::Success, ))) .await; return; } match socket_info.sock_type { SocketType::Rfcomm => socket_info.channel = Some(channel), SocketType::L2cap | SocketType::L2capLe => socket_info.psm = Some(channel), // Don't care about other types. We don't support them in this path. _ => (), }; // Notify via callbacks that this socket is ready to be listened to since we have the // channel available now. let (forwarded_socket, forwarded_status) = (socket_info.clone(), listen_status.clone()); let _ = rpc_tx .send(Message::SocketManagerActions(SocketActions::OnIncomingSocketReady( cbid, forwarded_socket, forwarded_status, ))) .await; loop { let m = match runner_rx.recv().await { Some(v) => v, Loading Loading @@ -727,15 +765,18 @@ impl BluetoothSocketManager { // Read the accepted socket information and use // CMSG to grab the sockets also transferred over // this socket. let sock = { let mut data = [0; socket::CONNECT_COMPLETE_SIZE + 1]; let rawfd = cstream.as_raw_fd(); let socket_info_inner = cloned_socket_info.clone(); let sock: std::io::Result<Option<BluetoothSocket>> = cstream.try_io(tokio::io::Interest::READABLE, || { let mut data = [0; socket::CONNECT_COMPLETE_SIZE]; let iov = [IoVec::from_mut_slice(&mut data)]; let mut cspace = nix::cmsg_space!(RawFd); let maybe_sock = match recvmsg( cstream.as_raw_fd(), rawfd, &iov, Some(&mut cspace), nix::sys::socket::MsgFlags::empty(), nix::sys::socket::MsgFlags::MSG_DONTWAIT, ) { Ok(recv) => { let fd = match recv.cmsgs().next() { Loading @@ -758,32 +799,48 @@ impl BluetoothSocketManager { } }; match socket::ConnectionComplete::try_from( &data[0..recv.bytes], return match socket::ConnectionComplete::try_from( &data[0..socket::CONNECT_COMPLETE_SIZE], ) { Ok(cc) => { let status = BtStatus::from(cc.status as u32); let sock = cloned_socket_info.to_connecting_socket(cc, fd); let sock = socket_info_inner .to_connecting_socket(cc, fd); if status == BtStatus::Success && sock.fd.is_some() if status == BtStatus::Success && sock.fd.is_some() { Some(sock) Ok(Some(sock)) } else { None Ok(None) } } Err(_) => None, } Err(e) => Ok(None), }; } Err(_) => None, Err(e) => { if e == nix::Error::Sys(nix::errno::Errno::EAGAIN) { Err(std::io::Error::new( std::io::ErrorKind::WouldBlock, "Recvfrom is readable but would block on read", )) } else { Ok(None) } } }; maybe_sock }; }); // If we returned an error for the above socket, then the recv failed. // Just continue this loop. if !sock.is_ok() { continue; } match sock { match sock.unwrap_or(None) { Some(s) => { let _ = tx .send(Message::SocketManagerActions( Loading Loading @@ -910,7 +967,7 @@ impl BluetoothSocketManager { let mut channel_bytes = [0 as u8; 4]; let mut status = Self::wait_and_read_stream(connection_timeout, &stream, &mut channel_bytes).await; if i32::from_be_bytes(channel_bytes) <= 0 { if i32::from_ne_bytes(channel_bytes) <= 0 { status = BtStatus::Fail; } if status != BtStatus::Success { Loading Loading @@ -999,6 +1056,11 @@ impl BluetoothSocketManager { SocketActions::OnIncomingSocketClosed(cbid, socket_id, status) => { if let Some(callback) = self.callbacks.get_by_id(cbid) { callback.on_incoming_socket_closed(socket_id, status); // Also make sure to remove the socket from listening list. self.listening .entry(cbid) .and_modify(|v| v.retain(|s| s.socket_id != socket_id)); } } Loading Loading @@ -1037,7 +1099,7 @@ impl IBluetoothSocketManager for BluetoothSocketManager { return SocketResult::new(BtStatus::NotReady, INVALID_SOCKET_ID); } let socket_info = BluetoothServerSocket::make_l2cap_le_channel(socket::SOCK_FLAG_NONE); let socket_info = BluetoothServerSocket::make_l2cap_channel(socket::SOCK_FLAG_NONE); self.socket_listen(socket_info, callback) } Loading @@ -1046,8 +1108,7 @@ impl IBluetoothSocketManager for BluetoothSocketManager { return SocketResult::new(BtStatus::NotReady, INVALID_SOCKET_ID); } let socket_info = BluetoothServerSocket::make_l2cap_le_channel(socket::SOCK_META_FLAG_SECURE); let socket_info = BluetoothServerSocket::make_l2cap_channel(socket::SOCK_META_FLAG_SECURE); self.socket_listen(socket_info, callback) } Loading