Loading app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +29 −33 Original line number Diff line number Diff line Loading @@ -6,13 +6,11 @@ import android.os.Handler import android.os.Looper import io.heckel.ntfy.db.* import io.heckel.ntfy.log.Log import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.topicUrl import io.heckel.ntfy.util.topicShortUrl import io.heckel.ntfy.util.topicUrlWs import okhttp3.* import java.nio.charset.StandardCharsets import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong Loading Loading @@ -45,7 +43,7 @@ class WsConnection( .build() private var errorCount = 0 private var webSocket: WebSocket? = null private val webSocketId = AtomicLong(0) private val listenerId = AtomicLong(0) private var state: State? = null private var closed = false Loading @@ -54,23 +52,23 @@ class WsConnection( private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) private val shortUrl = topicShortUrl(baseUrl, topicsStr) @Synchronized override fun start() { if (closed || state == State.Connecting || state == State.Connected) { Log.d(TAG,"[$url] WebSocket: Not (re-)starting, because connection is marked closed/connecting/connected") Log.d(TAG,"$shortUrl: Not (re-)starting, because connection is marked closed/connecting/connected") return } if (webSocket != null) { webSocket!!.close(WS_CLOSE_NORMAL, "") } state = State.Connecting val nextId = webSocketId.incrementAndGet() val nextId = listenerId.incrementAndGet() val sinceVal = if (since == 0L) "all" else since.toString() val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) val request = requestBuilder(urlWithSince, user).build() Log.d(TAG, "[$url] WebSocket: Opening $urlWithSince with listener ID $nextId ...") Log.d(TAG, "$shortUrl: Opening $urlWithSince with listener ID $nextId ...") webSocket = client.newWebSocket(request, Listener(nextId)) } Loading @@ -78,10 +76,10 @@ class WsConnection( override fun close() { closed = true if (webSocket == null) { Log.d(TAG,"[$url] WebSocket: Not closing existing connection, because there is no active web socket") Log.d(TAG,"$shortUrl: Not closing existing connection, because there is no active web socket") return } Log.d(TAG, "[$url] WebSocket: Closing existing connection") Log.d(TAG, "$shortUrl: Closing existing connection") state = State.Disconnected webSocket!!.close(WS_CLOSE_NORMAL, "") webSocket = null Loading @@ -95,17 +93,17 @@ class WsConnection( @Synchronized fun scheduleReconnect(seconds: Int) { if (closed || state == State.Connecting || state == State.Connected) { Log.d(TAG,"[$url] WebSocket: Not rescheduling connection, because connection is marked closed/connecting/connected") Log.d(TAG,"$shortUrl: Not rescheduling connection, because connection is marked closed/connecting/connected") return } state = State.Scheduled if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { Log.d(TAG,"[$url] WebSocket: Scheduling a restart in $seconds seconds (via alarm manager)") Log.d(TAG,"$shortUrl: Scheduling a restart in $seconds seconds (via alarm manager)") val reconnectTime = Calendar.getInstance() reconnectTime.add(Calendar.SECOND, seconds) alarmManager.setExact(AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, { start() }, null) } else { Log.d(TAG, "[$url] WebSocket: Scheduling a restart in $seconds seconds (via handler)") Log.d(TAG, "$shortUrl: Scheduling a restart in $seconds seconds (via handler)") val handler = Handler(Looper.getMainLooper()) handler.postDelayed({ start() }, TimeUnit.SECONDS.toMillis(seconds.toLong())) } Loading @@ -113,8 +111,8 @@ class WsConnection( private inner class Listener(private val id: Long) : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { syncExec("onOpen") { Log.d(TAG, "[$url] WebSocket ($id): Opened connection") synchronize("onOpen") { Log.d(TAG, "$shortUrl (listener $id): Opened connection") state = State.Connected if (errorCount > 0) { errorCount = 0 Loading @@ -124,17 +122,17 @@ class WsConnection( } override fun onMessage(webSocket: WebSocket, text: String) { syncExec("onMessage") { Log.d(TAG, "[$url] WebSocket ($id): Received message: $text") synchronize("onMessage") { Log.d(TAG, "$shortUrl (listener $id): Received message: $text") val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt()) if (notificationWithTopic == null) { Log.d(TAG, "[$url] WebSocket ($id): Unable to parse message. Discarding.") return@syncExec Log.d(TAG, "$shortUrl (listener $id): Irrelevant or unknown message. Discarding.") return@synchronize } val topic = notificationWithTopic.topic val notification = notificationWithTopic.notification val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) notificationListener(subscription, notificationWithSubscriptionId) since = notification.timestamp Loading @@ -142,22 +140,22 @@ class WsConnection( } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { syncExec("onClosed") { Log.w(TAG, "[$url] WebSocket ($id): Closed connection") synchronize("onClosed") { Log.w(TAG, "$shortUrl (listener $id): Closed connection") state = State.Disconnected } } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { syncExec("onFailure") { synchronize("onFailure") { if (response == null) { Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response is null): ${t.message}", t) Log.e(TAG, "$shortUrl (listener $id): Connection failed (response is null): ${t.message}", t) } else { Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t) Log.e(TAG, "$shortUrl (listener $id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t) } if (closed) { Log.d(TAG, "WebSocket ($id): Connection marked as closed. Not retrying.") return@syncExec Log.d(TAG, "$shortUrl (listener $id): Connection marked as closed. Not retrying.") return@synchronize } stateChangeListener(subscriptionIds, ConnectionState.CONNECTING) state = State.Disconnected Loading @@ -167,14 +165,12 @@ class WsConnection( } } private fun syncExec(tag: String, fn: () -> Unit) { private fun synchronize(tag: String, fn: () -> Unit) { synchronized(this) { if (webSocketId.get() == id) { Log.d(TAG, "[$url] WebSocket ($id): Begin $tag") if (listenerId.get() == id) { fn() Log.d(TAG, "[$url] WebSocket ($id): End $tag") } else { Log.d(TAG, "[$url] WebSocket ($id): Skipping synchronized block '$tag', because ID does not match ${webSocketId.get()}") Log.w(TAG, "$shortUrl (listener $id): Skipping synchronized block '$tag', because listener ID does not match ${listenerId.get()}") } } } Loading Loading
app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +29 −33 Original line number Diff line number Diff line Loading @@ -6,13 +6,11 @@ import android.os.Handler import android.os.Looper import io.heckel.ntfy.db.* import io.heckel.ntfy.log.Log import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.topicUrl import io.heckel.ntfy.util.topicShortUrl import io.heckel.ntfy.util.topicUrlWs import okhttp3.* import java.nio.charset.StandardCharsets import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong Loading Loading @@ -45,7 +43,7 @@ class WsConnection( .build() private var errorCount = 0 private var webSocket: WebSocket? = null private val webSocketId = AtomicLong(0) private val listenerId = AtomicLong(0) private var state: State? = null private var closed = false Loading @@ -54,23 +52,23 @@ class WsConnection( private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) private val shortUrl = topicShortUrl(baseUrl, topicsStr) @Synchronized override fun start() { if (closed || state == State.Connecting || state == State.Connected) { Log.d(TAG,"[$url] WebSocket: Not (re-)starting, because connection is marked closed/connecting/connected") Log.d(TAG,"$shortUrl: Not (re-)starting, because connection is marked closed/connecting/connected") return } if (webSocket != null) { webSocket!!.close(WS_CLOSE_NORMAL, "") } state = State.Connecting val nextId = webSocketId.incrementAndGet() val nextId = listenerId.incrementAndGet() val sinceVal = if (since == 0L) "all" else since.toString() val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) val request = requestBuilder(urlWithSince, user).build() Log.d(TAG, "[$url] WebSocket: Opening $urlWithSince with listener ID $nextId ...") Log.d(TAG, "$shortUrl: Opening $urlWithSince with listener ID $nextId ...") webSocket = client.newWebSocket(request, Listener(nextId)) } Loading @@ -78,10 +76,10 @@ class WsConnection( override fun close() { closed = true if (webSocket == null) { Log.d(TAG,"[$url] WebSocket: Not closing existing connection, because there is no active web socket") Log.d(TAG,"$shortUrl: Not closing existing connection, because there is no active web socket") return } Log.d(TAG, "[$url] WebSocket: Closing existing connection") Log.d(TAG, "$shortUrl: Closing existing connection") state = State.Disconnected webSocket!!.close(WS_CLOSE_NORMAL, "") webSocket = null Loading @@ -95,17 +93,17 @@ class WsConnection( @Synchronized fun scheduleReconnect(seconds: Int) { if (closed || state == State.Connecting || state == State.Connected) { Log.d(TAG,"[$url] WebSocket: Not rescheduling connection, because connection is marked closed/connecting/connected") Log.d(TAG,"$shortUrl: Not rescheduling connection, because connection is marked closed/connecting/connected") return } state = State.Scheduled if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { Log.d(TAG,"[$url] WebSocket: Scheduling a restart in $seconds seconds (via alarm manager)") Log.d(TAG,"$shortUrl: Scheduling a restart in $seconds seconds (via alarm manager)") val reconnectTime = Calendar.getInstance() reconnectTime.add(Calendar.SECOND, seconds) alarmManager.setExact(AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, { start() }, null) } else { Log.d(TAG, "[$url] WebSocket: Scheduling a restart in $seconds seconds (via handler)") Log.d(TAG, "$shortUrl: Scheduling a restart in $seconds seconds (via handler)") val handler = Handler(Looper.getMainLooper()) handler.postDelayed({ start() }, TimeUnit.SECONDS.toMillis(seconds.toLong())) } Loading @@ -113,8 +111,8 @@ class WsConnection( private inner class Listener(private val id: Long) : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { syncExec("onOpen") { Log.d(TAG, "[$url] WebSocket ($id): Opened connection") synchronize("onOpen") { Log.d(TAG, "$shortUrl (listener $id): Opened connection") state = State.Connected if (errorCount > 0) { errorCount = 0 Loading @@ -124,17 +122,17 @@ class WsConnection( } override fun onMessage(webSocket: WebSocket, text: String) { syncExec("onMessage") { Log.d(TAG, "[$url] WebSocket ($id): Received message: $text") synchronize("onMessage") { Log.d(TAG, "$shortUrl (listener $id): Received message: $text") val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt()) if (notificationWithTopic == null) { Log.d(TAG, "[$url] WebSocket ($id): Unable to parse message. Discarding.") return@syncExec Log.d(TAG, "$shortUrl (listener $id): Irrelevant or unknown message. Discarding.") return@synchronize } val topic = notificationWithTopic.topic val notification = notificationWithTopic.notification val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) notificationListener(subscription, notificationWithSubscriptionId) since = notification.timestamp Loading @@ -142,22 +140,22 @@ class WsConnection( } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { syncExec("onClosed") { Log.w(TAG, "[$url] WebSocket ($id): Closed connection") synchronize("onClosed") { Log.w(TAG, "$shortUrl (listener $id): Closed connection") state = State.Disconnected } } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { syncExec("onFailure") { synchronize("onFailure") { if (response == null) { Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response is null): ${t.message}", t) Log.e(TAG, "$shortUrl (listener $id): Connection failed (response is null): ${t.message}", t) } else { Log.e(TAG, "[$url] WebSocket ($id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t) Log.e(TAG, "$shortUrl (listener $id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t) } if (closed) { Log.d(TAG, "WebSocket ($id): Connection marked as closed. Not retrying.") return@syncExec Log.d(TAG, "$shortUrl (listener $id): Connection marked as closed. Not retrying.") return@synchronize } stateChangeListener(subscriptionIds, ConnectionState.CONNECTING) state = State.Disconnected Loading @@ -167,14 +165,12 @@ class WsConnection( } } private fun syncExec(tag: String, fn: () -> Unit) { private fun synchronize(tag: String, fn: () -> Unit) { synchronized(this) { if (webSocketId.get() == id) { Log.d(TAG, "[$url] WebSocket ($id): Begin $tag") if (listenerId.get() == id) { fn() Log.d(TAG, "[$url] WebSocket ($id): End $tag") } else { Log.d(TAG, "[$url] WebSocket ($id): Skipping synchronized block '$tag', because ID does not match ${webSocketId.get()}") Log.w(TAG, "$shortUrl (listener $id): Skipping synchronized block '$tag', because listener ID does not match ${listenerId.get()}") } } } Loading