Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0eca2b96 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "[Pandora] Fix coroutine scope/lifetime issues"

parents 4a998e3f 053e7984
Loading
Loading
Loading
Loading
+21 −19
Original line number Diff line number Diff line
@@ -30,13 +30,13 @@ import io.grpc.stub.StreamObserver
import java.util.concurrent.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
@@ -97,11 +97,11 @@ fun <T> grpcUnary(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  timeout: Long = 60,
  block: suspend CoroutineScope.() -> T
  block: suspend () -> T
): Job {
  return scope.launch {
    try {
      val response = withTimeout(timeout * 1000, block)
      val response = withTimeout(timeout * 1000) { block() }
      responseObserver.onNext(response)
      responseObserver.onCompleted()
    } catch (e: Throwable) {
@@ -141,11 +141,11 @@ fun <T, U> grpcBidirectionalStream(
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputFlow = MutableSharedFlow<T>(extraBufferCapacity = 8)
  val outputFlow = scope.block(inputFlow.asSharedFlow())
  val inputChannel = Channel<T>()

  val job =
    outputFlow
    scope.launch {
      block(inputChannel.consumeAsFlow())
        .onEach { responseObserver.onNext(it) }
        .onCompletion { error ->
          if (error == null) {
@@ -156,13 +156,14 @@ fun <T, U> grpcBidirectionalStream(
          it.printStackTrace()
          responseObserver.onError(it)
        }
      .launchIn(scope)
        .launchIn(this)
    }

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (!inputFlow.tryEmit(req)) {
      if (!inputChannel.offer(req)) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
@@ -171,7 +172,8 @@ fun <T, U> grpcBidirectionalStream(
    }

    override fun onCompleted() {
      job.cancel()
      // stop the input flow, but keep the job running
      inputChannel.close()
    }

    override fun onError(e: Throwable) {