Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3a479a78 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "[Pandora] Utilities for bidirectional streaming APIs"

parents 3604139e 6c5dc26a
Loading
Loading
Loading
Loading
+21 −8
Original line number Diff line number Diff line
@@ -55,6 +55,19 @@ def generate_method(imports, file, service, method):
    output_type = import_type(imports, method.output_type)

    if input_mode == 'stream':
        if output_mode == 'stream':
            return (
                f'def {method.name}(self):\n'
                f'    from mmi2grpc._streaming import StreamWrapper\n'
                f'    return StreamWrapper(\n'
                f'        self.channel.{input_mode}_{output_mode}(\n'
                f"            '/{file.package}.{service.name}/{method.name}',\n"
                f'            request_serializer={input_type}.SerializeToString,\n'
                f'            response_deserializer={output_type}.FromString\n'
                f'        ),\n'
                f'        {input_type})'
            ).split('\n')
        else:
            return (
                f'def {method.name}(self, iterator, **kwargs):\n'
                f'    return self.channel.{input_mode}_{output_mode}(\n'
+46 −0
Original line number Diff line number Diff line
import queue


class IterableQueue:
    CLOSE = object()

    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return iter(self.queue.get, self.CLOSE)

    def put(self, value):
        self.queue.put(value)

    def close(self):
        self.put(self.CLOSE)


class StreamWrapper:

    def __init__(self, stream, ctor):
        self.tx_queue = IterableQueue()
        self.ctor = ctor

        # tx_queue is consumed on a separate thread, so
        # we don't block here
        self.rx_iter = stream(iter(self.tx_queue))

    def send(self, **kwargs):
        self.tx_queue.put(self.ctor(**kwargs))

    def __iter__(self):
        for value in self.rx_iter:
            yield value
        self.tx_queue.close()

    def recv(self):
        try:
            return next(self.rx_iter)
        except StopIteration:
            self.tx_queue.close()
            return

    def close(self):
        self.tx_queue.close()
+78 −0
Original line number Diff line number Diff line
@@ -27,12 +27,20 @@ import android.content.IntentFilter
import android.net.MacAddress
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import java.util.concurrent.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
@@ -105,6 +113,76 @@ fun <T> grpcUnary(
  }
}

/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
 * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
 * gRPC stream observer.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the response.
 * @param block the suspended function transforming the request Flow to the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse> {
 *     grpcBidirectionalStream(scope, responseObserver) {
 *       block
 *     }
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<U>,
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputFlow = MutableSharedFlow<T>(extraBufferCapacity = 8)
  val outputFlow = scope.block(inputFlow.asSharedFlow())

  val job =
    outputFlow
      .onEach { responseObserver.onNext(it) }
      .onCompletion { error ->
        if (error == null) {
          responseObserver.onCompleted()
        }
      }
      .catch {
        it.printStackTrace()
        responseObserver.onError(it)
      }
      .launchIn(scope)

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (!inputFlow.tryEmit(req)) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
        )
      }
    }

    override fun onCompleted() {
      // no-op
    }

    override fun onError(e: Throwable) {
      job.cancel()
      e.printStackTrace()
    }
  }
}

/**
 * Synchronous method to get a Bluetooth profile proxy.
 *