ソースを参照

EuiccChannelManagerservice: Fix completion event in returned flows

Peter Cai 1 年間 前
コミット
74489a9ae0

+ 29 - 14
app-common/src/main/java/im/angry/openeuicc/service/EuiccChannelManagerService.kt

@@ -20,6 +20,7 @@ import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.MutableSharedFlow
 import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.SharedFlow
 import kotlinx.coroutines.flow.asSharedFlow
 import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.first
@@ -116,7 +117,7 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
      * tasks are running. Having this buffer allows the components to re-subscribe even if
      * the task completes while they are being recreated.
      */
-    private val foregroundTaskSubscribers: MutableMap<Long, ForegroundTaskSubscriberFlow> =
+    private val foregroundTaskSubscribers: MutableMap<Long, SharedFlow<ForegroundTaskState>> =
         mutableMapOf()
 
     override fun onBind(intent: Intent): IBinder {
@@ -196,13 +197,27 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
         NotificationManagerCompat.from(this).notify(TASK_FAILURE_ID, notification)
     }
 
+    /**
+     * Apply transform to a ForegroundTaskState flow so that it completes when a Done is seen.
+     *
+     * This must be applied each time a flow is returned for subscription purposes. If applied
+     * beforehand, we lose the ability to subscribe multiple times.
+     */
+    private fun Flow<ForegroundTaskState>.applyCompletionTransform() =
+        transformWhile {
+            emit(it)
+            it !is ForegroundTaskState.Done
+        }
+
     /**
      * Recover the subscriber to a foreground task that is recently launched.
      *
      * null if the task doesn't exist, or was launched too long ago.
      */
     fun recoverForegroundTaskSubscriber(taskId: Long): ForegroundTaskSubscriberFlow? =
-        foregroundTaskSubscribers[taskId]
+        foregroundTaskSubscribers[taskId]?.let {
+            ForegroundTaskSubscriberFlow(taskId, it.applyCompletionTransform())
+        }
 
     /**
      * Launch a potentially blocking foreground task in this service's lifecycle context.
@@ -210,10 +225,10 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
      * updates associated with this task. The last update the returned flow will emit is
      * always ForegroundTaskState.Done.
      *
-     * The returned flow can be subscribed multiple times because it is implemented as a
-     * SharedFlow under the hood. New subscribers are guaranteed at least 1 previous update
-     * in addition to latest ones. This is to ensure subscribers always receive at least one
-     * InProgress before any sort of failure.
+     * The returned flow can only be subscribed to once even though the underlying implementation
+     * is a SharedFlow. This is due to the need to apply transformations so that the stream
+     * actually completes. In order to subscribe multiple times, use `recoverForegroundTaskSubscriber`
+     * to acquire another instance.
      *
      * The task closure is expected to update foregroundTaskState whenever appropriate.
      * If a foreground task is already running, this function returns null.
@@ -299,17 +314,15 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
         // it has been completed by that point.
         lifecycleScope.launch(Dispatchers.Main) {
             foregroundTaskState
-                .transformWhile {
+                .applyCompletionTransform()
+                .onEach {
                     // Also update our notification when we see an update
                     // But ignore the first progress = 0 update -- that is the current value.
                     // we need that to be handled by the main coroutine after it finishes.
                     if (it !is ForegroundTaskState.InProgress || it.progress != 0) {
                         updateForegroundNotification(title, iconRes)
                     }
-                    emit(it)
-                    it !is ForegroundTaskState.Done
-                }
-                .onEach {
+
                     subscriberFlow.emit(it)
                 }
                 .onCompletion {
@@ -323,8 +336,7 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
                 .collect()
         }
 
-        val ret = ForegroundTaskSubscriberFlow(taskID, subscriberFlow.asSharedFlow())
-        foregroundTaskSubscribers[taskID] = ret
+        foregroundTaskSubscribers[taskID] = subscriberFlow.asSharedFlow()
 
         if (foregroundTaskSubscribers.size > 5) {
             // Remove enough elements so that the size is kept at 5
@@ -344,7 +356,10 @@ class EuiccChannelManagerService : LifecycleService(), OpenEuiccContextMarker {
             )
         )
 
-        return ret
+        return ForegroundTaskSubscriberFlow(
+            taskID,
+            subscriberFlow.asSharedFlow().applyCompletionTransform()
+        )
     }
 
     val isForegroundTaskRunning: Boolean