Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0c3a14d5 authored by Philipp Heckel's avatar Philipp Heckel
Browse files

remove connection listener, move repo into connectionmgr

parent 43b3aec3
Loading
Loading
Loading
Loading
+15 −20
Original line number Diff line number Diff line
@@ -18,13 +18,13 @@ import io.heckel.ntfy.data.*
import io.heckel.ntfy.detail.DetailActivity
import kotlin.random.Random

const val TOPIC_ID = "topic_id"
const val SUBSCRIPTION_ID = "topic_id"
const val TOPIC_NAME = "topic_name"
const val TOPIC_BASE_URL = "base_url"
const val SERVICE_BASE_URL = "base_url"

class MainActivity : AppCompatActivity() {
    private val newTopicActivityRequestCode = 1
    private val topicsViewModel by viewModels<SubscriptionViewModel> {
    private val newSubscriptionActivityRequestCode = 1
    private val subscriptionViewModel by viewModels<SubscriptionsViewModel> {
        SubscriptionsViewModelFactory()
    }

@@ -39,11 +39,11 @@ class MainActivity : AppCompatActivity() {
        }

        // Update main list based on topicsViewModel (& its datasource/livedata)
        val adapter = TopicsAdapter { topic -> topicOnClick(topic) }
        val adapter = TopicsAdapter { topic -> subscriptionOnClick(topic) }
        val recyclerView: RecyclerView = findViewById(R.id.recycler_view)
        recyclerView.adapter = adapter

        topicsViewModel.list().observe(this) {
        subscriptionViewModel.list().observe(this) {
            it?.let {
                println("new data arrived: $it")
                adapter.submitList(it as MutableList<Subscription>)
@@ -52,36 +52,31 @@ class MainActivity : AppCompatActivity() {

        // Set up notification channel
        createNotificationChannel()
        topicsViewModel.setListener(object : NotificationListener {
            override fun onNotification(subscriptionId: Long, notification: Notification) {
                displayNotification(notification)
            }
        })
        subscriptionViewModel.setListener { n -> displayNotification(n) }
    }

    /* Opens TopicDetailActivity when RecyclerView item is clicked. */
    private fun topicOnClick(topic: Subscription) {
    /* Opens detail view when list item is clicked. */
    private fun subscriptionOnClick(subscription: Subscription) {
        val intent = Intent(this, DetailActivity()::class.java)
        intent.putExtra(TOPIC_ID, topic.id)
        intent.putExtra(SUBSCRIPTION_ID, subscription.id)
        startActivity(intent)
    }

    /* Adds topic to topicList when FAB is clicked. */
    private fun fabOnClick() {
        val intent = Intent(this, AddTopicActivity::class.java)
        startActivityForResult(intent, newTopicActivityRequestCode)
        startActivityForResult(intent, newSubscriptionActivityRequestCode)
    }

    override fun onActivityResult(requestCode: Int, resultCode: Int, intentData: Intent?) {
        super.onActivityResult(requestCode, resultCode, intentData)

        if (requestCode == newTopicActivityRequestCode && resultCode == Activity.RESULT_OK) {
        if (requestCode == newSubscriptionActivityRequestCode && resultCode == Activity.RESULT_OK) {
            intentData?.let { data ->
                val name = data.getStringExtra(TOPIC_NAME) ?: return
                val baseUrl = data.getStringExtra(TOPIC_BASE_URL) ?: return
                val topic = Subscription(Random.nextLong(), name, baseUrl, Status.CONNECTING, 0)

                topicsViewModel.add(topic)
                val baseUrl = data.getStringExtra(SERVICE_BASE_URL) ?: return
                val subscription = Subscription(Random.nextLong(), name, baseUrl, Status.CONNECTING, 0)
                subscriptionViewModel.add(subscription)
            }
        }
    }
+46 −0
Original line number Diff line number Diff line
@@ -3,15 +3,13 @@ package io.heckel.ntfy
import androidx.lifecycle.LiveData
import androidx.lifecycle.ViewModel
import androidx.lifecycle.ViewModelProvider
import androidx.lifecycle.viewModelScope
import io.heckel.ntfy.data.*
import kotlin.collections.List


class SubscriptionViewModel(private val repository: Repository, private val connectionManager: ConnectionManager) : ViewModel() {
class SubscriptionsViewModel(private val repository: Repository, private val connectionManager: ConnectionManager) : ViewModel() {
    fun add(topic: Subscription) {
        repository.add(topic)
        connectionManager.start(topic, viewModelScope)
        connectionManager.start(topic)
    }

    fun get(id: Long) : Subscription? {
@@ -28,26 +26,7 @@ class SubscriptionViewModel(private val repository: Repository, private val conn
    }

    fun setListener(listener: NotificationListener) {
        connectionManager.setListener(object : ConnectionListener {
            override fun onStatusChanged(subcriptionId: Long, status: Status) {
                println("onStatusChanged($subcriptionId, $status)")
                val topic = repository.get(subcriptionId)
                if (topic != null) {
                    println("-> old topic: $topic")
                    repository.update(topic.copy(status = status))
                }
            }

            override fun onNotification(subscriptionId: Long, notification: Notification) {
                println("onNotification($subscriptionId, $notification)")
                val topic = repository.get(subscriptionId)
                if (topic != null) {
                    println("-> old topic: $topic")
                    repository.update(topic.copy(messages = topic.messages + 1))
                }
                listener.onNotification(subscriptionId, notification) // Forward downstream
            }
        })
        connectionManager.setListener(listener)
    }
}

@@ -56,10 +35,10 @@ class SubscriptionsViewModelFactory : ViewModelProvider.Factory {
    override fun <T : ViewModel?> create(modelClass: Class<T>) =
        with(modelClass){
            when {
                isAssignableFrom(SubscriptionViewModel::class.java) -> {
                isAssignableFrom(SubscriptionsViewModel::class.java) -> {
                    val repository = Repository.getInstance()
                    val connectionManager = ConnectionManager.getInstance()
                    SubscriptionViewModel(repository, connectionManager) as T
                    val connectionManager = ConnectionManager.getInstance(repository)
                    SubscriptionsViewModel(repository, connectionManager) as T
                }
                else -> throw IllegalArgumentException("Unknown viewModel class $modelClass")
            }
+2 −2
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ import android.widget.Button
import androidx.appcompat.app.AppCompatActivity
import com.google.android.material.textfield.TextInputEditText
import io.heckel.ntfy.R
import io.heckel.ntfy.TOPIC_BASE_URL
import io.heckel.ntfy.SERVICE_BASE_URL
import io.heckel.ntfy.TOPIC_NAME

class AddTopicActivity : AppCompatActivity() {
@@ -39,7 +39,7 @@ class AddTopicActivity : AppCompatActivity() {
            setResult(Activity.RESULT_CANCELED, resultIntent)
        } else {
            resultIntent.putExtra(TOPIC_NAME, topicName.text.toString())
            resultIntent.putExtra(TOPIC_BASE_URL, baseUrl.text.toString())
            resultIntent.putExtra(SERVICE_BASE_URL, baseUrl.text.toString())
            setResult(Activity.RESULT_OK, resultIntent)
        }
        finish()
+42 −34
Original line number Diff line number Diff line
@@ -2,80 +2,88 @@ package io.heckel.ntfy.data

import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonSyntaxException
import kotlinx.coroutines.*
import java.io.IOException
import java.net.HttpURLConnection
import java.net.URL

const val READ_TIMEOUT = 60_000 // Keep alive every 30s assumed

class ConnectionManager {
class ConnectionManager(private val repository: Repository) {
    private val jobs = mutableMapOf<Long, Job>()
    private val gson = GsonBuilder().create()
    private var listener: ConnectionListener? = null;
    private var listener: NotificationListener? = null;

    fun start(subscription: Subscription, scope: CoroutineScope) {
        jobs[subscription.id] = launchConnection(subscription, scope)
    fun start(s: Subscription) {
        jobs[s.id] = launchConnection(s.id, topicJsonUrl(s))
    }

    fun stop(subscription: Subscription) {
        jobs.remove(subscription.id)?.cancel() // Cancel coroutine and remove
    fun stop(s: Subscription) {
        jobs.remove(s.id)?.cancel() // Cancel coroutine and remove
    }

    fun setListener(listener: ConnectionListener) {
        this.listener = listener
    fun setListener(l: NotificationListener) {
        this.listener = l
    }

    private fun launchConnection(subscription: Subscription, scope: CoroutineScope): Job {
        return scope.launch(Dispatchers.IO) {
    private fun launchConnection(subscriptionId: Long, topicUrl: String): Job {
        return GlobalScope.launch(Dispatchers.IO) {
            while (isActive) {
                openConnection(this, subscription)
                openConnection(subscriptionId, topicUrl)
                delay(5000) // TODO exponential back-off
            }
        }
    }

    private fun openConnection(scope: CoroutineScope, subscription: Subscription) {
        val url = "${subscription.baseUrl}/${subscription.topic}/json"
        println("Connecting to $url ...")
        val conn = (URL(url).openConnection() as HttpURLConnection).also {
    private fun openConnection(subscriptionId: Long, topicUrl: String) {
        println("Connecting to $topicUrl ...")
        val conn = (URL(topicUrl).openConnection() as HttpURLConnection).also {
            it.doInput = true
            it.readTimeout = READ_TIMEOUT
        }
        try {
            listener?.onStatusChanged(subscription.id, Status.CONNECTED)
            updateStatus(subscriptionId, Status.CONNECTED)
            val input = conn.inputStream.bufferedReader()
            while (scope.isActive) {
            while (GlobalScope.isActive) {
                val line = input.readLine() ?: break // Break if EOF is reached, i.e. readLine is null
                if (!scope.isActive) {
                if (!GlobalScope.isActive) {
                    break // Break if scope is not active anymore; readLine blocks for a while, so we want to be sure
                }
                try {
                val json = gson.fromJson(line, JsonObject::class.java) ?: break // Break on unexpected line
                    if (!json.isJsonNull && !json.has("event") && json.has("message")) {
                        val message = json.get("message").asString
                        listener?.onNotification(subscription.id, Notification(subscription, message))
                    }
                } catch (e: JsonSyntaxException) {
                    break // Break on unexpected line
                val validNotification = !json.isJsonNull
                        && !json.has("event") // No keepalive or open messages
                        && json.has("message")
                if (validNotification) {
                    notify(subscriptionId, json.get("message").asString)
                }
            }
        } catch (e: IOException) {
            println("Connection error: " + e.message)
        } catch (e: Exception) {
            println("Connection error: " + e)
        } finally {
            conn.disconnect()
        }
        listener?.onStatusChanged(subscription.id, Status.CONNECTING)
        println("Connection terminated: $url")
        updateStatus(subscriptionId, Status.CONNECTING)
        println("Connection terminated: $topicUrl")
    }

    private fun updateStatus(subscriptionId: Long, status: Status) {
        val subscription = repository.get(subscriptionId)
        repository.update(subscription?.copy(status = status))
    }

    private fun notify(subscriptionId: Long, message: String) {
        val subscription = repository.get(subscriptionId)
        if (subscription != null) {
            listener?.let { it(Notification(subscription, message)) }
            repository.update(subscription.copy(messages = subscription.messages + 1))
        }
    }

    companion object {
        private var instance: ConnectionManager? = null

        fun getInstance(): ConnectionManager {
        fun getInstance(repository: Repository): ConnectionManager {
            return synchronized(ConnectionManager::class) {
                val newInstance = instance ?: ConnectionManager()
                val newInstance = instance ?: ConnectionManager(repository)
                instance = newInstance
                newInstance
            }
+2 −7
Original line number Diff line number Diff line
@@ -17,13 +17,8 @@ data class Notification(
    val message: String
)

interface NotificationListener {
    fun onNotification(subscriptionId: Long, notification: Notification)
}

interface ConnectionListener : NotificationListener {
    fun onStatusChanged(subcriptionId: Long, status: Status)
}
typealias NotificationListener = (notification: Notification) -> Unit

fun topicUrl(s: Subscription) = "${s.baseUrl}/${s.topic}"
fun topicJsonUrl(s: Subscription) = "${s.baseUrl}/${s.topic}/json"
fun topicShortUrl(s: Subscription) = topicUrl(s).replace("http://", "").replace("https://", "")
Loading