Loading system/gd/rust/common/src/time.rs +41 −44 Original line number Diff line number Diff line ///! Waking timers for Bluetooth. Implemented using timerfd, but supposed to feel similar to ///Tokio's time use crate::ready; use nix::sys::time::TimeSpec; use nix::sys::timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags}; use nix::unistd::close; use std::future::Future; use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; use tokio::io::unix::AsyncFd; /// Similar to tokio's sleep() pub fn sleep(duration: Duration) -> Sleep { /// A single shot Alarm pub struct Alarm { fd: AsyncFd<TimerFd>, } impl Alarm { /// Construct a new alarm pub fn new() -> Self { let timer = TimerFd::new(get_clock(), TimerFlags::empty()).unwrap(); timer Self { fd: AsyncFd::new(timer).unwrap(), } } /// Reset the alarm to duration, starting from now pub fn reset(&mut self, duration: Duration) { self.fd.get_ref() .set( Expiration::OneShot(TimeSpec::from(duration)), TimerSetTimeFlags::empty(), ) .unwrap(); Sleep { fd: AsyncFd::new(timer).unwrap(), } } /// Future returned by sleep() pub struct Sleep { fd: AsyncFd<TimerFd>, /// Stop the alarm if it is currently started pub fn cancel(&mut self) { self.reset(Duration::from_millis(0)); } impl Future for Sleep { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { match ready!(self.fd.poll_read_ready(cx)) { Ok(_) => { /// Completes when the alarm has expired pub async fn expired(&mut self) { drop(self.fd.readable().await.unwrap()); // Will not block, since we have confirmed it is readable self.fd.get_ref().wait().unwrap(); Poll::Ready(()) } Err(e) => panic!("timer error: {}", e), } impl Default for Alarm { fn default() -> Self { Alarm::new() } } impl Drop for Sleep { impl Drop for Alarm { fn drop(&mut self) { close(self.fd.as_raw_fd()).unwrap(); } Loading Loading @@ -99,24 +102,18 @@ fn get_clock() -> ClockId { #[cfg(test)] mod tests { use super::interval; use super::sleep; use super::Alarm; use crate::assert_near; use std::time::{Duration, Instant}; #[test] fn sleep_schedule_and_then_drop() { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async { sleep(Duration::from_millis(200)); }); } #[test] fn sleep_simple_case() { fn alarm_simple_case() { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async { let timer = Instant::now(); sleep(Duration::from_millis(10)).await; let mut alarm = Alarm::new(); alarm.reset(Duration::from_millis(10)); alarm.expired().await; assert_near!(timer.elapsed().as_millis(), 10, 3); }); Loading system/gd/rust/hci/src/lib.rs +54 −15 Original line number Diff line number Diff line Loading @@ -6,18 +6,22 @@ pub mod error; /// HCI layer facade service pub mod facade; use bt_common::time::Alarm; use bt_hal::HalExports; use bt_packets::hci::CommandCompleteChild::ResetComplete; use bt_packets::hci::EventChild::{ CommandComplete, CommandStatus, LeMetaEvent, MaxSlotsChange, PageScanRepetitionModeChange, VendorSpecificEvent, }; use bt_packets::hci::{ AclPacket, CommandPacket, EventCode, EventPacket, LeMetaEventPacket, OpCode, SubeventCode, AclPacket, CommandCompletePacket, CommandPacket, CommandStatusPacket, ErrorCode, EventCode, EventPacket, LeMetaEventPacket, OpCode, ResetBuilder, SubeventCode, }; use error::Result; use gddi::{module, provides, Stoppable}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; use tokio::select; use tokio::sync::mpsc::{channel, Receiver, Sender}; Loading Loading @@ -47,13 +51,24 @@ async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports { cmd_rx, )); HciExports { let mut exports = HciExports { cmd_tx, evt_handlers, le_evt_handlers, acl_tx: hal_exports.acl_tx, acl_rx: hal_exports.acl_rx, }; match exports .enqueue_command_with_complete(ResetBuilder {}.build().into()) .await .specialize() { ResetComplete(evt) if *evt.get_status() == ErrorCode::Success => {} _ => panic!("reset did not complete successfully"), } exports } /// HCI command entry Loading Loading @@ -93,18 +108,35 @@ impl HciExports { /// Enqueue an HCI command expecting a command complete /// response from the controller pub async fn enqueue_command_with_complete(&mut self, cmd: CommandPacket) -> EventPacket { self.send(cmd).await.unwrap() pub async fn enqueue_command_with_complete( &mut self, cmd: CommandPacket, ) -> CommandCompletePacket { match self.send(cmd).await.unwrap().specialize() { CommandComplete(evt) => evt, _ => panic!("Expected command complete, got status instead"), } } /// Enqueue an HCI command expecting a status response /// from the controller pub async fn enqueue_command_with_status(&mut self, cmd: CommandPacket) -> EventPacket { self.send(cmd).await.unwrap() pub async fn enqueue_command_with_status(&mut self, cmd: CommandPacket) -> CommandStatusPacket { match self.send(cmd).await.unwrap().specialize() { CommandStatus(evt) => evt, _ => panic!("Expected command status, got complete instead"), } } /// Indicate interest in specific HCI events pub async fn register_event_handler(&mut self, code: EventCode, sender: Sender<EventPacket>) { match code { EventCode::CommandStatus | EventCode::CommandComplete | EventCode::LeMetaEvent | EventCode::PageScanRepetitionModeChange | EventCode::MaxSlotsChange | EventCode::VendorSpecific => panic!("{:?} is a protected event", code), _ => { assert!( self.evt_handlers .lock() Loading @@ -115,6 +147,8 @@ impl HciExports { code ); } } } /// Remove interest in specific HCI events pub async fn unregister_event_handler(&mut self, code: EventCode) { Loading Loading @@ -152,11 +186,13 @@ async fn dispatch( mut cmd_rx: Receiver<Command>, ) { let mut pending_cmd: Option<PendingCommand> = None; let mut hci_timeout = Alarm::new(); loop { select! { Some(evt) = consume(&evt_rx) => { match evt.specialize() { CommandStatus(evt) => { hci_timeout.cancel(); let this_opcode = *evt.get_command_op_code(); match pending_cmd.take() { Some(PendingCommand{opcode, fut}) if opcode == this_opcode => fut.send(evt.into()).unwrap(), Loading @@ -165,6 +201,7 @@ async fn dispatch( } }, CommandComplete(evt) => { hci_timeout.cancel(); let this_opcode = *evt.get_command_op_code(); match pending_cmd.take() { Some(PendingCommand{opcode, fut}) if opcode == this_opcode => fut.send(evt.into()).unwrap(), Loading Loading @@ -197,7 +234,9 @@ async fn dispatch( fut: cmd.fut, }); cmd_tx.send(cmd.cmd).await.unwrap(); hci_timeout.reset(Duration::from_secs(2)); }, _ = hci_timeout.expired() => panic!("Timed out waiting for {:?}", pending_cmd.unwrap().opcode), else => break, } } Loading Loading
system/gd/rust/common/src/time.rs +41 −44 Original line number Diff line number Diff line ///! Waking timers for Bluetooth. Implemented using timerfd, but supposed to feel similar to ///Tokio's time use crate::ready; use nix::sys::time::TimeSpec; use nix::sys::timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags}; use nix::unistd::close; use std::future::Future; use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; use tokio::io::unix::AsyncFd; /// Similar to tokio's sleep() pub fn sleep(duration: Duration) -> Sleep { /// A single shot Alarm pub struct Alarm { fd: AsyncFd<TimerFd>, } impl Alarm { /// Construct a new alarm pub fn new() -> Self { let timer = TimerFd::new(get_clock(), TimerFlags::empty()).unwrap(); timer Self { fd: AsyncFd::new(timer).unwrap(), } } /// Reset the alarm to duration, starting from now pub fn reset(&mut self, duration: Duration) { self.fd.get_ref() .set( Expiration::OneShot(TimeSpec::from(duration)), TimerSetTimeFlags::empty(), ) .unwrap(); Sleep { fd: AsyncFd::new(timer).unwrap(), } } /// Future returned by sleep() pub struct Sleep { fd: AsyncFd<TimerFd>, /// Stop the alarm if it is currently started pub fn cancel(&mut self) { self.reset(Duration::from_millis(0)); } impl Future for Sleep { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { match ready!(self.fd.poll_read_ready(cx)) { Ok(_) => { /// Completes when the alarm has expired pub async fn expired(&mut self) { drop(self.fd.readable().await.unwrap()); // Will not block, since we have confirmed it is readable self.fd.get_ref().wait().unwrap(); Poll::Ready(()) } Err(e) => panic!("timer error: {}", e), } impl Default for Alarm { fn default() -> Self { Alarm::new() } } impl Drop for Sleep { impl Drop for Alarm { fn drop(&mut self) { close(self.fd.as_raw_fd()).unwrap(); } Loading Loading @@ -99,24 +102,18 @@ fn get_clock() -> ClockId { #[cfg(test)] mod tests { use super::interval; use super::sleep; use super::Alarm; use crate::assert_near; use std::time::{Duration, Instant}; #[test] fn sleep_schedule_and_then_drop() { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async { sleep(Duration::from_millis(200)); }); } #[test] fn sleep_simple_case() { fn alarm_simple_case() { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async { let timer = Instant::now(); sleep(Duration::from_millis(10)).await; let mut alarm = Alarm::new(); alarm.reset(Duration::from_millis(10)); alarm.expired().await; assert_near!(timer.elapsed().as_millis(), 10, 3); }); Loading
system/gd/rust/hci/src/lib.rs +54 −15 Original line number Diff line number Diff line Loading @@ -6,18 +6,22 @@ pub mod error; /// HCI layer facade service pub mod facade; use bt_common::time::Alarm; use bt_hal::HalExports; use bt_packets::hci::CommandCompleteChild::ResetComplete; use bt_packets::hci::EventChild::{ CommandComplete, CommandStatus, LeMetaEvent, MaxSlotsChange, PageScanRepetitionModeChange, VendorSpecificEvent, }; use bt_packets::hci::{ AclPacket, CommandPacket, EventCode, EventPacket, LeMetaEventPacket, OpCode, SubeventCode, AclPacket, CommandCompletePacket, CommandPacket, CommandStatusPacket, ErrorCode, EventCode, EventPacket, LeMetaEventPacket, OpCode, ResetBuilder, SubeventCode, }; use error::Result; use gddi::{module, provides, Stoppable}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; use tokio::select; use tokio::sync::mpsc::{channel, Receiver, Sender}; Loading Loading @@ -47,13 +51,24 @@ async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports { cmd_rx, )); HciExports { let mut exports = HciExports { cmd_tx, evt_handlers, le_evt_handlers, acl_tx: hal_exports.acl_tx, acl_rx: hal_exports.acl_rx, }; match exports .enqueue_command_with_complete(ResetBuilder {}.build().into()) .await .specialize() { ResetComplete(evt) if *evt.get_status() == ErrorCode::Success => {} _ => panic!("reset did not complete successfully"), } exports } /// HCI command entry Loading Loading @@ -93,18 +108,35 @@ impl HciExports { /// Enqueue an HCI command expecting a command complete /// response from the controller pub async fn enqueue_command_with_complete(&mut self, cmd: CommandPacket) -> EventPacket { self.send(cmd).await.unwrap() pub async fn enqueue_command_with_complete( &mut self, cmd: CommandPacket, ) -> CommandCompletePacket { match self.send(cmd).await.unwrap().specialize() { CommandComplete(evt) => evt, _ => panic!("Expected command complete, got status instead"), } } /// Enqueue an HCI command expecting a status response /// from the controller pub async fn enqueue_command_with_status(&mut self, cmd: CommandPacket) -> EventPacket { self.send(cmd).await.unwrap() pub async fn enqueue_command_with_status(&mut self, cmd: CommandPacket) -> CommandStatusPacket { match self.send(cmd).await.unwrap().specialize() { CommandStatus(evt) => evt, _ => panic!("Expected command status, got complete instead"), } } /// Indicate interest in specific HCI events pub async fn register_event_handler(&mut self, code: EventCode, sender: Sender<EventPacket>) { match code { EventCode::CommandStatus | EventCode::CommandComplete | EventCode::LeMetaEvent | EventCode::PageScanRepetitionModeChange | EventCode::MaxSlotsChange | EventCode::VendorSpecific => panic!("{:?} is a protected event", code), _ => { assert!( self.evt_handlers .lock() Loading @@ -115,6 +147,8 @@ impl HciExports { code ); } } } /// Remove interest in specific HCI events pub async fn unregister_event_handler(&mut self, code: EventCode) { Loading Loading @@ -152,11 +186,13 @@ async fn dispatch( mut cmd_rx: Receiver<Command>, ) { let mut pending_cmd: Option<PendingCommand> = None; let mut hci_timeout = Alarm::new(); loop { select! { Some(evt) = consume(&evt_rx) => { match evt.specialize() { CommandStatus(evt) => { hci_timeout.cancel(); let this_opcode = *evt.get_command_op_code(); match pending_cmd.take() { Some(PendingCommand{opcode, fut}) if opcode == this_opcode => fut.send(evt.into()).unwrap(), Loading @@ -165,6 +201,7 @@ async fn dispatch( } }, CommandComplete(evt) => { hci_timeout.cancel(); let this_opcode = *evt.get_command_op_code(); match pending_cmd.take() { Some(PendingCommand{opcode, fut}) if opcode == this_opcode => fut.send(evt.into()).unwrap(), Loading Loading @@ -197,7 +234,9 @@ async fn dispatch( fut: cmd.fut, }); cmd_tx.send(cmd.cmd).await.unwrap(); hci_timeout.reset(Duration::from_secs(2)); }, _ = hci_timeout.expired() => panic!("Timed out waiting for {:?}", pending_cmd.unwrap().opcode), else => break, } } Loading