遇到的场景:netty 中初始化 mqtt 服务,添加处理器时,默认使用的是 new MstMqttSocketHandler() 返回处理器。

if (isMqttRequest(msgBody)) {
    pipeline.addLast("mqttDecoder", new MqttDecoder())
            .addLast(MqttEncoder.INSTANCE)
            .addLast("mqttHandler", new MstMqttSocketHandler());
    ctx.fireChannelRead(byteBuf);
    pipeline.remove(this.getClass());
}

在使用 MQTTX 客户端发起 CONNECT 连接请求时报错了:An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

提示在 pipeline 尾部触发了异常,结合 Wizzer / MqttWk 中 mqtt 协议处理代码,末尾的两句似乎有问题:

ctx.fireChannelRead(byteBuf);
pipeline.remove(this.getClass());

尝试注释掉两行再发送连接,没有报错,但此时连接没有连接成功。通过增加日志发现请求消息并没有经过 MstMqttSocketHandler 处理器。网上查到 fireChannelRead 用法,后面还有一个心跳的处理器,这个操作似乎是必要的。其他协议(tcp)之类的接入似乎没有发生这种情况,所以这两句不能去掉。

观察原有代码 tcp 协议的处理发现了几行比较有意思的内容:

if (protocolProcess == null) {
   protocolProcess = MyContext.getContext().getBean(ProtocolProcess.class);
}

本来 protocolProcess 就是通过 @Autowired 注解注入的对象,怎么会有有一个为空从容器中获取的操作呢?尝试按照相同的方式添加类似的代码,果然再次连接没有报错了。这意味着 @Autowired 注入失败了。

查找 @Autowired 注入失败的情况,找到以下符合现状的描述:

A 类包含以 @Autowired 方式注入的属性 B,如果 A 类通过 new 实例对象则属性 B 会注入失败

所以 A 类的使用需要通过从容器中获取,即 new MstMqttSocketHandler() 应该改成 MyContext.getContext().getBean(MstMqttSocketHandler.class),这样就好了。

后续遇到说 MstMqttSocketHandler 不能 share,添加 @ChannelHandler.Sharable 注解即可。