Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6c5dc26a authored by Rahul Arya's avatar Rahul Arya
Browse files

[Pandora] Utilities for bidirectional streaming APIs

The OnPair interface (as well as potential others in the future) are
bidirectional streaming rpcs. This CL adds a utility in Kotlin to
emulate the native-Kotlin gRPC interface on top of the Java interface
provided (Flow -> Flow rather than callback-based).

In Python, the native interface consumes an iterator and supplies a new
one. So to interleave rx and tx messages, we need multiple threads. This
CL also adds a client-side utility to simplify usage, by making it mimic
channels (which are the grpc.aio interface, as well as what Go gRPC
does). It means we consume one thread for every active streaming gRPC
call, but this is unavoidable unless we use asyncio channels, which
would require replacing all the mmi2grpc calls with async calls.

Bug: 242326310
Test: next CL
Tag: #feature
Change-Id: I526982d3163120689797919382a7dff8ff33378f
parent cc7e04e1
Loading
Loading
Loading
Loading
+21 −8
Original line number Diff line number Diff line
@@ -55,6 +55,19 @@ def generate_method(imports, file, service, method):
    output_type = import_type(imports, method.output_type)

    if input_mode == 'stream':
        if output_mode == 'stream':
            return (
                f'def {method.name}(self):\n'
                f'    from mmi2grpc._streaming import StreamWrapper\n'
                f'    return StreamWrapper(\n'
                f'        self.channel.{input_mode}_{output_mode}(\n'
                f"            '/{file.package}.{service.name}/{method.name}',\n"
                f'            request_serializer={input_type}.SerializeToString,\n'
                f'            response_deserializer={output_type}.FromString\n'
                f'        ),\n'
                f'        {input_type})'
            ).split('\n')
        else:
            return (
                f'def {method.name}(self, iterator, **kwargs):\n'
                f'    return self.channel.{input_mode}_{output_mode}(\n'
+46 −0
Original line number Diff line number Diff line
import queue


class IterableQueue:
    CLOSE = object()

    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return iter(self.queue.get, self.CLOSE)

    def put(self, value):
        self.queue.put(value)

    def close(self):
        self.put(self.CLOSE)


class StreamWrapper:

    def __init__(self, stream, ctor):
        self.tx_queue = IterableQueue()
        self.ctor = ctor

        # tx_queue is consumed on a separate thread, so
        # we don't block here
        self.rx_iter = stream(iter(self.tx_queue))

    def send(self, **kwargs):
        self.tx_queue.put(self.ctor(**kwargs))

    def __iter__(self):
        for value in self.rx_iter:
            yield value
        self.tx_queue.close()

    def recv(self):
        try:
            return next(self.rx_iter)
        except StopIteration:
            self.tx_queue.close()
            return

    def close(self):
        self.tx_queue.close()
+78 −0
Original line number Diff line number Diff line
@@ -27,12 +27,20 @@ import android.content.IntentFilter
import android.net.MacAddress
import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver
import java.util.concurrent.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
@@ -105,6 +113,76 @@ fun <T> grpcUnary(
  }
}

/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
 * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
 * gRPC stream observer.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the response.
 * @param block the suspended function transforming the request Flow to the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse> {
 *     grpcBidirectionalStream(scope, responseObserver) {
 *       block
 *     }
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<U>,
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputFlow = MutableSharedFlow<T>(extraBufferCapacity = 8)
  val outputFlow = scope.block(inputFlow.asSharedFlow())

  val job =
    outputFlow
      .onEach { responseObserver.onNext(it) }
      .onCompletion { error ->
        if (error == null) {
          responseObserver.onCompleted()
        }
      }
      .catch {
        it.printStackTrace()
        responseObserver.onError(it)
      }
      .launchIn(scope)

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (!inputFlow.tryEmit(req)) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
        )
      }
    }

    override fun onCompleted() {
      // no-op
    }

    override fun onError(e: Throwable) {
      job.cancel()
      e.printStackTrace()
    }
  }
}

/**
 * Synchronous method to get a Bluetooth profile proxy.
 *