Loading data/src/main/java/com/moez/QKSMS/repository/MessageRepositoryImpl.kt +9 −1 Original line number Diff line number Diff line Loading @@ -62,7 +62,8 @@ class MessageRepositoryImpl @Inject constructor( private val context: Context, private val messageIds: KeyManager, private val imageRepository: ImageRepository, private val prefs: Preferences) : MessageRepository { private val prefs: Preferences, private val syncRepository: SyncRepository) : MessageRepository { override fun getMessages(threadId: Long, query: String): RealmResults<Message> { return Realm.getDefaultInstance() Loading Loading @@ -336,6 +337,13 @@ class MessageRepositoryImpl @Inject constructor( realm.executeTransaction { managedMessage?.takeIf { it.isValid }?.contentId = uri.lastPathSegment.toLong() } realm.close() // On some devices, we can't obtain a threadId until after the first message is sent in a // conversation. In this case, we need to update the message's threadId after it gets added // to the native ContentProvider if (threadId == 0L) { uri?.let(syncRepository::syncMessage) } return message } Loading domain/src/main/java/com/moez/QKSMS/interactor/SendMessage.kt +15 −6 Original line number Diff line number Diff line Loading @@ -21,12 +21,14 @@ package com.moez.QKSMS.interactor import com.moez.QKSMS.model.Attachment import com.moez.QKSMS.repository.ConversationRepository import com.moez.QKSMS.repository.MessageRepository import com.moez.QKSMS.repository.SyncRepository import io.reactivex.Flowable import javax.inject.Inject class SendMessage @Inject constructor( private val conversationRepo: ConversationRepository, private val messageRepo: MessageRepository private val messageRepo: MessageRepository, private val syncRepo: SyncRepository ) : Interactor<SendMessage.Params>() { data class Params( Loading @@ -37,12 +39,19 @@ class SendMessage @Inject constructor( val attachments: List<Attachment> = listOf(), val delay: Int = 0) override fun buildObservable(params: Params): Flowable<Unit> = Flowable.just(Unit) override fun buildObservable(params: Params): Flowable<*> = Flowable.just(Unit) .filter { params.addresses.isNotEmpty() } .doOnNext { messageRepo.sendMessage(params.subId, params.threadId, params.addresses, params.body, params.attachments, params.delay) } // If this was the first message sent in the conversation, the conversation might not exist yet .doOnNext { conversationRepo.getOrCreateConversation(params.threadId) } .doOnNext { conversationRepo.updateConversations(params.threadId) } .doOnNext { conversationRepo.markUnarchived(params.threadId) } .map { // On some manufacturers, we can't obtain a threadId for a new conversation. In // this case, find the threadId manually now that it contains a message if (params.threadId == 0L) { conversationRepo.getOrCreateConversation(params.addresses)?.id ?: 0 } else { params.threadId } } .doOnNext { threadId -> conversationRepo.updateConversations(threadId) } .doOnNext { threadId -> conversationRepo.markUnarchived(threadId) } } No newline at end of file presentation/src/main/java/com/moez/QKSMS/feature/compose/ComposeViewModel.kt +42 −20 Original line number Diff line number Diff line Loading @@ -107,7 +107,6 @@ class ComposeViewModel @Inject constructor( private val contacts: Observable<List<Contact>> by lazy { contactsRepo.getUnmanagedContacts().toObservable() } private val contactsReducer: Subject<(List<Contact>) -> List<Contact>> = PublishSubject.create() private val selectedContacts: Subject<List<Contact>> = BehaviorSubject.createDefault(listOf()) private val threadIdSubject: Subject<Long> = BehaviorSubject.create() private val searchResults: Subject<List<Message>> = BehaviorSubject.create() private val searchSelection: Subject<Long> = BehaviorSubject.createDefault(-1) private val conversation: Subject<Conversation> = BehaviorSubject.create() Loading @@ -119,28 +118,40 @@ class ComposeViewModel @Inject constructor( ?.asObservable() ?: Observable.empty() // Merges two potential conversation sources (threadId from constructor and contact selection) into a single // stream of conversations. If the conversation was deleted, notify the activity to shut down disposables += selectedContacts val selectedConversation = selectedContacts .skipWhile { it.isEmpty() } .map { contacts -> contacts.map { it.numbers.firstOrNull()?.address ?: "" } } .distinctUntilChanged() .doOnNext { newState { copy(loading = true) } } .observeOn(Schedulers.computation()) .map { addresses -> conversationRepo.getThreadId(addresses) ?: 0 } .observeOn(Schedulers.io()) .map { addresses -> Pair(conversationRepo.getOrCreateConversation(addresses)?.id ?: 0, addresses) } .observeOn(AndroidSchedulers.mainThread()) .doOnNext { newState { copy(loading = false) } } .mergeWith(threadIdSubject) .switchMap { threadId -> // Query the entire list of conversations and map from there, rather than // directly querying the conversation from realm. If querying a single // conversation and it doesn't exist yet, the realm query will never update .switchMap { (threadId, addresses) -> // If we already have this thread in realm, or were able to obtain it from the // system, just return that. threadId.takeIf { it > 0 }?.let { return@switchMap conversationRepo.getConversationAsync(threadId).asObservable() } // Otherwise, we'll monitor the conversations until our expected conversation is created conversationRepo.getConversations().asObservable() .map { conversations -> conversations.firstOrNull { it.id == threadId } ?: conversationRepo.getOrCreateConversation(threadId) ?: Conversation(id = threadId) .filter { it.isLoaded } .observeOn(Schedulers.io()) .map { conversationRepo.getOrCreateConversation(addresses)?.id ?: 0 } .observeOn(AndroidSchedulers.mainThread()) .switchMap { actualThreadId -> when (actualThreadId) { 0L -> Observable.just(Conversation(0)) else -> conversationRepo.getConversationAsync(actualThreadId).asObservable() } } } // Merges two potential conversation sources (threadId from constructor and contact selection) into a single // stream of conversations. If the conversation was deleted, notify the activity to shut down disposables += selectedConversation .mergeWith(initialConversation) .filter { conversation -> conversation.isLoaded } .doOnNext { conversation -> Loading Loading @@ -601,6 +612,7 @@ class ComposeViewModel @Inject constructor( } when { // Scheduling a message state.scheduled != 0L -> { newState { copy(scheduled = 0) } val uris = attachments.map { it.getUri() }.map { it.toString() } Loading @@ -609,17 +621,27 @@ class ComposeViewModel @Inject constructor( context.makeToast(R.string.compose_scheduled_toast) } // Sending a group message state.sendAsGroup -> { val threadId = conversation.id.takeIf { it > 0 } ?: TelephonyCompat.getOrCreateThreadId(context, addresses).also(threadIdSubject::onNext) sendMessage.execute(SendMessage.Params(subId, conversation.id, addresses, body, attachments, delay)) } // Sending a message to an existing conversation with one recipient conversation.recipients.size == 1 -> { val address = conversation.recipients.map { it.address } sendMessage.execute(SendMessage.Params(subId, threadId, address, body, attachments, delay)) } // Create a new conversation with one address addresses.size == 1 -> { sendMessage.execute(SendMessage.Params(subId, threadId, addresses, body, attachments, delay)) } // Send a message to multiple addresses else -> { addresses.forEach { addr -> val threadId = TelephonyCompat.getOrCreateThreadId(context, addr) val address = listOf(conversationRepo.getConversation(threadId)?.recipients?.firstOrNull()?.address ?: addr) val threadId = tryOrNull(false) { TelephonyCompat.getOrCreateThreadId(context, addr) } ?: 0 val address = listOf(conversationRepo.getConversation(threadId)?.recipients?.firstOrNull()?.address ?: addr) sendMessage.execute(SendMessage.Params(subId, threadId, address, body, attachments, delay)) } } Loading Loading
data/src/main/java/com/moez/QKSMS/repository/MessageRepositoryImpl.kt +9 −1 Original line number Diff line number Diff line Loading @@ -62,7 +62,8 @@ class MessageRepositoryImpl @Inject constructor( private val context: Context, private val messageIds: KeyManager, private val imageRepository: ImageRepository, private val prefs: Preferences) : MessageRepository { private val prefs: Preferences, private val syncRepository: SyncRepository) : MessageRepository { override fun getMessages(threadId: Long, query: String): RealmResults<Message> { return Realm.getDefaultInstance() Loading Loading @@ -336,6 +337,13 @@ class MessageRepositoryImpl @Inject constructor( realm.executeTransaction { managedMessage?.takeIf { it.isValid }?.contentId = uri.lastPathSegment.toLong() } realm.close() // On some devices, we can't obtain a threadId until after the first message is sent in a // conversation. In this case, we need to update the message's threadId after it gets added // to the native ContentProvider if (threadId == 0L) { uri?.let(syncRepository::syncMessage) } return message } Loading
domain/src/main/java/com/moez/QKSMS/interactor/SendMessage.kt +15 −6 Original line number Diff line number Diff line Loading @@ -21,12 +21,14 @@ package com.moez.QKSMS.interactor import com.moez.QKSMS.model.Attachment import com.moez.QKSMS.repository.ConversationRepository import com.moez.QKSMS.repository.MessageRepository import com.moez.QKSMS.repository.SyncRepository import io.reactivex.Flowable import javax.inject.Inject class SendMessage @Inject constructor( private val conversationRepo: ConversationRepository, private val messageRepo: MessageRepository private val messageRepo: MessageRepository, private val syncRepo: SyncRepository ) : Interactor<SendMessage.Params>() { data class Params( Loading @@ -37,12 +39,19 @@ class SendMessage @Inject constructor( val attachments: List<Attachment> = listOf(), val delay: Int = 0) override fun buildObservable(params: Params): Flowable<Unit> = Flowable.just(Unit) override fun buildObservable(params: Params): Flowable<*> = Flowable.just(Unit) .filter { params.addresses.isNotEmpty() } .doOnNext { messageRepo.sendMessage(params.subId, params.threadId, params.addresses, params.body, params.attachments, params.delay) } // If this was the first message sent in the conversation, the conversation might not exist yet .doOnNext { conversationRepo.getOrCreateConversation(params.threadId) } .doOnNext { conversationRepo.updateConversations(params.threadId) } .doOnNext { conversationRepo.markUnarchived(params.threadId) } .map { // On some manufacturers, we can't obtain a threadId for a new conversation. In // this case, find the threadId manually now that it contains a message if (params.threadId == 0L) { conversationRepo.getOrCreateConversation(params.addresses)?.id ?: 0 } else { params.threadId } } .doOnNext { threadId -> conversationRepo.updateConversations(threadId) } .doOnNext { threadId -> conversationRepo.markUnarchived(threadId) } } No newline at end of file
presentation/src/main/java/com/moez/QKSMS/feature/compose/ComposeViewModel.kt +42 −20 Original line number Diff line number Diff line Loading @@ -107,7 +107,6 @@ class ComposeViewModel @Inject constructor( private val contacts: Observable<List<Contact>> by lazy { contactsRepo.getUnmanagedContacts().toObservable() } private val contactsReducer: Subject<(List<Contact>) -> List<Contact>> = PublishSubject.create() private val selectedContacts: Subject<List<Contact>> = BehaviorSubject.createDefault(listOf()) private val threadIdSubject: Subject<Long> = BehaviorSubject.create() private val searchResults: Subject<List<Message>> = BehaviorSubject.create() private val searchSelection: Subject<Long> = BehaviorSubject.createDefault(-1) private val conversation: Subject<Conversation> = BehaviorSubject.create() Loading @@ -119,28 +118,40 @@ class ComposeViewModel @Inject constructor( ?.asObservable() ?: Observable.empty() // Merges two potential conversation sources (threadId from constructor and contact selection) into a single // stream of conversations. If the conversation was deleted, notify the activity to shut down disposables += selectedContacts val selectedConversation = selectedContacts .skipWhile { it.isEmpty() } .map { contacts -> contacts.map { it.numbers.firstOrNull()?.address ?: "" } } .distinctUntilChanged() .doOnNext { newState { copy(loading = true) } } .observeOn(Schedulers.computation()) .map { addresses -> conversationRepo.getThreadId(addresses) ?: 0 } .observeOn(Schedulers.io()) .map { addresses -> Pair(conversationRepo.getOrCreateConversation(addresses)?.id ?: 0, addresses) } .observeOn(AndroidSchedulers.mainThread()) .doOnNext { newState { copy(loading = false) } } .mergeWith(threadIdSubject) .switchMap { threadId -> // Query the entire list of conversations and map from there, rather than // directly querying the conversation from realm. If querying a single // conversation and it doesn't exist yet, the realm query will never update .switchMap { (threadId, addresses) -> // If we already have this thread in realm, or were able to obtain it from the // system, just return that. threadId.takeIf { it > 0 }?.let { return@switchMap conversationRepo.getConversationAsync(threadId).asObservable() } // Otherwise, we'll monitor the conversations until our expected conversation is created conversationRepo.getConversations().asObservable() .map { conversations -> conversations.firstOrNull { it.id == threadId } ?: conversationRepo.getOrCreateConversation(threadId) ?: Conversation(id = threadId) .filter { it.isLoaded } .observeOn(Schedulers.io()) .map { conversationRepo.getOrCreateConversation(addresses)?.id ?: 0 } .observeOn(AndroidSchedulers.mainThread()) .switchMap { actualThreadId -> when (actualThreadId) { 0L -> Observable.just(Conversation(0)) else -> conversationRepo.getConversationAsync(actualThreadId).asObservable() } } } // Merges two potential conversation sources (threadId from constructor and contact selection) into a single // stream of conversations. If the conversation was deleted, notify the activity to shut down disposables += selectedConversation .mergeWith(initialConversation) .filter { conversation -> conversation.isLoaded } .doOnNext { conversation -> Loading Loading @@ -601,6 +612,7 @@ class ComposeViewModel @Inject constructor( } when { // Scheduling a message state.scheduled != 0L -> { newState { copy(scheduled = 0) } val uris = attachments.map { it.getUri() }.map { it.toString() } Loading @@ -609,17 +621,27 @@ class ComposeViewModel @Inject constructor( context.makeToast(R.string.compose_scheduled_toast) } // Sending a group message state.sendAsGroup -> { val threadId = conversation.id.takeIf { it > 0 } ?: TelephonyCompat.getOrCreateThreadId(context, addresses).also(threadIdSubject::onNext) sendMessage.execute(SendMessage.Params(subId, conversation.id, addresses, body, attachments, delay)) } // Sending a message to an existing conversation with one recipient conversation.recipients.size == 1 -> { val address = conversation.recipients.map { it.address } sendMessage.execute(SendMessage.Params(subId, threadId, address, body, attachments, delay)) } // Create a new conversation with one address addresses.size == 1 -> { sendMessage.execute(SendMessage.Params(subId, threadId, addresses, body, attachments, delay)) } // Send a message to multiple addresses else -> { addresses.forEach { addr -> val threadId = TelephonyCompat.getOrCreateThreadId(context, addr) val address = listOf(conversationRepo.getConversation(threadId)?.recipients?.firstOrNull()?.address ?: addr) val threadId = tryOrNull(false) { TelephonyCompat.getOrCreateThreadId(context, addr) } ?: 0 val address = listOf(conversationRepo.getConversation(threadId)?.recipients?.firstOrNull()?.address ?: addr) sendMessage.execute(SendMessage.Params(subId, threadId, address, body, attachments, delay)) } } Loading