提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
【云Devops转行】【学习嵌入式开发】物联网-MQTT协议入门
- 1. 简介
- 发布&订阅
- 特点
- 基本概念
- EXMQ
- MQTTX
- 2. MQTT控制报文
- 常见报文
- 报文格式
- 固定报头
- 可变报头
- 有效载荷
- 3. QoS
- QoS消息质量等级
- 4. 主题
- 主题通配符
- 系统主题
- 5. 会话
- 6. 消息
- 保留消息
- 消息过期间隔
- 遗嘱消息
- 延迟发布
- 用户属性
- 7. 订阅
- 订阅选项
- QoS
- No Local
- Retain As Published
- Retain Handling
- 共享订阅
- 排他订阅
- 自动订阅
- MQTT客户端编程注意事项
- 单片机(MCU)环境下MQTT开发
- 基于Linux芯片环境开发
1. 简介
MQTT是一种轻量级、发布/订阅模式的消息传输协议。它被设计用于低带宽、高延迟或网络不稳定的受限环境。
学习资料:B站尚硅谷
发布&订阅
特点
轻量级:占用系统资源较少,数据报文较小
可靠性:提供多种消息的质量等级
安全性较强:提供传输层、套接层的加密通讯
双向通信:MQTT客户端可发送数据,也可从代理中获取Data
多语言支持:PHP、Node.js、Python、Go、java
基本概念
MQTT客户端:运行MQTT客户端库的应用、设备
MQTT Broker:实现MQTT的代理软件
主题:Broker中一个普通字符串,用于对消息的分类
EXMQ
一款大规模分布式物联网 MQTT 消息中间件,是常用的 Broker 实现之一。
默认用户/密码:admin / public
可在linux下docker搭建
MQTTX
客户端软件
2. MQTT控制报文
报文是 MQTT 协议在网络中交换和传输的最小数据块。
常见报文
报文格式
固定报头 + 可变报头 + 有效载荷
| 组成部分 | 是否必须 | 内容 |
|---|---|---|
| 固定报头 | 必不可少 | 报文类型、标志位、长度 |
| 可变报头 | 可选 | 报文特定信息,如 Packet Identifier、主题名等 |
| 有效荷载 | 可选 | 报文携带的实际数据,如 PUBLISH 报文的消息体 |
固定报头
报文类型(4 bit)
占4个bit位,无符号整数。
标识位(4 bit)
- Bit 3: DUP,表示当前PUBLISH报文是否是重传。
- Bit 2,1: QoS,表示当前PUBLISH报文使用的服务质量等级。
- Bit 0:Retain,表示当前PUBLISH报文是否是保留消息。
剩余长度
MQTT报文长度 = 固定报头长度 + 剩余长度
剩余长度 = 可变报头长度 + 有效载荷长度
可变报头
内容取决于具体报文类型
举例:
CONNECT报文: 协议名+级别+连接标识+keep alive + 属性
PUBLISH报文:主题名 + 报文标识 + 属性
属性
以属性长度 和 属性内容 的 Key-Value 形式出现,提供了协议的扩展性。
- 不同报文的属性取值不同,用于传递元数据,如会话过期时间、消息过期间隔、用户属性等。
有效载荷
用于实现对应报文的核心功能
举例:
PUBLISH报文:Payload用于承载具体的应用消息内容
SUBSCRIBE报文:Payload包含需要订阅的主题、对应订阅的选项
3. QoS
MQTT 底层使用 TCP 协议,但 TCP 只能保证数据流不丢失,不能保证应用程序级别的消息投递,因此引入 QoS 机制。
QoS消息质量等级
| QoS消息质量等级 | 含义 | 可靠性 | 特点 | 工作机制 | 适用场景 |
|---|---|---|---|---|---|
| 0 | 做多发送一次 | 最低。如果网络抖动、客户端掉线或服务器繁忙,消息就会直接丢失 | 开销最小,延迟最低,速度最快 | 发送方把消息发出去就完事了,不等待接收方的确认(ACK),也不会在本地持久化存储这条消息。 | 对数据丢失不敏感的场景。例如:实时传感器数据上报(每秒都在发,丢一两条无所谓)、环境温湿度监控、在线状态心跳包 |
| 1 | 至少发送一次 | 中等。能保证消息绝对不会丢,但可能会重复 | 在可靠性和性能之间取得了较好的平衡 | 发送方发送消息后,必须等待接收方返回PUBACK确认。如果超时没收到确认,发送方会重新发送该消息 | 允许少量重复,但绝不能丢数据的场景。例如:物联网设备状态变更通知、普通的告警推送、非关键性的业务日志 |
| 2 | 恰好一次发送 | 不会丢失、不会重复 | 网络开销最大,延迟最高,对服务器和客户端的性能要求也最高 | 采用四次握手机制(PUBLISH → PUBREC → PUBREL → PUBCOMP)。不仅保证消息不丢,还通过状态机确保消息绝不重复 | 对数据准确性和唯一性要求极其严格的场景。例如:金融交易结算、计费系统扣费、关键控制指令下发、订单状态流转 |
在发送消息的时候,可以指定消息质量等级,一般broker获取到消息后,会按照发送时指定的质量等级发送给订阅者。特殊情况(订阅者指定了订阅消息的最大质量等级),消息等级可能发生改变。
4. 主题
本质是一个UTF-8编码的字符串,类似URL路径,使用“/”分层,不建议用“/”作为开头和结尾
test/10/temperature不需要提前创建主题,MQTT客户端在订阅、发布消息时自动创建
主题通配符
| 通配符 | 使用 | 示例 | 匹配 |
|---|---|---|---|
+ | 单个主题层级匹配的通配符 | a/+/c | 匹配a/b/c,不匹配a/b/d/c 或 a//c。 |
# | 匹配零个或多个层级,必须放在主题过滤器的最后 | a/b/# | 匹配a/b/c、a/b/d/e、a/b |
系统主题
以$SYS/开头的主题,用于获取MQTT服务器自身运行状态、消息统计、客户端上下线事件等数据。
- 特点: 默认不允许远程客户端订阅(取决于 Broker 配置),通常用于监控 Broker 性能。
- 用途:获取集群状态、客户端连接/离线事件(如
$SYS/brokers/+/clients/+/connected)等。
详细使用见emqx文档
5. 会话
客户端与服务器之间的连接,每个客户端可以启动一个或多个会话,实现和服务器的消息传递。
常见配置
| 参数 | 描述 |
|---|---|
| Clear Start | 决定是否复用旧会话 |
| Clean Start = 0 | 尝试复用旧会话(基于 Client ID)。如果旧会话存在,则恢复其订阅和未完成的 QoS 消息。如果不存在,则创建新会话 |
| Clean Start = 1 | 创建新会话。Broker 丢弃任何存储的旧会话状态(包括旧订阅和未完成的 QoS 状态) |
| Session Expiry Interval (会话过期时间, MQTT 5.0) | 客户端断开连接后,Broker 存储该会话的时长(秒) |
| Session Expiry Interval 未指定或设置为 0 | 客户端断开连接后,会话立即过期并删除。 |
6. 消息
普通消息:在发送前对应的主题如果不存在订阅者,MQTT服务器会直接丢弃。
保留消息:可保留在MQTT服务器中,任何新的订阅者订阅与保留消息中的主题匹配的主题时,会立即接收到该消息。
保留消息
常见使用场景
- 智能家居设备的状态,只在变更时上报,但控制端需要在上线后就能获取到设备状态
- 传感器上报数据的间隔太长,但是订阅者需要在订阅后立即获取到最新的数据
- 传感器版本号、序列号等不会经常变更的属性,在上线后发布一条保留消息告知后续的所有订阅者
特点
- 每一个主题最多只有一条保留消息(即最后一条带有 Retain 标志的消息)。
- 客户端在建立新的订阅时,Broker 会检查该主题是否有保留消息,并立即发送给客户端。
- 必须在保留消息发布之后,客户端再订阅该主题才能收到。如果在保留消息发布前就已订阅,收到的只是后续的普通消息。
删除保留消息
- 发送空内容的保留消息: 发布一个 空载荷 且
Retain = True的消息到该主题。 - 设置过期时间: 在发布保留消息时设置
Message Expiry Interval,使其在达到时间后自动失效。
保留消息独立于会话。 即使使用Clean Start = 1创建新会话,只要你重新订阅了该主题,Broker 依然会发送保留消息。
消息过期间隔
通过Session Expiry Interval为离线客户端缓存未发送的消息,然后在客户端恢复连接时发送;但如果客户端离线时间较长,可能有一些寿命较短的寿命已经没有必要发送给客户端了,避免浪费网络带宽和客户端资源。
默认情况下,消息中不包含消息过期间隔,表示永不过期。
遗嘱消息
客户端在连接时在服务端中注册一个遗嘱消息,当客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送此遗嘱消息,接收者可以及时采取行动,例如向用户发送通知、切换备用设备等。
遗嘱消息的设置发生在客户端发送CONNECT 报文时。
触发: 客户端意外断开连接(如Keep Alive 超时、网络错误)。Broker 会发布遗嘱消息。
不触发: 客户端正常关闭(发送DISCONNECT报文),或 Broker 正常关闭且客户端没有持久会话需要保留。
| 字段名称 | 作用说明 | 是否必需 |
|---|---|---|
| Will Flag (遗嘱标志) | 设置为 1,表示客户端想要设置一个遗嘱消息。 | 是 (开启功能) |
| Will Topic (遗嘱主题) | 指定当客户端非正常断开连接时,Broker 应该将遗嘱消息发布到哪个主题上 | 是 |
| Will Payload (遗嘱载荷) | 指定遗嘱消息的具体内容(数据) | 是 |
| Will QoS (遗嘱 QoS 等级) | 指定 Broker 发布遗嘱消息时应使用的 QoS 等级 | 是 |
| Will Retain (遗嘱保留标志) | 设置为 1,表示 Broker 发布遗嘱消息后,应将其作为保留消息存储 | 可选 |
| Will Delay Interval (遗嘱延迟发送) | 客户端断开连接后,延迟多久发布遗嘱消息(秒),期间重连恢复则不会发布 | 可选 |
延迟发布
MQTT服务器收到发布者的消息后,延迟一段时间后再转发给订阅者
使用场景:
定时发送提醒、在特定时间点发布调度任务
主题格式:$delay/{DelayInterval}/{TopicName}
$delay/是前缀。{DelayInterval}:延迟时间间隔,单位为秒。{TopicName}:消息最终要发布的实际主题。
用户属性
允许在Publish、Subscribe、Connect、Disconnect等报文中携带附加信息,类似于http协议请求头。
应用场景:
日志记录:在发布和订阅报文中加入用户属性,帮助记录操作者信息、操作时间、原因说明等
消息分类标记:用来给消息添加标签、分类,如消息类型等,便于接收方进行过滤、排序、处理。
7. 订阅
订阅选项
订阅组成:
- 主题过滤器:决定服务端将要向我们转发哪些主题下的消息
- 订阅选项:是允许我们进一步定制服务端的转发行为(QoS、No Local、Retain As Published、Retain Handling)
QoS
表示服务端在向订阅端发送消息时可以使用的最大QoS等级情况1:服务端支持最大QoS < 客户端订阅时请求的最大QoS
情况2:订阅时请求的最大QoS < 发布时的QoS
No Local
取值:
0(默认值):服务端可以将消息转发给发布这个消息的客户端
1:不可以
常用于桥接场景,桥接本质是两个MQTT Server建立一个MQTT连接,相互订阅一些主题,Server将客户端消息转发给另一个Server,另一个Server则将消息继续转发给他的客户端。没有设置为1则会在Server之间来回转发,导致转发风暴
Retain As Published
取值:
0(默认值): 服务端在向订阅者转发消息时,需要清除Retain标识
1: 服务端在向订阅者转发消息时,需要保持Retain标识不变
用于桥接,相互告知是保留消息,避免保留标识在server间互传时丢失。
Retain Handling
用于向服务端指示当订阅建立时,是否需要发送保留消息
取值:
0(默认值): 只要订阅建立,就发送保留消息
1: 只有建立全新订阅,而不是重复订阅时,发送保留消息
2:订阅建立时不发送保留消息
共享订阅
普通订阅:每发布一条消息,所有匹配的订阅者都会收到消息的副本。当订阅者Subscriber消费速度无法跟上消息Publisher生产速度,达到瓶颈。
共享订阅:使MQTT服务端在使用特定订阅的客户端之间,均衡地分配消息负载。
提高吞吐量、形成高可用
共享订阅分类:
| 前缀格式 | 示例 | 前缀 | 真实主题名称 | 流程 |
|---|---|---|---|---|
| 带群组格式 | $share/abc/t/1 | $share/abc | t/1 | |
| 不带群组格式 | $queue/t/1 | $queue | t/1 |
负载均衡算法:
通过DashBoard进行配置:随机、轮询、哈希、粘性、本地优先
排他订阅
共享订阅的一种特殊形式,用于保证单一处理者和自动故障转移
一个主题同一时刻仅允许一个订阅者存在。$exclusive/t/1,前缀$exclusive,主题:t/1
错误码:0x8F:使用了$exclusive,但未开启排他0x97:已被订阅
自动订阅
在设备连接后,按照规则为其订阅指定主题,不需要额外发起订阅。
MQTT客户端编程注意事项
单片机(MCU)环境下MQTT开发
硬件与网络接入
- Wi-Fi 模块:如 ESP8266、ESP32。通过 UART(串口)发送 AT 指令来建立 TCP 连接。
- 4G 模块:如 SIM800 系列。同
- TCP/IP 协议栈,MCU 只需操作寄存器即可。
软件协议栈实现
在 MCU 上,你通常无法运行完整的 MQTT 库
- 使用嵌入式轻量级 C 库:业界最常用的是 Paho MQTT Embedded C 库。它专门针对资源受限设备优化,仅包含 MQTT 数据包的序列化与反序列化功能,内存占用极低。
- 手动构造报文(裸机开发):如果资源极端受限,开发者需要手动拼接 MQTT 协议的十六进制报文(如 CONNECT、PUBLISH、SUBSCRIBE 报文),并通过串口发送给网络模块。
核心开发步骤
- 配置网络:通过 AT 指令连接 Wi-Fi/4G,并建立到 MQTT Broker(如 EMQ X、阿里云 IoT)的 TCP 连接。
- MQTT 握手与保活:发送 MQTT CONNECT 报文进行身份认证(ClientID、Username、Password),并启动定时器发送 PINGREQ 心跳包以保持连接。
- 业务逻辑:通过定时器或事件触发,调用 PUBLISH 报文上报传感器数据;通过解析接收缓冲区的数据,处理服务端下发的控制指令。
基于Linux芯片环境开发
拥有完整的操作系统、丰富的内存和标准的 TCP/IP 协议栈。开发的核心原则是“调用标准库”和“异步事件驱动”。直接使用客户端库。
以C语音为例步骤:
环境准备:
- 在 Linux 系统中安装
libmosquitto-dev库,或者在交叉编译工具链中配置该库
初始化与连接
- 调用
mosquitto_lib_init()初始化库 - 创建实例
mosquitto_new() - 设置回调函数(如连接成功回调、消息接收回调)
mosquitto_connect_callback_set() - 连接到
Broker mosquitto_connect()
订阅与发布
- 订阅主题:
mosquitto_subscribe() - 发布消息:
mosquitto_publish()
事件循环:
- Linux 下的 MQTT 库是基于异步事件的。你必须调用
mosquitto_loop_start()(开启后台线程处理网络事件)或mosquitto_loop_forever()(阻塞式循环)来维持网络通信
编译与运行:
- 使用 GCC 或交叉编译器(如 aarch64-linux-gnu-gcc)编译代码,链接 -lmosquitto
库,生成可执行文件后在开发板上运行
由于多python更熟悉,开发更敏捷,后续开发过程准备使用如下方式:
Python写MQTT业务逻辑(python库:paho-mqtt) + C语言处理底层驱动提高资源使用率和数据处理性能:由python调用C的方式处理底层内容。