浏览代码

EuiccChannelManagerService: Fix support for multiple subscribers

We have to use another SharedFlow here. Otherwise, the flow transforms
break our ability to subscribe to it more than once, which is needed for
UI state to preserve across recreate events.
Peter Cai 1 年之前
父节点
当前提交
d68a7172de
共有 1 个文件被更改,包括 52 次插入31 次删除
  1. 52 31
      app-common/src/main/java/im/angry/openeuicc/service/EuiccChannelManagerService.kt

+ 52 - 31
app-common/src/main/java/im/angry/openeuicc/service/EuiccChannelManagerService.kt

@@ -20,13 +20,13 @@ import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.MutableSharedFlow
 import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.buffer
+import kotlinx.coroutines.flow.asSharedFlow
 import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.flow.flow
 import kotlinx.coroutines.flow.last
 import kotlinx.coroutines.flow.onCompletion
-import kotlinx.coroutines.flow.onStart
+import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.flow.takeWhile
 import kotlinx.coroutines.flow.transformWhile
 import kotlinx.coroutines.isActive
@@ -208,8 +208,12 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
      * Launch a potentially blocking foreground task in this service's lifecycle context.
      * This function does not block, but returns a Flow that emits ForegroundTaskState
      * updates associated with this task. The last update the returned flow will emit is
-     * always ForegroundTaskState.Done. The returned flow MUST be started in order for the
-     * foreground task to run.
+     * always ForegroundTaskState.Done.
+     *
+     * The returned flow can be subscribed multiple times because it is implemented as a
+     * SharedFlow under the hood. New subscribers are guaranteed at least 1 previous update
+     * in addition to latest ones. This is to ensure subscribers always receive at least one
+     * InProgress before any sort of failure.
      *
      * The task closure is expected to update foregroundTaskState whenever appropriate.
      * If a foreground task is already running, this function returns null.
@@ -277,42 +281,49 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
             }
         }
 
+        // This is the flow we are going to return. We allow multiple subscribers by
+        // re-emitting state updates into this flow from another coroutine.
+        // replay = 2 ensures that we at least have 1 previous state whenever subscribed to.
+        // This is helpful when the task completed and is then re-subscribed to due to a
+        // UI recreation event -- this way, the UI will know at least one last progress event
+        // before completion / failure
+        val subscriberFlow = MutableSharedFlow<ForegroundTaskState>(
+            replay = 2,
+            onBufferOverflow = BufferOverflow.DROP_OLDEST
+        )
+
         // We should be the only task running, so we can subscribe to foregroundTaskState
         // until we encounter ForegroundTaskState.Done.
         // Then, we complete the returned flow, but we also set the state back to Idle.
         // The state update back to Idle won't show up in the returned stream, because
         // it has been completed by that point.
-        val subscriberFlow = foregroundTaskState
-            .transformWhile {
-                // Also update our notification when we see an update
-                // But ignore the first progress = 0 update -- that is the current value.
-                // we need that to be handled by the main coroutine after it finishes.
-                if (it !is ForegroundTaskState.InProgress || it.progress != 0) {
-                    withContext(Dispatchers.Main) {
+        lifecycleScope.launch(Dispatchers.Main) {
+            foregroundTaskState
+                .transformWhile {
+                    // Also update our notification when we see an update
+                    // But ignore the first progress = 0 update -- that is the current value.
+                    // we need that to be handled by the main coroutine after it finishes.
+                    if (it !is ForegroundTaskState.InProgress || it.progress != 0) {
                         updateForegroundNotification(title, iconRes)
                     }
+                    emit(it)
+                    it !is ForegroundTaskState.Done
                 }
-                emit(it)
-                it !is ForegroundTaskState.Done
-            }.onStart {
-                // When this Flow is started, we unblock the coroutine launched above by
-                // self-starting as a foreground service.
-                withContext(Dispatchers.Main) {
-                    startForegroundService(
-                        Intent(
-                            this@EuiccChannelManagerService,
-                            this@EuiccChannelManagerService::class.java
-                        )
-                    )
+                .onEach {
+                    subscriberFlow.emit(it)
+                }
+                .onCompletion {
+                    // Reset state back to Idle when we are done.
+                    // We do it here because otherwise Idle and Done might become conflated
+                    // when emitted by the main coroutine in quick succession.
+                    // Doing it here ensures we've seen Done. This Idle event won't be
+                    // emitted to the consumer because the subscription has completed here.
+                    foregroundTaskState.value = ForegroundTaskState.Idle
                 }
-            }.onCompletion { foregroundTaskState.value = ForegroundTaskState.Idle }
-            // Buffer the returned flow by 2, so that if there is an error,
-            // we always get a copy of the last process update before completion.
-            // This also guarantees that our onCompletion callback is always run
-            // even if the returned flow isn't subscribed to
-            .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
-
-        val ret = ForegroundTaskSubscriberFlow(taskID, subscriberFlow)
+                .collect()
+        }
+
+        val ret = ForegroundTaskSubscriberFlow(taskID, subscriberFlow.asSharedFlow())
         foregroundTaskSubscribers[taskID] = ret
 
         if (foregroundTaskSubscribers.size > 5) {
@@ -323,6 +334,16 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
             }
         }
 
+        // Before we return, and after we have set everything up,
+        // self-start with foreground permission.
+        // This is going to unblock the main coroutine handling the task.
+        startForegroundService(
+            Intent(
+                this@EuiccChannelManagerService,
+                this@EuiccChannelManagerService::class.java
+            )
+        )
+
         return ret
     }