fix: 去抖改为暂存合并,不丢弃消息

问题:多条MQTT消息同时到达(间隔<1s),去抖直接丢弃后续消息。
旧版是延迟处理而非丢弃。

修复:
- 去抖窗口内的消息暂存到pendingJsons
- 1s后延迟任务统一处理暂存消息(只加ID,不重复震动)
- 合并后统一发一次NewTaskArrived事件更新横幅
- MainActivity改为监听NewTaskArrived事件显示横幅

时序示例:
t=0ms:   消息1(A) → 立即处理, pendingTaskIds=[A], 横幅"1条"
t=100ms: 消息2(B) → 暂存
t=200ms: 消息3(C) → 暂存
t=1000ms: 延迟任务 → 处理B+C, pendingTaskIds=[A,B,C], 横幅"3条"

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dongliang
2026-04-29 15:27:06 +09:30
parent 1ffdefc887
commit 379d784f70
3 changed files with 79 additions and 41 deletions

View File

@@ -82,22 +82,21 @@ class MainActivity : AppCompatActivity() {
// ===== MQTT 新任务 → 通知横幅 ===== // ===== MQTT 新任务 → 通知横幅 =====
/** 监听 MQTT 消息type=1 时通知横幅 */ /** 监听 MQTT 消息和通知事件 */
private fun observeMqttMessages() { private fun observeMqttMessages() {
activityScope.launch { activityScope.launch {
eventBus.events.collect { event -> eventBus.events.collect { event ->
when (event) { when (event) {
is AppEvent.MqttMessageReceived -> { is AppEvent.MqttMessageReceived -> {
if (event.type == 1) { if (event.type == 1) {
// 交给 NotificationManager 处理(去抖+震动+亮屏+事件 // 交给 NotificationManager 处理(去抖+合并+震动+亮屏)
val handled = notificationManager.onNewTaskMessage(event.rawJson) notificationManager.onNewTaskMessage(event.rawJson)
if (handled) {
// 显示横幅
notificationBanner.show(notificationManager.pendingCount)
} }
} }
// NotificationManager 处理完后发出事件 → 显示/更新横幅
is AppEvent.NewTaskArrived -> {
notificationBanner.show(event.count)
} }
// 横幅点击由 HomeFragment 处理跳转
else -> {} else -> {}
} }
} }

View File

@@ -47,6 +47,12 @@ class NotificationManager @Inject constructor(
/** 上次消息处理时间(去抖用) */ /** 上次消息处理时间(去抖用) */
private var lastMessageTime = 0L private var lastMessageTime = 0L
/** 去抖窗口内暂存的消息(延迟合并处理,不丢弃) */
private val pendingJsons = mutableListOf<String>()
/** 去抖延迟任务 */
private var debounceJob: kotlinx.coroutines.Job? = null
/** 上次统计数据(对比红点用) */ /** 上次统计数据(对比红点用) */
var lastStats: TaskStatistics? = null var lastStats: TaskStatistics? = null
@@ -55,46 +61,80 @@ class NotificationManager @Inject constructor(
/** /**
* 处理 MQTT type=1 新任务消息 * 处理 MQTT type=1 新任务消息
* 去抖策略1s 窗口内的消息暂存,窗口结束后合并处理(不丢弃)
* @param rawJson MQTT 消息原始 JSON * @param rawJson MQTT 消息原始 JSON
* @return true=已处理false=被去抖过滤
*/ */
fun onNewTaskMessage(rawJson: String): Boolean { fun onNewTaskMessage(rawJson: String) {
// 1. 去抖1s 内重复消息忽略
val now = System.currentTimeMillis() val now = System.currentTimeMillis()
if (now - lastMessageTime < DEBOUNCE_MS) { if (now - lastMessageTime < DEBOUNCE_MS) {
Timber.d("通知: 去抖过滤 (距上条 ${now - lastMessageTime}ms)") // 去抖窗口内 → 暂存,等延迟任务统一处理
return false pendingJsons.add(rawJson)
Timber.d("通知: 去抖暂存 (已暂存 ${pendingJsons.size} 条)")
return
} }
// 窗口外 → 立即处理本条 + 启动延迟任务处理后续暂存
lastMessageTime = now lastMessageTime = now
processMessage(rawJson)
// 2. 解析任务 ID // 启动延迟任务1s 后处理暂存的消息
debounceJob?.cancel()
debounceJob = scope.launch {
kotlinx.coroutines.delay(DEBOUNCE_MS)
if (pendingJsons.isNotEmpty()) {
Timber.d("通知: 处理暂存的 ${pendingJsons.size} 条消息")
val jsons = pendingJsons.toList()
pendingJsons.clear()
lastMessageTime = System.currentTimeMillis()
for (json in jsons) {
processMessageSilent(json) // 只加 ID不重复震动
}
// 合并后统一发一次事件
notifyUi()
}
}
}
/** 处理单条消息(完整:解析+震动+亮屏+事件) */
private fun processMessage(rawJson: String) {
val taskIds = parseTaskIds(rawJson) val taskIds = parseTaskIds(rawJson)
if (taskIds.isEmpty()) { if (taskIds.isEmpty()) return
Timber.w("通知: 消息中无有效任务 ID")
return false
}
// 3. 加入未读列表(去重) addTaskIds(taskIds)
for (id in taskIds) {
if (id !in _pendingTaskIds) {
_pendingTaskIds.add(id)
}
}
Timber.d("通知: 收到 ${taskIds.size} 个新任务, 当前未读 ${_pendingTaskIds.size}")
// 4. 震动 + 亮屏 // 震动 + 亮屏(只在首条时触发,暂存的合并后不重复震动)
val pattern = VibrationDefaults.getPattern(PLAN_NEW_MESSAGE) val pattern = VibrationDefaults.getPattern(PLAN_NEW_MESSAGE)
if (pattern != null) { if (pattern != null) {
vibrationController.executePattern(pattern) vibrationController.executePattern(pattern)
} }
screenController.turnOn() screenController.turnOn()
// 5. 发送事件通知 notifyUi()
scope.launch {
eventBus.emit(AppEvent.NewTaskArrived(taskIds, _pendingTaskIds.size))
} }
return true /** 静默处理(只加 ID不震动不亮屏用于合并暂存消息 */
private fun processMessageSilent(rawJson: String) {
val taskIds = parseTaskIds(rawJson)
if (taskIds.isEmpty()) return
addTaskIds(taskIds)
}
/** 加入未读列表(去重) */
private fun addTaskIds(taskIds: List<String>) {
for (id in taskIds) {
if (id !in _pendingTaskIds) {
_pendingTaskIds.add(id)
}
}
Timber.d("通知: 当前未读 ${_pendingTaskIds.size}")
}
/** 通知 UI 更新(横幅+红点) */
private fun notifyUi() {
scope.launch {
eventBus.emit(AppEvent.NewTaskArrived(_pendingTaskIds.toList(), _pendingTaskIds.size))
}
} }
/** 消费所有未读消息(用户已查看列表) */ /** 消费所有未读消息(用户已查看列表) */

View File

@@ -30,20 +30,19 @@ class NotificationManagerTest {
@Test @Test
fun `onNewTaskMessage - first message should be handled`() { fun `onNewTaskMessage - first message should be handled`() {
val json = """{"id":"100"}""" val json = """{"id":"100"}"""
val result = manager.onNewTaskMessage(json) manager.onNewTaskMessage(json)
assertTrue(result)
assertEquals(1, manager.pendingCount) assertEquals(1, manager.pendingCount)
} }
@Test @Test
fun `onNewTaskMessage - duplicate within 1s should be filtered`() { fun `onNewTaskMessage - duplicate within 1s should be buffered not dropped`() {
val json = """{"id":"100"}""" val json1 = """{"id":"100"}"""
manager.onNewTaskMessage(json) val json2 = """{"id":"200"}"""
// 立即再发一条(间隔 <1s manager.onNewTaskMessage(json1)
val result = manager.onNewTaskMessage(json) // 立即再发一条(间隔 <1s→ 暂存,不丢弃
assertFalse(result) manager.onNewTaskMessage(json2)
// 只有第一条处理 // 第一条立即处理,第二条暂存等延迟处理
assertEquals(1, manager.pendingCount) assertEquals(1, manager.pendingCount) // 暂存的还没处理
} }
@Test @Test