Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1f51e620 authored by Zach Johnson's avatar Zach Johnson
Browse files

rusty-gd: introduce RxAdapter

this consolidates logic for dispatching channels over gRPC or callbacks
into legacy

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost SimpleHalTest
Change-Id: I724939c1e55312bfb2c33b906252e6df111bd9f5
parent c3c0b3ca
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -37,6 +37,11 @@ pub enum Error {
  InvalidPacketError
}

pub trait Packet {
  fn to_bytes(self) -> Bytes;
  fn to_vec(self) -> Vec<u8>;
}

)";
}

+11 −8
Original line number Diff line number Diff line
@@ -1041,22 +1041,25 @@ void PacketDef::GenRustAccessStructImpls(std::ostream& s) const {
    s << "}";
  }

  s << "impl " << name_ << "Packet {";
  if (parent_ == nullptr) {
    s << "pub fn parse(bytes: &[u8]) -> Result<Self> { ";
    s << "Ok(Self::new(Arc::new(" << name_ << "Data::parse(bytes)?)))";
    s << "}";
  }
  s << "impl Packet for " << name_ << "Packet {";
  auto root = GetRootDef();
  auto root_accessor = util::CamelCaseToUnderScore(root->name_);

  s << "pub fn to_bytes(self) -> Bytes {";
  s << "fn to_bytes(self) -> Bytes {";
  s << " let mut buffer = BytesMut::new();";
  s << " self." << root_accessor << ".write_to(&mut buffer);";
  s << " buffer.freeze()";
  s << "}\n";

  s << "pub fn to_vec(self) -> Vec<u8> { self.to_bytes().to_vec() }\n";
  s << "fn to_vec(self) -> Vec<u8> { self.to_bytes().to_vec() }\n";
  s << "}";

  s << "impl " << name_ << "Packet {";
  if (parent_ == nullptr) {
    s << "pub fn parse(bytes: &[u8]) -> Result<Self> { ";
    s << "Ok(Self::new(Arc::new(" << name_ << "Data::parse(bytes)?)))";
    s << "}";
  }

  if (!children_.empty()) {
    s << " pub fn specialize(&self) -> " << name_ << "Child {";
+18 −0
Original line number Diff line number Diff line
rust_library {
    name: "libbt_facade_helpers",
    defaults: ["gd_rust_defaults"],
    crate_name: "bt_facade_helpers",
    srcs: ["src/lib.rs"],
    edition: "2018",
    rustlibs: [
        "libbt_facade_proto",
        "libbt_packets",
        "libbytes",
        "libfutures",
        "libgrpcio",
        "libtokio",
        "libprotobuf",
        "liblog_rust",
        "libcxx",
    ],
}
+67 −0
Original line number Diff line number Diff line
//! common facade & shim helpers

use bt_facade_proto::common::Data;
use bt_packets::hci::Packet;
use futures::sink::SinkExt;
use grpcio::*;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;

/// Wrapper so we can invoke callbacks
pub trait U8SliceRunnable {
    /// Do the thing
    fn run(&self, data: &[u8]);
}

/// Helper for interfacing channels with shim or gRPC boundaries
#[derive(Clone)]
pub struct RxAdapter<T> {
    rx: Arc<Mutex<Receiver<T>>>,
    running: bool,
}

impl<T: 'static + Packet + Send> RxAdapter<T> {
    /// New, from an unwrapped receiver
    pub fn new(rx: Receiver<T>) -> Self {
        Self::from_arc(Arc::new(Mutex::new(rx)))
    }

    /// New, from an already arc mutexed receiver
    pub fn from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self {
        Self { rx, running: false }
    }

    /// Stream out the channel over the provided sink
    pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>) {
        assert!(!self.running);
        self.running = true;

        let clone_rx = self.rx.clone();
        ctx.spawn(async move {
            while let Some(payload) = clone_rx.lock().await.recv().await {
                let mut data = Data::default();
                data.set_payload(payload.to_vec());
                sink.send((data, WriteFlags::default())).await.unwrap();
            }
        });
    }

    /// Stream out the channel over the provided shim runnable
    pub fn stream_runnable<R: 'static + U8SliceRunnable + Send>(
        &mut self,
        rt: &Arc<Runtime>,
        runnable: R,
    ) {
        assert!(!self.running);
        self.running = true;

        let clone_rx = self.rx.clone();
        rt.spawn(async move {
            while let Some(payload) = clone_rx.lock().await.recv().await {
                runnable.run(&payload.to_bytes());
            }
        });
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ use bt_common::GrpcFacade;
use bt_facade_proto::common::Data;
use bt_facade_proto::empty::Empty;
use bt_facade_proto::hal_facade_grpc::{create_hci_hal_facade, HciHalFacade};
use bt_packets::hci::{AclPacket, CommandPacket};
use bt_packets::hci::{AclPacket, CommandPacket, Packet};
use futures::sink::SinkExt;
use gddi::{module, provides, Stoppable};
use grpcio::*;
Loading