Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 053e7984 authored by Rahul Arya's avatar Rahul Arya
Browse files

[Pandora] Fix coroutine scope/lifetime issues

Some glitchy utils mean that we can leak tasks beyond the scope of the
request handler. Ideally we should move the globalScope into Utils.kt so
it's not exposed to users at all.

Bug: 244782047
Tag: #stability
Test: atest pts-bot
Change-Id: I6e3b9f0b7b3faac48739a11caf3f0587c99dd3e5
parent 1dfdf11b
Loading
Loading
Loading
Loading
+21 −19
Original line number Diff line number Diff line
@@ -30,13 +30,13 @@ import io.grpc.stub.StreamObserver
import java.util.concurrent.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
@@ -97,11 +97,11 @@ fun <T> grpcUnary(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  timeout: Long = 60,
  block: suspend CoroutineScope.() -> T
  block: suspend () -> T
): Job {
  return scope.launch {
    try {
      val response = withTimeout(timeout * 1000, block)
      val response = withTimeout(timeout * 1000) { block() }
      responseObserver.onNext(response)
      responseObserver.onCompleted()
    } catch (e: Throwable) {
@@ -141,11 +141,11 @@ fun <T, U> grpcBidirectionalStream(
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputFlow = MutableSharedFlow<T>(extraBufferCapacity = 8)
  val outputFlow = scope.block(inputFlow.asSharedFlow())
  val inputChannel = Channel<T>()

  val job =
    outputFlow
    scope.launch {
      block(inputChannel.consumeAsFlow())
        .onEach { responseObserver.onNext(it) }
        .onCompletion { error ->
          if (error == null) {
@@ -156,13 +156,14 @@ fun <T, U> grpcBidirectionalStream(
          it.printStackTrace()
          responseObserver.onError(it)
        }
      .launchIn(scope)
        .launchIn(this)
    }

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (!inputFlow.tryEmit(req)) {
      if (!inputChannel.offer(req)) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
@@ -171,7 +172,8 @@ fun <T, U> grpcBidirectionalStream(
    }

    override fun onCompleted() {
      job.cancel()
      // stop the input flow, but keep the job running
      inputChannel.close()
    }

    override fun onError(e: Throwable) {