Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 77e3c547 authored by James Shargo's avatar James Shargo
Browse files

bufferstreams: Add basic implementations of core BS traits

For BufferSubscriptions, this change provides a generic implenmentation
that tracks requests and cancellations.

For BufferPublishers and BufferSubscribers, we provide test
implementations that let a user manually control the flow of events
between the two objects.

The traits themselves have also been updated to be more generic--instead
of forcing Arc/Weak pointers for shared objects, we now use generic
owned types for a BufferPublisher's BufferSubscriber and a
BufferSubscriber's BufferSubscription.

To make it possible to hold into a handle to a BufferSubscriber while a
BufferPublisher owns it, we provide a generic implementation of
BufferSubscriber for any Arc<BS: BufferSubscriber> that delegates to the
underlying subscriber.

Bug: 296449936, 296100790
Test: atest libbufferstreams-internal_test
Change-Id: Ibbf925d2dfb85f606baa3dc1f9722440af4f862c
parent 83baba50
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ rust_defaults {
    name: "libbufferstreams_defaults",
    srcs: ["src/lib.rs"],
    rustlibs: [
        "libanyhow",
        "libnativewindow_rs",
    ],
    edition: "2021",
+130 −42
Original line number Diff line number Diff line
@@ -14,12 +14,14 @@

//! libbufferstreams: Reactive Streams for Graphics Buffers

pub mod publishers;
mod stream_config;
pub mod subscribers;
pub mod subscriptions;

pub use stream_config::*;

use nativewindow::*;
use std::sync::{Arc, Weak};
use std::time::Instant;

/// This function will print Hello World.
@@ -58,7 +60,7 @@ pub trait BufferPublisher {
    fn get_publisher_stream_config(&self) -> StreamConfig;
    /// This function will create the subscription between the publisher and
    /// the subscriber.
    fn subscribe(&self, subscriber: Weak<dyn BufferSubscriber>);
    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
}

/// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
@@ -91,13 +93,13 @@ pub trait BufferSubscriber {
    /// The StreamConfig of buffers that this subscriber expects.
    fn get_subscriber_stream_config(&self) -> StreamConfig;
    /// This function will be called at the beginning of the subscription.
    fn on_subscribe(&self, subscription: Arc<dyn BufferSubscription>);
    fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
    /// This function will be called for buffer that comes in.
    fn on_next(&self, frame: Frame);
    fn on_next(&mut self, frame: Frame);
    /// This function will be called in case of an error.
    fn on_error(&self, error: BufferError);
    fn on_error(&mut self, error: BufferError);
    /// This function will be called on finite streams when done.
    fn on_complete(&self);
    fn on_complete(&mut self);
}

/// BufferSubscriptions serve as the bridge between BufferPublishers and
@@ -150,8 +152,9 @@ pub trait BufferSubscription {
    /// cancel
    fn cancel(&self);
}

/// Type used to describe errors produced by subscriptions.
type BufferError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type BufferError = anyhow::Error;

/// Struct used to contain the buffer.
pub struct Frame {
@@ -162,3 +165,88 @@ pub struct Frame {
    /// A fence used for reading/writing safely.
    pub fence: i32,
}

#[cfg(test)]
mod test {
    #![allow(warnings, unused)]
    use super::*;

    use anyhow::anyhow;
    use std::borrow::BorrowMut;
    use std::error::Error;
    use std::ops::Add;
    use std::sync::Arc;
    use std::time::Duration;

    use crate::publishers::testing::*;
    use crate::subscribers::{testing::*, SharedSubscriber};

    const STREAM_CONFIG: StreamConfig = StreamConfig {
        width: 1,
        height: 1,
        layers: 1,
        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
        stride: 0,
    };

    fn make_frame() -> Frame {
        Frame {
            buffer: STREAM_CONFIG
                .create_hardware_buffer()
                .expect("Unable to create hardware buffer for test"),
            present_time: Instant::now() + Duration::from_secs(1),
            fence: 0,
        }
    }

    #[test]
    fn test_test_implementations_next() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_frame(make_frame());
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));

        subscriber.map_inner(|s| s.request(1));
        assert_eq!(publisher.pending_requests(), 1);

        publisher.send_frame(make_frame());
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
        assert_eq!(publisher.pending_requests(), 0);
    }

    #[test]
    fn test_test_implementations_complete() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_complete();
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
    }

    #[test]
    fn test_test_implementations_error() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_error(anyhow!("error"));
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
    }
}
+17 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module provides [BufferSubscriber] implementations and helpers.

pub mod testing;
+103 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Provides useful publishers for testing specifically. These should not be used in normal code.

use crate::{subscriptions::SharedBufferSubscription, *};

/// A [BufferPublisher] specifically for testing.
///
/// Provides users the ability to send events and read the state of the subscription.
pub struct TestPublisher {
    config: StreamConfig,
    subscriber: Option<Box<dyn BufferSubscriber>>,
    subscription: SharedBufferSubscription,
}

impl TestPublisher {
    /// Create a new [TestPublisher].
    pub fn new(config: StreamConfig) -> Self {
        Self { config, subscriber: None, subscription: SharedBufferSubscription::new() }
    }

    /// Send a [BufferSubscriber::on_next] event to an owned [BufferSubscriber] if it has any
    /// requested and returns true. Drops the frame and returns false otherwise.
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_frame(&mut self, frame: Frame) -> bool {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_frame with no subscriber");

        if self.subscription.take_request() {
            subscriber.on_next(frame);
            true
        } else {
            false
        }
    }

    /// Send a [BufferSubscriber::on_complete] event to an owned [BufferSubscriber].
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_complete(&mut self) {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_complete with no subscriber");
        subscriber.on_complete();
    }

    /// Send a [BufferSubscriber::on_error] event to an owned [BufferSubscriber].
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_error(&mut self, error: BufferError) {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_error with no subscriber");
        subscriber.on_error(error);
    }

    /// Returns whether this [BufferPublisher] owns a subscriber.
    pub fn has_subscriber(&self) -> bool {
        self.subscriber.is_some()
    }

    /// Returns the nummber of frames requested by the [BufferSubscriber].
    pub fn pending_requests(&self) -> u64 {
        self.subscription.pending_requests()
    }

    /// Returns whether the [BufferSubscriber] has cancelled the subscription.
    pub fn is_cancelled(&self) -> bool {
        self.subscription.is_cancelled()
    }
}

impl BufferPublisher for TestPublisher {
    fn get_publisher_stream_config(&self) -> crate::StreamConfig {
        self.config
    }

    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
        assert!(self.subscriber.is_none(), "TestingPublishers can only take one subscriber");
        self.subscriber = Some(Box::new(subscriber));

        if let Some(ref mut subscriber) = self.subscriber {
            subscriber.on_subscribe(self.subscription.clone_for_subscriber());
        }
    }
}
+20 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module provides [BufferSubscriber] implementations and helpers.

mod shared;
pub mod testing;

pub use shared::*;
Loading