Springboot2 物联网服务对接 MQTT 协议问题归纳
接触一个新的知识点首先遇到困难就是各种名词,不熟悉,不理解,让人头大。当我遇到 MQTT 协议,第一冒出来的问题是 RabbitMQ 与这个 MQTT 有什么关系?原项目中使用了 RabbitMQ 来处理消息,而物联网服务根本也是处理各种消息。尤其是当我百度这两者之间的关系时,更矛盾了。甚至有文章提供 rabbitmq 使用 mqtt 协议 的教程。
网上摘取一些区别解释:
MQTT 是一个非常简洁的二进制消息通信协议,类似的协议有 AMQP
RabbitMQ 是一个 erlang 实现的 AMQP 协议 0.9x 版本的消息服务器实现Mqtt 是应用层协议,rabbitmq 是基于 amqp 协议开发的消息代理。
要对比,应该是 mqtt vs amqp,或者是 emqx vs rabbitmq。mqtt 是协议,rabbitmq 是 broker(消息服务器),是基于 amqp 协议的,但也有 mqtt 插件,就是说也支持 mqtt 协议
RabbitMQ 就是一种消息队列的实现,可以简单理解为生产者/消费者模式,生产商将生产的商品放在商店(消息队列),消费者有需求就来商店买,商店(消息队列)实现两者间的异步和解耦。
MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。
经过一段时间的头脑风暴和消化,大致理解两者区别与用途。原项目中使用 RabbitMQ 主要是处理日志,需要使用 docker 安装 RabbitMQ 服务。而需要接进来的 MQTT 协议是使用 Netty 来实现的,使用 mqtt 协议的解码器和编码器,再添加一个处理器就可以了。此时的物联网服务充当是 broker 角色,对标应用中的 emqx 服务器。emqx 服务也可以通过 docker 进行安装,可以配合 MQTT X 或者 MQTT.fx 等客户端工具配合测试使用。
docker 使用 EMQX 服务需要注意这些端口都需要做映射,其中第一个 18083 对应了 emqx 的 http 访问服务:localhost:18083
,可以配置 emqx 的一些参数。
介绍完协议和测试工具,下面就是使用 netty 实现 MQTT 协议了。
因为原来对接的都是 TCP 协议,根据头部字节标识判断对接的哪家厂商协议。之前有个判断协议类型的:取前 8 位字符判断是否包含 MQTT:
String substring = msgBody.substring(0, 8);
return msgBody.contains("MQTT");
到了对接的时候发现,总是报错无效的消息格式,也就是没有匹配到。直接把字节码数组输出来,没有发现 MQTT,倒是有一个 MQIsdp,其余都是些乱码之类的。这个协议问题厂家技术也没搞明白,后来是涉及到连接时 Client ID 的长度被限制报错,百度才了解到 MQTT 也是存在不同的协议版本的。
客户端ID可以很长。在MQTT 3.1版,每个客户ID不能超过23 字节,这一点非常不方便,并能导致许多麻烦,例如客户端标识符要使用UUID的场景。在3.1.1版,Broker可以使用 65535 字节的客户端ID。
其他低级别的更改。在CONNECT 的 Header 报头中,协议名从 MQIsdp 改为 MQTT
从 netty 涉及到 mqtt 协议的各种类里查找,只找到一个类中存在枚举了 3.1 和 3.1.1 两个版本号,没有配置的地方无法确定是否服务一定支持 3.1.1。但在 MQTT X 连接编辑的高级选项中是有协议版本号的选择的:
经测试发现,netty 启动的项目服务是支持 3.1.1 甚至是 5.0 版本的,所以设备出现连接时 Client ID 的长度被限制报错,或者协议名称不能匹配上,这都是因为设备使用了低版本 3.1 的 MQTT 协议。在匹配时增加对 3.1 版本的兼容,并让客户端在使用 3.1 时调整 Client ID 长度:
return msgBody.contains("MQTT") || msgBody.contains("MQIsdp");
注意:这里的 msgBody 是直接通过二进制数据转化的字符串,并不是 MQTT 协议解码后的 payload 数据承载部分。
主要协议处理部分参考 Wizzer / MqttWk - gitee 项目,包括 MQTT服务连接时用户名和密码认证
,完整的QoS服务质量等级实现
,主题过滤(支持单主题订阅)
部分。
MQTT 服务连接时(CONNECT)的用户名和密码认证是自己增加设置的,而非协议自带的内容。
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
log.info("auth username: " + username + ", password: " + password);
如果验证不通过,或者设备 uuid 未入库,就直接返回消息、关闭通道。
客户端连接成功后会发起订阅(SUBSCRIBE),服务端需要把客户端提交的订阅存储起来(参考 MqttWk 项目),在接收发布消息(PUBLISH)和发送发布消息时,通过主题过滤对接收和发送做控制,只有客户端已经订阅的主题才会接收发布消息,或者才可以发送发布消息。设备与服务器大部分通讯内容主要就是通过接收和发送发布消息来实现的。
当然要结合实际厂家的协议实现情况做适配,比如这次对接的设备在心跳、上报、下发三类操作上分别关联了 heart、update、get 主题,而设备连接时并没有订阅 get 下发主题,这意味着需要在发送发布消息处去除主题过滤部分,不然指令无法下发到设备。
Qos 服务指令等级的实现是协议的重要部分,这主要也是在接收和发送发布消息的时候(PUBLISH)。Qos 主要分为三个等级:
public class QOS {
//最多一次,Sender只发送一次消息,Receiver收不到就算了
//这个方案的缺点是,Receiver有可能收不到消息
public static final int AT_MOST_ONCE = 0;
//至少一次,Sender发送多次消息,直到Receiver回复确认消息
//如果Receiver的回复丢失了,则Sender会重复发送消息给Receiver
//这个方案的缺点是,Receiver可能收到重复消息,如果Receiver的回复丢失的话
public static final int AT_LEAST_ONCE = 1;
//只有一次,Sender和Receiver进行双向确认,确认消息被收到且不重复
//性能开销更大,但可以保证准确无误
//在调试代码时,由于加了断点,MQTT的正常工作流程会被阻塞,这时有可能会导致重复收到消息,是正常现象
public static final int EXACTLY_ONCE = 2;
}
如果 Qos 是 0,那么恭喜甚至不需要给客户端发送发布消息,直接处理消息完毕就好;
如果 Qos 是 1,此时也可以选择不发送发布消息,但得发送确认消息(PUBACK),客户端收不到确认会重复发送发布消息直到超时超次;
如果 Qos 是 2,那么就需要双方都确认,此时服务端也需要发送确认消息(PUBREC),之后还有释放(PUBREL)、完成(PUBCOMP)依次对上一次消息进行回复。
幸运的是设备使用了 Qos.AT_LEAST_ONCE
,只需要对每条上报消息发送确认消息即可。如果对接过程中出现一次下发、多次重复上报的情况,大概率是使用了 Qos.AT_LEAST_ONCE
且服务端没有给到确认消息。
通过对 MqttMessage 解析,主要内容分成三个部分:fixedHeader
(固定头)、variableHeader
(可变头)还有 payload
(有效负载数据)。固定头中有一个 messageType
参数来标识上报消息的类型(CONNECT、PUBLISH、SUBSCRIBE 等)。可变头没有用到,猜测是放一些自定义的参数。这次主要的通讯内容放在了有效负载数据中,上报的是二级制编码,通过解析报文通讯格式,得到对应消息的上报内容。一般下发指令会与上报消息使用同一个通讯格式,即需要对下发指令做格式封装,但这次是例外。所以一切不确定内容都要跟对方沟通清楚,文档里写的要确认,没写的更要仔细询问清楚。最好要有 demo,毕竟沟通是有成本的。
关于心跳维持,MQTT 协议本身是有做的,对应上报和下发类型为 PINGREQ 和 PINGRESP。主体报文部分也可以自定义心跳类型,注意,心跳维护通常都是客户端主动发起,一般不建议服务端维护。
自此对接工作基本完成。netty 框架可以对标 php 中使用过的 wokerman 框架,都支持 websocket、tcp 等多种协议通讯,尤其是异步通讯。对接完之后好像什么都会了一点,又好像对什么都不太了解,需要后续继续加深学习。
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。