Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit c8cd6d78 authored by Ricki Hirner's avatar Ricki Hirner
Browse files

[WebDAV] Add timeout for RandomAccessCallback notification (bitfireAT/davx5#408)

* [WIP] Add timeout for RandomAccessCallback

* Use state machine to handle timeout

* Use sealed class for states, guard callback access with correct states
parent 4ce6fcbf
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -183,6 +183,7 @@ dependencies {
    //noinspection GradleDependency - don't update until API level 26 (Android 8) is the minimum API [https://github.com/bitfireAT/davx5/issues/130]
    implementation 'commons-io:commons-io:2.8.0'
    implementation 'dnsjava:dnsjava:3.5.2'
    implementation "io.github.nsk90:kstatemachine-jvm:0.22.1"
    implementation 'net.openid:appauth:0.11.1'
    implementation "org.apache.commons:commons-collections4:${versions.commonsCollections}"
    implementation "org.apache.commons:commons-lang3:${versions.commonsLang}"
+5 −3
Original line number Diff line number Diff line
@@ -30,13 +30,14 @@ import at.bitfire.dav4jvm.DavResource
import at.bitfire.dav4jvm.Response
import at.bitfire.dav4jvm.exception.HttpException
import at.bitfire.dav4jvm.property.*
import at.bitfire.davdroid.network.HttpClient
import at.bitfire.davdroid.network.MemoryCookieStore
import at.bitfire.davdroid.R
import at.bitfire.davdroid.db.AppDatabase
import at.bitfire.davdroid.db.WebDavDocument
import at.bitfire.davdroid.log.Logger
import at.bitfire.davdroid.network.HttpClient
import at.bitfire.davdroid.network.MemoryCookieStore
import at.bitfire.davdroid.ui.webdav.WebdavMountsActivity
import at.bitfire.davdroid.webdav.DavDocumentsProvider.DavDocumentsActor
import at.bitfire.davdroid.webdav.cache.HeadResponseCache
import dagger.hilt.EntryPoint
import dagger.hilt.InstallIn
@@ -469,6 +470,7 @@ class DavDocumentsProvider: DocumentsProvider() {
        }
        Logger.log.info("Received file info: $fileInfo")

        // RandomAccessCallback.Wrapper / StreamingFileDescriptor are responsible for closing httpClient
        return if (
            Build.VERSION.SDK_INT >= 26 &&      // openProxyFileDescriptor exists since Android 8.0
            readAccess &&                       // WebDAV doesn't support random write access natively
@@ -477,7 +479,7 @@ class DavDocumentsProvider: DocumentsProvider() {
            fileInfo.supportsPartial != false   // WebDAV server must support random access
        ) {
            val accessor = RandomAccessCallback.Wrapper(ourContext, client, url, doc.mimeType, fileInfo, signal)
            storageManager.openProxyFileDescriptor(modeFlags, accessor, accessor.callback!!.workerHandler)
            storageManager.openProxyFileDescriptor(modeFlags, accessor, accessor.workerHandler)
        } else {
            val fd = StreamingFileDescriptor(ourContext, client, url, doc.mimeType, signal) { transferred ->
                // called when transfer is finished
+138 −24
Original line number Diff line number Diff line
@@ -26,16 +26,32 @@ import at.bitfire.davdroid.network.HttpClient
import at.bitfire.davdroid.ui.NotificationUtils
import at.bitfire.davdroid.ui.NotificationUtils.notifyIfPossible
import at.bitfire.davdroid.util.DavUtils
import at.bitfire.davdroid.webdav.RandomAccessCallback.Wrapper.Companion.TIMEOUT_INTERVAL
import at.bitfire.davdroid.webdav.cache.MemoryCache
import at.bitfire.davdroid.webdav.cache.SegmentedCache
import okhttp3.Headers
import okhttp3.HttpUrl
import okhttp3.MediaType
import org.apache.commons.io.FileUtils
import ru.nsk.kstatemachine.DefaultState
import ru.nsk.kstatemachine.Event
import ru.nsk.kstatemachine.FinalState
import ru.nsk.kstatemachine.StateMachine
import ru.nsk.kstatemachine.addFinalState
import ru.nsk.kstatemachine.addInitialState
import ru.nsk.kstatemachine.createStdLibStateMachine
import ru.nsk.kstatemachine.onEntry
import ru.nsk.kstatemachine.onExit
import ru.nsk.kstatemachine.onFinished
import ru.nsk.kstatemachine.processEventBlocking
import ru.nsk.kstatemachine.transition
import java.io.InterruptedIOException
import java.lang.ref.WeakReference
import java.net.HttpURLConnection
import java.util.Timer
import java.util.TimerTask
import java.util.logging.Level
import kotlin.concurrent.schedule

typealias MemorySegmentCache = MemoryCache<SegmentedCache.SegmentKey<RandomAccessCallback.DocumentKey>>

@@ -89,16 +105,11 @@ class RandomAccessCallback private constructor(
        .setOngoing(true)
    val notificationTag = url.toString()

    private val workerThread = HandlerThread(javaClass.simpleName).apply { start() }
    val workerHandler: Handler = Handler(workerThread.looper)

    val memoryCache = getMemoryCache(context)
    val cache = SegmentedCache(PAGE_SIZE, this, memoryCache)


    override fun onFsync() {
        Logger.log.fine("onFsync")
    }
    override fun onFsync() { /* not used */ }

    override fun onGetSize(): Long {
        Logger.log.fine("onGetFileSize $url")
@@ -140,9 +151,6 @@ class RandomAccessCallback private constructor(
    }

    override fun onRelease() {
        workerThread.quit()
        httpClient.close()

        notificationManager.cancel(notificationTag, NotificationUtils.NOTIFY_WEBDAV_ACCESS)
    }

@@ -204,34 +212,140 @@ class RandomAccessCallback private constructor(
     * a [Map]), but is not unregistered anymore. So it stays in the memory until the whole mount
     * is unloaded. See https://issuetracker.google.com/issues/208788568
     *
     * Use this wrapper to ensure that all memory is released as soon as [onRelease] is called.
     * Use this wrapper to
     *
     * - ensure that all memory is released as soon as [onRelease] is called,
     * - provide timeout functionality: [RandomAccessCallback] will be closed when not
     * used for more than [TIMEOUT_INTERVAL] ms and re-created when necessary.
     *
     * @param httpClient    HTTP client – [Wrapper] is responsible to close it
     */
    class Wrapper(
        context: Context,
        httpClient: HttpClient,
        url: HttpUrl,
        mimeType: MediaType?,
        headResponse: HeadResponse,
        cancellationSignal: CancellationSignal?
        val context: Context,
        val httpClient: HttpClient,
        val url: HttpUrl,
        val mimeType: MediaType?,
        val headResponse: HeadResponse,
        val cancellationSignal: CancellationSignal?
    ): ProxyFileDescriptorCallback() {

        var callback: RandomAccessCallback? = RandomAccessCallback(context, httpClient, url, mimeType, headResponse, cancellationSignal)
        companion object {
            val TIMEOUT_INTERVAL = 15000L
        }

        sealed class Events {
            object Transfer : Event
            object NowIdle : Event
            object GoStandby : Event
            object Close : Event
        }
        sealed class States : DefaultState() {
            object Active: States() {
                object Transferring: States()
                object Idle: States()
            }
            object Standby: States()
            object Closed: States(), FinalState
        }
        val machine = createStdLibStateMachine {
            addInitialState(States.Active) {
                onEntry {
                    _callback = RandomAccessCallback(context, httpClient, url, mimeType, headResponse, cancellationSignal)
                }
                onExit {
                    _callback?.onRelease()
                    _callback = null
                }

                transition<Events.GoStandby>(targetState = States.Standby)
                transition<Events.Close>(targetState = States.Closed)

                // active has two nested states: transferring (I/O running) and idle (starts timeout timer)
                addInitialState(States.Active.Idle) {
                    val timer: Timer = Timer(true)
                    var timeout: TimerTask? = null

                    onEntry {
                        timeout = timer.schedule(TIMEOUT_INTERVAL) {
                            machine.processEventBlocking(Events.GoStandby)
                        }
                    }
                    onExit {
                        timeout?.cancel()
                        timeout = null
                    }
                    onFinished {
                        timer.cancel()
                    }

                    transition<Events.Transfer>(targetState = States.Active.Transferring)
                }

                addState(States.Active.Transferring) {
                    transition<Events.NowIdle>(targetState = States.Active.Idle)
                }
            }

            addState(States.Standby) {
                transition<Events.Transfer>(targetState = States.Active.Transferring)
                transition<Events.NowIdle>(targetState = States.Active.Idle)
                transition<Events.Close>(targetState = States.Closed)
            }

            addFinalState(States.Closed)
            onFinished {
                shutdown()
            }

            logger = StateMachine.Logger { message ->
                Logger.log.fine(message())
            }
        }

        private val workerThread = HandlerThread(javaClass.simpleName).apply { start() }
        val workerHandler: Handler = Handler(workerThread.looper)

        private var _callback: RandomAccessCallback? = null

        fun<T> requireCallback(block: (callback: RandomAccessCallback) -> T): T {
            machine.processEventBlocking(Events.Transfer)
            try {
                return block(_callback ?: throw IllegalStateException())
            } finally {
                machine.processEventBlocking(Events.NowIdle)
            }
        }


        /// states ///

        @Synchronized
        private fun shutdown() {
            httpClient.close()
            workerThread.quit()
        }


        override fun onFsync() =
            callback?.onFsync() ?: throw IllegalStateException("Must not be called after onRelease()")
        /// delegating implementation of ProxyFileDescriptorCallback ///

        @Synchronized
        override fun onFsync() { /* not used */ }

        @Synchronized
        override fun onGetSize() =
            callback?.onGetSize() ?: throw IllegalStateException("Must not be called after onRelease()")
            requireCallback { it.onGetSize() }

        @Synchronized
        override fun onRead(offset: Long, size: Int, data: ByteArray) =
            callback?.onRead(offset, size, data) ?: throw IllegalStateException("Must not be called after onRelease()")
            requireCallback { it.onRead(offset, size, data) }

        @Synchronized
        override fun onWrite(offset: Long, size: Int, data: ByteArray) =
            callback?.onWrite(offset, size, data) ?: throw IllegalStateException("Must not be called after onRelease()")
            requireCallback { it.onWrite(offset, size, data) }

        @Synchronized
        override fun onRelease() {
            callback?.onRelease()
            callback = null
            machine.processEventBlocking(Events.Close)
        }

    }
+5 −0
Original line number Diff line number Diff line
@@ -28,6 +28,9 @@ import java.io.IOException
import java.util.logging.Level
import kotlin.concurrent.thread

/**
 * @param client    HTTP client– [StreamingFileDescriptor] is responsible to close it
 */
class StreamingFileDescriptor(
    val context: Context,
    val client: HttpClient,
@@ -73,6 +76,8 @@ class StreamingFileDescriptor(
            } catch (e: Exception) {
                Logger.log.log(Level.INFO, "Couldn't serve file (not necessesarily an error)", e)
                writeFd.closeWithError(e.message)
            } finally {
                client.close()
            }

            try {