Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 69b5aa3d authored by Rahul Arya's avatar Rahul Arya Committed by Thomas Girardier
Browse files

[Pandora] Fix coroutine scope/lifetime issues

Some glitchy utils mean that we can leak tasks beyond the scope of the
request handler. Ideally we should move the globalScope into Utils.kt so
it's not exposed to users at all.

Tag: #stability
Bug: 245578454
Test: Test: atest pts-bot
Ignore-AOSP-First: Cherry-pick from aosp
Merged-In: I6e3b9f0b7b3faac48739a11caf3f0587c99dd3e5
Change-Id: I6e3b9f0b7b3faac48739a11caf3f0587c99dd3e5
parent c0d23675
Loading
Loading
Loading
Loading
+21 −19
Original line number Diff line number Diff line
@@ -36,13 +36,13 @@ import java.util.concurrent.CancellationException
import java.util.stream.Collectors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
@@ -109,11 +109,11 @@ fun <T> grpcUnary(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  timeout: Long = 60,
  block: suspend CoroutineScope.() -> T
  block: suspend () -> T
): Job {
  return scope.launch {
    try {
      val response = withTimeout(timeout * 1000, block)
      val response = withTimeout(timeout * 1000) { block() }
      responseObserver.onNext(response)
      responseObserver.onCompleted()
    } catch (e: Throwable) {
@@ -153,11 +153,11 @@ fun <T, U> grpcBidirectionalStream(
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputFlow = MutableSharedFlow<T>(extraBufferCapacity = 8)
  val outputFlow = scope.block(inputFlow.asSharedFlow())
  val inputChannel = Channel<T>()

  val job =
    outputFlow
    scope.launch {
      block(inputChannel.consumeAsFlow())
        .onEach { responseObserver.onNext(it) }
        .onCompletion { error ->
          if (error == null) {
@@ -168,13 +168,14 @@ fun <T, U> grpcBidirectionalStream(
          it.printStackTrace()
          responseObserver.onError(it)
        }
      .launchIn(scope)
        .launchIn(this)
    }

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (!inputFlow.tryEmit(req)) {
      if (!inputChannel.offer(req)) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
@@ -183,7 +184,8 @@ fun <T, U> grpcBidirectionalStream(
    }

    override fun onCompleted() {
      job.cancel()
      // stop the input flow, but keep the job running
      inputChannel.close()
    }

    override fun onError(e: Throwable) {