Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c4662740 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge changes Ibbf925d2,I12a2aa59 into main

* changes:
  bufferstreams: Add basic implementations of core BS traits
  libbufferstreams: Add a StreamConfig type (and associated trait methods)
parents 94c756ee 77e3c547
Loading
Loading
Loading
Loading
+18 −5
Original line number Diff line number Diff line
@@ -12,13 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

rust_library {
    name: "libbufferstreams",
    crate_name: "bufferstreams",
rust_defaults {
    name: "libbufferstreams_defaults",
    srcs: ["src/lib.rs"],
    edition: "2021",
    rlibs: [
    rustlibs: [
        "libanyhow",
        "libnativewindow_rs",
    ],
    edition: "2021",
}

rust_library {
    name: "libbufferstreams",
    crate_name: "bufferstreams",
    defaults: ["libbufferstreams_defaults"],
    min_sdk_version: "30",
}

rust_test {
    name: "libbufferstreams-internal_test",
    crate_name: "bufferstreams",
    defaults: ["libbufferstreams_defaults"],
    test_suites: ["general-tests"],
}
+138 −42
Original line number Diff line number Diff line
@@ -14,8 +14,14 @@

//! libbufferstreams: Reactive Streams for Graphics Buffers

pub mod publishers;
mod stream_config;
pub mod subscribers;
pub mod subscriptions;

pub use stream_config::*;

use nativewindow::*;
use std::sync::{Arc, Weak};
use std::time::Instant;

/// This function will print Hello World.
@@ -50,9 +56,11 @@ pub extern "C" fn hello() -> bool {
/// * A Publisher MAY support multiple Subscribers and decides whether each
/// Subscription is unicast or multicast.
pub trait BufferPublisher {
    /// Returns the StreamConfig of buffers that publisher creates.
    fn get_publisher_stream_config(&self) -> StreamConfig;
    /// This function will create the subscription between the publisher and
    /// the subscriber.
    fn subscribe(&self, subscriber: Weak<dyn BufferSubscriber>);
    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
}

/// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
@@ -82,14 +90,16 @@ pub trait BufferPublisher {
/// * A Publisher MAY support multiple Subscribers and decides whether each
/// Subscription is unicast or multicast.
pub trait BufferSubscriber {
    /// The StreamConfig of buffers that this subscriber expects.
    fn get_subscriber_stream_config(&self) -> StreamConfig;
    /// This function will be called at the beginning of the subscription.
    fn on_subscribe(&self, subscription: Arc<dyn BufferSubscription>);
    fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
    /// This function will be called for buffer that comes in.
    fn on_next(&self, frame: Frame);
    fn on_next(&mut self, frame: Frame);
    /// This function will be called in case of an error.
    fn on_error(&self, error: BufferError);
    fn on_error(&mut self, error: BufferError);
    /// This function will be called on finite streams when done.
    fn on_complete(&self);
    fn on_complete(&mut self);
}

/// BufferSubscriptions serve as the bridge between BufferPublishers and
@@ -142,8 +152,9 @@ pub trait BufferSubscription {
    /// cancel
    fn cancel(&self);
}

/// Type used to describe errors produced by subscriptions.
type BufferError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type BufferError = anyhow::Error;

/// Struct used to contain the buffer.
pub struct Frame {
@@ -154,3 +165,88 @@ pub struct Frame {
    /// A fence used for reading/writing safely.
    pub fence: i32,
}

#[cfg(test)]
mod test {
    #![allow(warnings, unused)]
    use super::*;

    use anyhow::anyhow;
    use std::borrow::BorrowMut;
    use std::error::Error;
    use std::ops::Add;
    use std::sync::Arc;
    use std::time::Duration;

    use crate::publishers::testing::*;
    use crate::subscribers::{testing::*, SharedSubscriber};

    const STREAM_CONFIG: StreamConfig = StreamConfig {
        width: 1,
        height: 1,
        layers: 1,
        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
        stride: 0,
    };

    fn make_frame() -> Frame {
        Frame {
            buffer: STREAM_CONFIG
                .create_hardware_buffer()
                .expect("Unable to create hardware buffer for test"),
            present_time: Instant::now() + Duration::from_secs(1),
            fence: 0,
        }
    }

    #[test]
    fn test_test_implementations_next() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_frame(make_frame());
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));

        subscriber.map_inner(|s| s.request(1));
        assert_eq!(publisher.pending_requests(), 1);

        publisher.send_frame(make_frame());
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
        assert_eq!(publisher.pending_requests(), 0);
    }

    #[test]
    fn test_test_implementations_complete() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_complete();
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
    }

    #[test]
    fn test_test_implementations_error() {
        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut publisher = TestPublisher::new(STREAM_CONFIG);

        publisher.subscribe(subscriber.clone());
        assert!(subscriber.map_inner(|s| s.has_subscription()));
        assert!(publisher.has_subscriber());

        publisher.send_error(anyhow!("error"));
        let events = subscriber.map_inner_mut(|s| s.take_events());
        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
    }
}
+17 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module provides [BufferSubscriber] implementations and helpers.

pub mod testing;
+103 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Provides useful publishers for testing specifically. These should not be used in normal code.

use crate::{subscriptions::SharedBufferSubscription, *};

/// A [BufferPublisher] specifically for testing.
///
/// Provides users the ability to send events and read the state of the subscription.
pub struct TestPublisher {
    config: StreamConfig,
    subscriber: Option<Box<dyn BufferSubscriber>>,
    subscription: SharedBufferSubscription,
}

impl TestPublisher {
    /// Create a new [TestPublisher].
    pub fn new(config: StreamConfig) -> Self {
        Self { config, subscriber: None, subscription: SharedBufferSubscription::new() }
    }

    /// Send a [BufferSubscriber::on_next] event to an owned [BufferSubscriber] if it has any
    /// requested and returns true. Drops the frame and returns false otherwise.
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_frame(&mut self, frame: Frame) -> bool {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_frame with no subscriber");

        if self.subscription.take_request() {
            subscriber.on_next(frame);
            true
        } else {
            false
        }
    }

    /// Send a [BufferSubscriber::on_complete] event to an owned [BufferSubscriber].
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_complete(&mut self) {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_complete with no subscriber");
        subscriber.on_complete();
    }

    /// Send a [BufferSubscriber::on_error] event to an owned [BufferSubscriber].
    ///
    /// # Panics
    ///
    /// This will panic if there is no owned subscriber.
    pub fn send_error(&mut self, error: BufferError) {
        let subscriber =
            self.subscriber.as_deref_mut().expect("Tried to send_error with no subscriber");
        subscriber.on_error(error);
    }

    /// Returns whether this [BufferPublisher] owns a subscriber.
    pub fn has_subscriber(&self) -> bool {
        self.subscriber.is_some()
    }

    /// Returns the nummber of frames requested by the [BufferSubscriber].
    pub fn pending_requests(&self) -> u64 {
        self.subscription.pending_requests()
    }

    /// Returns whether the [BufferSubscriber] has cancelled the subscription.
    pub fn is_cancelled(&self) -> bool {
        self.subscription.is_cancelled()
    }
}

impl BufferPublisher for TestPublisher {
    fn get_publisher_stream_config(&self) -> crate::StreamConfig {
        self.config
    }

    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
        assert!(self.subscriber.is_none(), "TestingPublishers can only take one subscriber");
        self.subscriber = Some(Box::new(subscriber));

        if let Some(ref mut subscriber) = self.subscriber {
            subscriber.on_subscribe(self.subscription.clone_for_subscriber());
        }
    }
}
+67 −0
Original line number Diff line number Diff line
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nativewindow::*;

/// The configuration of the buffers published by a [BufferPublisher] or
/// expected by a [BufferSubscriber].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StreamConfig {
    /// Width in pixels of streaming buffers.
    pub width: u32,
    /// Height in pixels of streaming buffers.
    pub height: u32,
    /// Number of layers of streaming buffers.
    pub layers: u32,
    /// Format of streaming buffers.
    pub format: AHardwareBuffer_Format::Type,
    /// Usage of streaming buffers.
    pub usage: AHardwareBuffer_UsageFlags,
    /// Stride of streaming buffers.
    pub stride: u32,
}

impl StreamConfig {
    /// Tries to create a new AHardwareBuffer from settings in a [StreamConfig].
    pub fn create_hardware_buffer(&self) -> Option<AHardwareBuffer> {
        AHardwareBuffer::new(self.width, self.height, self.layers, self.format, self.usage)
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_create_hardware_buffer() {
        let config = StreamConfig {
            width: 123,
            height: 456,
            layers: 1,
            format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
            usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN
                | AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN,
            stride: 0,
        };

        let maybe_buffer = config.create_hardware_buffer();
        assert!(maybe_buffer.is_some());

        let buffer = maybe_buffer.unwrap();
        assert_eq!(config.width, buffer.width());
        assert_eq!(config.height, buffer.height());
        assert_eq!(config.format, buffer.format());
        assert_eq!(config.usage, buffer.usage());
    }
}
Loading