feat: MQTT通信模块 - TCP连接+消息分发

新增:
- MqttConfig MQTT连接配置(TCP:1883, 心跳120s, 自动重连)
- MqttManager 连接管理器(连接/订阅3个Topic/消息解析/EventBus分发)
- AppEvent 新增 MqttConnected/MqttDisconnected/MqttMessageReceived

修改:
- build.gradle.kts MQTT_URL改为MQTT_HOST(TCP不需要路径)
- EnvConfig 适配MQTT_HOST
- HomeFragment 连接MQTT并显示连接状态

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dongliang
2026-04-27 15:59:26 +09:30
parent 977c2dd0d2
commit a3c1f1d632
6 changed files with 243 additions and 3 deletions

View File

@@ -0,0 +1,39 @@
package com.xiaoqu.watch.service.manager
/**
* MQTT 连接配置常量
*/
object MqttConfig {
/** TCP 端口(标准 MQTT 端口) */
const val PORT = 1883
/** 认证用户名(所有手表设备共用) */
const val USERNAME = "watch"
/** 认证密码 */
const val PASSWORD = "xiaoquwatch"
/** 心跳间隔(秒)— 满足 ≥60s 功耗红线,低于 4G NAT 超时 */
const val KEEP_ALIVE_INTERVAL = 120
/** 连接超时(秒) */
const val CONNECTION_TIMEOUT = 15
/** 是否自动重连Paho 内置指数退避1s→2s→4s→...→128s */
const val AUTO_RECONNECT = true
/** 清除会话(每次连接重新订阅) */
const val CLEAN_SESSION = true
/** 订阅 QoS — 设备专属 Topic */
const val QOS_DEVICE = 2
/** 订阅 QoS — 广播 Topic */
const val QOS_BROADCAST = 0
/** 广播 Topic手表参数变更 */
const val TOPIC_WATCH_PARAMS = "WatchSetParamsInit"
/** 广播 Topic振动方案变更 */
const val TOPIC_SHOCK_PARAMS = "WatchShockSetParamsInit"
}

View File

@@ -0,0 +1,179 @@
package com.xiaoqu.watch.service.manager
import com.xiaoqu.watch.data.prefs.DevicePrefs
import com.xiaoqu.watch.event.AppEvent
import com.xiaoqu.watch.event.EventBus
import com.xiaoqu.watch.network.EnvConfig
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.eclipse.paho.client.mqttv3.*
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.json.JSONObject
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
/**
* MQTT 连接管理器
* 负责连接/断连/消息接收/EventBus 分发
*
* 使用方式HomeFragment 中 initDevicePrefs() 之后调用 connect()
*
* 消息处理:
* 收到消息 → 解析 messageType → EventBus.emit(MqttMessageReceived)
* 各业务模块通过 EventBus 订阅自己关心的 messageType
*/
@Singleton
class MqttManager @Inject constructor(
private val devicePrefs: DevicePrefs,
private val eventBus: EventBus
) {
/** 协程作用域(用于发送 EventBus 事件) */
private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
/** MQTT 异步客户端 */
private var client: MqttAsyncClient? = null
/** 是否已连接 */
val isConnected: Boolean get() = client?.isConnected == true
/**
* 连接 MQTT Broker
* 使用 DevicePrefs.imei 作为 clientIdTCP 协议连接
*/
fun connect() {
val imei = devicePrefs.imei
if (imei.isEmpty()) {
Timber.w("MQTT: IMEI 为空,无法连接")
return
}
// 已连接则跳过
if (isConnected) {
Timber.d("MQTT: 已连接,跳过")
return
}
try {
// 构建 TCP 连接地址
val serverUri = "tcp://${EnvConfig.mqttHost}:${MqttConfig.PORT}"
Timber.d("MQTT: 连接 $serverUri, clientId=$imei")
// 创建客户端(使用内存持久化)
client = MqttAsyncClient(serverUri, imei, MemoryPersistence()).apply {
// 设置回调
setCallback(mqttCallback)
}
// 构建连接选项
val options = MqttConnectOptions().apply {
userName = MqttConfig.USERNAME
password = MqttConfig.PASSWORD.toCharArray()
keepAliveInterval = MqttConfig.KEEP_ALIVE_INTERVAL
connectionTimeout = MqttConfig.CONNECTION_TIMEOUT
isAutomaticReconnect = MqttConfig.AUTO_RECONNECT
isCleanSession = MqttConfig.CLEAN_SESSION
}
// 发起连接
client?.connect(options, null, connectCallback)
} catch (e: Exception) {
Timber.e(e, "MQTT: 连接异常")
}
}
/** 断开 MQTT 连接 */
fun disconnect() {
try {
client?.disconnect()
Timber.d("MQTT: 已断开")
} catch (e: Exception) {
Timber.w(e, "MQTT: 断开异常")
}
}
/** 连接回调:成功后订阅 3 个 Topic */
private val connectCallback = object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Timber.d("MQTT: 连接成功")
emitEvent(AppEvent.MqttConnected)
subscribeTopics()
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Timber.e(exception, "MQTT: 连接失败")
}
}
/** 订阅 3 个 Topic */
private fun subscribeTopics() {
val imei = devicePrefs.imei
try {
// 设备专属 Topic绑定/解绑/任务/工作状态等)
client?.subscribe(imei, MqttConfig.QOS_DEVICE)
Timber.d("MQTT: 订阅 Topic=$imei, QoS=${MqttConfig.QOS_DEVICE}")
// 手表参数广播 Topic
client?.subscribe(MqttConfig.TOPIC_WATCH_PARAMS, MqttConfig.QOS_BROADCAST)
Timber.d("MQTT: 订阅 Topic=${MqttConfig.TOPIC_WATCH_PARAMS}")
// 振动方案广播 Topic
client?.subscribe(MqttConfig.TOPIC_SHOCK_PARAMS, MqttConfig.QOS_BROADCAST)
Timber.d("MQTT: 订阅 Topic=${MqttConfig.TOPIC_SHOCK_PARAMS}")
} catch (e: Exception) {
Timber.e(e, "MQTT: 订阅异常")
}
}
/** MQTT 消息回调 */
private val mqttCallback = object : MqttCallbackExtended {
/** 连接完成(包括自动重连后) */
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
if (reconnect) {
Timber.d("MQTT: 重连成功,重新订阅")
subscribeTopics()
emitEvent(AppEvent.MqttConnected)
}
}
/** 连接丢失 */
override fun connectionLost(cause: Throwable?) {
Timber.w(cause, "MQTT: 连接丢失,等待自动重连")
emitEvent(AppEvent.MqttDisconnected)
}
/** 收到消息 */
override fun messageArrived(topic: String?, message: MqttMessage?) {
try {
val payload = message?.payload ?: return
val json = String(payload)
Timber.d("MQTT: 收到消息 topic=$topic, payload=$json")
// 解析 messageType
val jsonObj = JSONObject(json)
val messageType = jsonObj.optInt("messageType", -1)
if (messageType >= 0) {
// 通过 EventBus 分发,业务模块各自订阅处理
emitEvent(AppEvent.MqttMessageReceived(messageType, json))
}
} catch (e: Exception) {
Timber.w(e, "MQTT: 消息解析异常")
}
}
/** 消息发送完成(本项目不发消息,空实现) */
override fun deliveryComplete(token: IMqttDeliveryToken?) {}
}
/** 通过 EventBus 发送事件 */
private fun emitEvent(event: AppEvent) {
scope.launch {
eventBus.emit(event)
}
}
}