Loading system/gd/rust/linux/stack/src/socket_manager.rs +55 −25 Original line number Diff line number Diff line Loading @@ -403,6 +403,9 @@ struct InternalListeningSocket { /// Used by admin uuid: Option<Uuid>, /// Used for tracing task status joinhandle: JoinHandle<()>, } impl InternalListeningSocket { Loading @@ -411,22 +414,24 @@ impl InternalListeningSocket { socket_id: SocketId, tx: Sender<SocketRunnerActions>, uuid: Option<Uuid>, joinhandle: JoinHandle<()>, ) -> Self { InternalListeningSocket { _callback_id, socket_id, tx, uuid } InternalListeningSocket { _callback_id, socket_id, tx, uuid, joinhandle } } } /// Internal connecting socket data. struct InternalConnectingSocket { _callback_id: CallbackId, socket_info: BluetoothSocket, stream: Option<UnixStream>, socket_id: SocketId, /// Used for cleaning up joinhandle: JoinHandle<()>, } impl InternalConnectingSocket { fn new(_callback_id: CallbackId, socket_info: BluetoothSocket, fd: std::fs::File) -> Self { let stream = file_to_unixstream(fd); InternalConnectingSocket { _callback_id, socket_info, stream } fn new(_callback_id: CallbackId, socket_id: SocketId, joinhandle: JoinHandle<()>) -> Self { InternalConnectingSocket { _callback_id, socket_id, joinhandle } } } Loading Loading @@ -488,8 +493,8 @@ pub struct BluetoothSocketManager { /// List of listening sockets. listening: HashMap<CallbackId, Vec<InternalListeningSocket>>, /// Current futures mapped by callback id (so we can drop if callback disconnects). futures: HashMap<CallbackId, Vec<JoinHandle<()>>>, /// List of connecting sockets with futures (so we can drop if callback disconnects). connecting: HashMap<CallbackId, Vec<InternalConnectingSocket>>, /// Separate runtime for socket listeners (so they're not dependent on the /// same runtime as RPC). Loading @@ -514,7 +519,7 @@ impl BluetoothSocketManager { pub fn new(tx: Sender<Message>, admin: Arc<Mutex<Box<BluetoothAdmin>>>) -> Self { let callbacks = Callbacks::new(tx.clone(), Message::SocketManagerCallbackDisconnected); let socket_counter: u64 = 1000; let futures = HashMap::new(); let connecting = HashMap::new(); let listening = HashMap::new(); let runtime = Arc::new( Builder::new_multi_thread() Loading @@ -527,7 +532,7 @@ impl BluetoothSocketManager { BluetoothSocketManager { callbacks, futures, connecting, listening, runtime, sock: None, Loading Loading @@ -599,10 +604,7 @@ impl BluetoothSocketManager { let id = self.next_socket_id(); socket_info.id = id; let (runner_tx, runner_rx) = channel::<SocketRunnerActions>(10); // Keep track of active listener sockets. let listener = InternalListeningSocket::new(cbid, id, runner_tx, socket_info.uuid); self.listening.entry(cbid).or_default().push(listener); let uuid = socket_info.uuid.clone(); // Push a listening task to local runtime to wait for device to // start accepting or get closed. Loading Loading @@ -631,7 +633,11 @@ impl BluetoothSocketManager { .await; }); self.futures.entry(cbid).or_default().push(joinhandle); // Keep track of active listener sockets. self.listening .entry(cbid) .or_default() .push(InternalListeningSocket::new(cbid, id, runner_tx, uuid, joinhandle)); SocketResult::new(status, id) } Loading Loading @@ -697,7 +703,6 @@ impl BluetoothSocketManager { // callbacks. let id = self.next_socket_id(); socket_info.id = id; let connector = InternalConnectingSocket::new(cbid, socket_info, file); // Push a connecting task to local runtime to wait for connection // completion. Loading @@ -707,7 +712,8 @@ impl BluetoothSocketManager { cbid, id, tx, connector, socket_info, file_to_unixstream(file), Duration::from_millis(CONNECT_COMPLETE_TIMEOUT_MS), ) .await; Loading @@ -715,7 +721,10 @@ impl BluetoothSocketManager { // Keep track of these futures in case they need to be cancelled due to callback // disconnecting. self.futures.entry(cbid).or_default().push(joinhandle); self.connecting .entry(cbid) .or_default() .push(InternalConnectingSocket::new(cbid, id, joinhandle)); SocketResult::new(status, id) } Loading Loading @@ -1030,12 +1039,13 @@ impl BluetoothSocketManager { cbid: CallbackId, socket_id: SocketId, tx: Sender<Message>, connector: InternalConnectingSocket, socket_info: BluetoothSocket, stream: Option<UnixStream>, connection_timeout: Duration, ) { // If the unixstream isn't available for this connection, immediately return // a failure. let stream = match connector.stream { let stream = match stream { Some(s) => s, None => { let _ = tx Loading @@ -1060,7 +1070,7 @@ impl BluetoothSocketManager { if status != BtStatus::Success { log::info!( "Connecting socket to {} failed while trying to read channel from stream", connector.socket_info socket_info ); let _ = tx .send(Message::SocketManagerActions(SocketActions::OnOutgoingConnectionResult( Loading @@ -1076,7 +1086,7 @@ impl BluetoothSocketManager { if status != BtStatus::Success { log::info!( "Connecting socket to {} failed while trying to read connect complete from stream", connector.socket_info socket_info ); let _ = tx .send(Message::SocketManagerActions(SocketActions::OnOutgoingConnectionResult( Loading @@ -1100,7 +1110,7 @@ impl BluetoothSocketManager { )) .await; } else { let mut sock = connector.socket_info; let mut sock = socket_info; sock.fd = Some(unixstream_to_file(stream)); sock.port = cc.channel; sock.max_rx_size = cc.max_rx_packet_size.into(); Loading Loading @@ -1160,6 +1170,11 @@ impl BluetoothSocketManager { SocketActions::OnOutgoingConnectionResult(cbid, socket_id, status, socket) => { if let Some(callback) = self.callbacks.get_by_id_mut(cbid) { callback.on_outgoing_connection_result(socket_id, status, socket); // Also make sure to remove the socket from connecting list. self.connecting .entry(cbid) .and_modify(|v| v.retain(|s| s.socket_id != socket_id)); } } } Loading Loading @@ -1196,8 +1211,23 @@ impl BluetoothSocketManager { pub fn remove_callback(&mut self, callback: CallbackId) { // Remove any associated futures and sockets waiting to accept. self.futures.remove(&callback); self.listening.remove(&callback); self.connecting.remove(&callback).map(|sockets| { for s in sockets { s.joinhandle.abort(); } }); self.listening.remove(&callback).map(|sockets| { for s in sockets { if s.joinhandle.is_finished() { continue; } let tx = s.tx.clone(); let id = s.socket_id; self.runtime.spawn(async move { let _ = tx.send(SocketRunnerActions::Close(id)).await; }); } }); self.callbacks.remove_callback(callback); } Loading Loading
system/gd/rust/linux/stack/src/socket_manager.rs +55 −25 Original line number Diff line number Diff line Loading @@ -403,6 +403,9 @@ struct InternalListeningSocket { /// Used by admin uuid: Option<Uuid>, /// Used for tracing task status joinhandle: JoinHandle<()>, } impl InternalListeningSocket { Loading @@ -411,22 +414,24 @@ impl InternalListeningSocket { socket_id: SocketId, tx: Sender<SocketRunnerActions>, uuid: Option<Uuid>, joinhandle: JoinHandle<()>, ) -> Self { InternalListeningSocket { _callback_id, socket_id, tx, uuid } InternalListeningSocket { _callback_id, socket_id, tx, uuid, joinhandle } } } /// Internal connecting socket data. struct InternalConnectingSocket { _callback_id: CallbackId, socket_info: BluetoothSocket, stream: Option<UnixStream>, socket_id: SocketId, /// Used for cleaning up joinhandle: JoinHandle<()>, } impl InternalConnectingSocket { fn new(_callback_id: CallbackId, socket_info: BluetoothSocket, fd: std::fs::File) -> Self { let stream = file_to_unixstream(fd); InternalConnectingSocket { _callback_id, socket_info, stream } fn new(_callback_id: CallbackId, socket_id: SocketId, joinhandle: JoinHandle<()>) -> Self { InternalConnectingSocket { _callback_id, socket_id, joinhandle } } } Loading Loading @@ -488,8 +493,8 @@ pub struct BluetoothSocketManager { /// List of listening sockets. listening: HashMap<CallbackId, Vec<InternalListeningSocket>>, /// Current futures mapped by callback id (so we can drop if callback disconnects). futures: HashMap<CallbackId, Vec<JoinHandle<()>>>, /// List of connecting sockets with futures (so we can drop if callback disconnects). connecting: HashMap<CallbackId, Vec<InternalConnectingSocket>>, /// Separate runtime for socket listeners (so they're not dependent on the /// same runtime as RPC). Loading @@ -514,7 +519,7 @@ impl BluetoothSocketManager { pub fn new(tx: Sender<Message>, admin: Arc<Mutex<Box<BluetoothAdmin>>>) -> Self { let callbacks = Callbacks::new(tx.clone(), Message::SocketManagerCallbackDisconnected); let socket_counter: u64 = 1000; let futures = HashMap::new(); let connecting = HashMap::new(); let listening = HashMap::new(); let runtime = Arc::new( Builder::new_multi_thread() Loading @@ -527,7 +532,7 @@ impl BluetoothSocketManager { BluetoothSocketManager { callbacks, futures, connecting, listening, runtime, sock: None, Loading Loading @@ -599,10 +604,7 @@ impl BluetoothSocketManager { let id = self.next_socket_id(); socket_info.id = id; let (runner_tx, runner_rx) = channel::<SocketRunnerActions>(10); // Keep track of active listener sockets. let listener = InternalListeningSocket::new(cbid, id, runner_tx, socket_info.uuid); self.listening.entry(cbid).or_default().push(listener); let uuid = socket_info.uuid.clone(); // Push a listening task to local runtime to wait for device to // start accepting or get closed. Loading Loading @@ -631,7 +633,11 @@ impl BluetoothSocketManager { .await; }); self.futures.entry(cbid).or_default().push(joinhandle); // Keep track of active listener sockets. self.listening .entry(cbid) .or_default() .push(InternalListeningSocket::new(cbid, id, runner_tx, uuid, joinhandle)); SocketResult::new(status, id) } Loading Loading @@ -697,7 +703,6 @@ impl BluetoothSocketManager { // callbacks. let id = self.next_socket_id(); socket_info.id = id; let connector = InternalConnectingSocket::new(cbid, socket_info, file); // Push a connecting task to local runtime to wait for connection // completion. Loading @@ -707,7 +712,8 @@ impl BluetoothSocketManager { cbid, id, tx, connector, socket_info, file_to_unixstream(file), Duration::from_millis(CONNECT_COMPLETE_TIMEOUT_MS), ) .await; Loading @@ -715,7 +721,10 @@ impl BluetoothSocketManager { // Keep track of these futures in case they need to be cancelled due to callback // disconnecting. self.futures.entry(cbid).or_default().push(joinhandle); self.connecting .entry(cbid) .or_default() .push(InternalConnectingSocket::new(cbid, id, joinhandle)); SocketResult::new(status, id) } Loading Loading @@ -1030,12 +1039,13 @@ impl BluetoothSocketManager { cbid: CallbackId, socket_id: SocketId, tx: Sender<Message>, connector: InternalConnectingSocket, socket_info: BluetoothSocket, stream: Option<UnixStream>, connection_timeout: Duration, ) { // If the unixstream isn't available for this connection, immediately return // a failure. let stream = match connector.stream { let stream = match stream { Some(s) => s, None => { let _ = tx Loading @@ -1060,7 +1070,7 @@ impl BluetoothSocketManager { if status != BtStatus::Success { log::info!( "Connecting socket to {} failed while trying to read channel from stream", connector.socket_info socket_info ); let _ = tx .send(Message::SocketManagerActions(SocketActions::OnOutgoingConnectionResult( Loading @@ -1076,7 +1086,7 @@ impl BluetoothSocketManager { if status != BtStatus::Success { log::info!( "Connecting socket to {} failed while trying to read connect complete from stream", connector.socket_info socket_info ); let _ = tx .send(Message::SocketManagerActions(SocketActions::OnOutgoingConnectionResult( Loading @@ -1100,7 +1110,7 @@ impl BluetoothSocketManager { )) .await; } else { let mut sock = connector.socket_info; let mut sock = socket_info; sock.fd = Some(unixstream_to_file(stream)); sock.port = cc.channel; sock.max_rx_size = cc.max_rx_packet_size.into(); Loading Loading @@ -1160,6 +1170,11 @@ impl BluetoothSocketManager { SocketActions::OnOutgoingConnectionResult(cbid, socket_id, status, socket) => { if let Some(callback) = self.callbacks.get_by_id_mut(cbid) { callback.on_outgoing_connection_result(socket_id, status, socket); // Also make sure to remove the socket from connecting list. self.connecting .entry(cbid) .and_modify(|v| v.retain(|s| s.socket_id != socket_id)); } } } Loading Loading @@ -1196,8 +1211,23 @@ impl BluetoothSocketManager { pub fn remove_callback(&mut self, callback: CallbackId) { // Remove any associated futures and sockets waiting to accept. self.futures.remove(&callback); self.listening.remove(&callback); self.connecting.remove(&callback).map(|sockets| { for s in sockets { s.joinhandle.abort(); } }); self.listening.remove(&callback).map(|sockets| { for s in sockets { if s.joinhandle.is_finished() { continue; } let tx = s.tx.clone(); let id = s.socket_id; self.runtime.spawn(async move { let _ = tx.send(SocketRunnerActions::Close(id)).await; }); } }); self.callbacks.remove_callback(callback); } Loading