消息推送标准协议:MQTT

简介: 随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,M2M)的大规模信息沟通成为重要的课堂,之前HTTP的请求/回答(Request/Response)模式不再合适,取而代之的是发布/订阅(Publish/Subscribe)模式。

随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,M2M)的大规模信息沟通成为重要的课堂,之前HTTP的请求/回答(Request/Response)模式不再合适,取而代之的是发布/订阅(Publish/Subscribe)模式。这就是轻量级、可扩展的MQTT(Message Queuing Telemetry Transport)可以施展拳脚的舞台。

1. MQTT与IoT

MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景。其主要特点包括:

  • 轻量级的 machine-to-machine 通信协议;
  • publish/subscribe模式;
  • 基于TCP/IP;
  • 支持质量等级QoS;
  • 适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。

运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景中。同时MQTT也是一种比较不错的Android消息推送方案,FacebookMessenger就是采用了MQTT。可以说MQTT是物联网中最有潜力的网络协议之一。


MQTT与物联网

若初次接触MQTT协议,可先理解以下概念:

  • MQTT协议特点——相比于RESTful架构的物联网系统,MQTT协议借助消息推送功能,可以更好地实现远程控制。
  • MQTT协议角色——在RESTful架构的物联网系统,包含两个角色客户端和服务器端,而在MQTT协议中包括发布者代理器(服务器)和订阅者
  • MQTT协议消息——MQTT中的消息可理解为发布者和订阅者交换的内容(负载),这些消息包含具体的内容,可以被订阅者使用。
  • MQTT协议主题——MQTT中的主题可理解为相同类型或相似类型的消息集合。

2. 发布/订阅模式

请求/回答这种同步模式不同,发布/定义模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。

打个比方,你打电话给朋友,一直要等到朋友接电话了才能够开始交流,是一个典型的同步请求/回答的场景;而给一个好友邮件列表发电子邮件就不一样,你发好电子邮件该干嘛干嘛,好友们到有空了去查看邮件就是了,是一个典型的异步发布/订阅的场景。

换一种类比,请求/回答模式是一种同步模式,请求方会一直等待应答方的回复;而发布/订阅模式是一种异步的模式,
这种设计模式的好处为:

  • 发布者与订阅者不比了解彼此,只要认识同一个消息代理即可;
  • 发布者和订阅者不需要交互,发布者无需等待订阅者确认而导致锁定;
  • 发布者和订阅者不需要同时在线,可以自由选择时间来消费消息;

3. 主题

MQTT是通过主题(Topics)对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用即可。

主题还可以通过通配符进行过滤,关于Topic通配符:

  • /:用来表示层次,比如a/b,a/b/c;
  • :表示匹配>=0个层次,比如a/#就匹配a/,a/b,a/b/c;

    • 单独的一个#表示匹配所有。
    • 不允许 a#和a/#/c。
  • +:表示匹配一个层次,例如a/+匹配a/b,a/c,不匹配a/b/c。
    • 单独的一个+是允许的,a+不允许,a/+/b不允许

注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。

4. 服务质量QoS

QoS

为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 级别0:尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 级别1:至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 级别2:恰好一次。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。

用户可以根据消息的重要性选择不同的质量级别。

5. 消息体和消息类型

MQTT消息头

MQTT的固定头部,使用两个字节,共16位。

其中4-7Bit为消息类型,使用4位二进制表示,可代表16种消息类型:


消息类型

除去0和15位置属于保留待用,共14种消息事件类型。

  • CONNECT

    • TCP连接建立完毕后,Client向Server发出一个Request;
    • 如果一段时间内接收不到Server的Response,则关闭socket,重新建立一个session连接。
    • 如果一个ClientID已经与服务器连接,则持有同样ClientID的旧有连接必须由服务器关闭后,新建立才能建立。
  • CONNACK:Server发出CONNECT消息的Response:

    • 0x00 Connection Accepted
    • 0x01 Connection Refused: unacceptable protocol version
    • 0x02 Connection Refused: identifier rejected
    • 0x03 Connection Refused: server unavailable
    • 0x04 Connection Refused: bad user name or password
    • 0x05 Connection Refused: not authorized
  • PUBLISH : 发布消息

    • Client/Servier均可以进行PUBLISH。
    • publish message 应该包含一个 TopicName(Subject/Channel),即订阅关键词。
  • PUBACK: QoS=1时,用于发布消息后的确认

    • QoS=1时,Server向发布者Client该确认收到消息(Client收到确认后删除消息),订阅者向Server发布确认收到消息。
  • PUBREC / PUBREL / PUBCOMP
    QoS=2时:

  1. Server->Client发布PUBREC(已收到);
  2. Client->Server发布PUBREL(已释放);
  3. Server->Client发布PUBCOMP(已完成),Client删除msg;
    订阅者也会向Server发布类似过程确认。该过程类似于TCP的三次握手过程。
  • SUBSCRIBE/SUBACK

    • SUBSCRIBE 用于Client向Server发送订阅某个主题的请求;
    • SUBACK 用于Server回复Client,确认已订阅;
  • UNSUBSCRIBE /UNSUBACK

    • UNSUBSCRIBE 用于Client向Server发送取消订阅某个主题的请求;
    • SUBACK 用于Server回复Client,确认已取消订阅;
  • PINGREQ / PINGRES :心跳

    • Client有责任发送KeepAliveTime时长告诉给Server。在一个时长内,发送PINGREQ,Server发送PINGRES确认。
    • Server在1.5个时长内未收到PINGREQ,就断开连接。
    • Client在1个时长内未收到PINGRES,断开连接。
    • 一般来说,时长设置为几个分钟。最大18hours,0表示一直未断开。

DUP flag(打开标志)
保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

  • 当QoS > 0,消息需要回复确认
  • 当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS value bit 2 & bit 1 Description
0 00 至多一次 发完即丢弃
1 01 至少一次 需要确认回复
2 10 只有一次 需要确认回复
3 11 待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

  • 1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
    • 备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
  • 0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

6. MQTT代理服务

市面上有相当多的高质量MQTT代理,其中Mosquitto是一个开源的轻量级的C实现,其官网地址为: Mosquitto-Server

在Ubuntu系统中可以直接通过以下命令安装:

apt-get install mosquitto
apt-get install mosquitto-clients

关于如何配置和使用Mosquitto请详见官网和参考文献10, 这里不再详细展开。

Moqtuitto性能突出,发送消息快,稳定性高,cpu占用很少,并发比较高。i5-4核CPU,4G内存的服务器,就在能在20s以内发送10w条 QoS-0信息,且CPU使用率不超过20%。具体性能分析请见: MQTT SERVER 性能测试报告

7. Paho客户端实战

可能有的读者很心急,不像自己搭建服务器就像体现MQTT的工作流程。Eclipse提供可一个测试的服务器:iot.eclipse.org:1883。读者朋友们可以使用MQTT协议的官方客户端Paho-Client来直接连接使用。

示例代码如下:

package srx.awesome.code.mqtt.client;

import org.eclipse.paho.client.mqttv3.*;

public class PahoTest {
    //关注的主题
    private static String topic        = "MQTT Examples";//
    //发送的内容
    private static String content      = "Hello MQTT!!!!!";
    //质量等级
    private static int qos             = 2;
    //MQTT服务地址
    private static String broker       = "tcp://iot.eclipse.org:1883";
    //客户端ID
    private static String clientId     = "JavaSample";
    //用户名
    private static String userName     = "admin";
    //密码
    private static String passWord     = "password";

    @SuppressWarnings("finally")
    public static void main(String[] args) {
        try {
            //创建客户端
            MqttClient sampleClient = new MqttClient(broker, clientId, null);
            //配置回调函数
            sampleClient.setCallback(new MyMqttCallback());

            //创建连接选择
            MqttConnectOptions connOpts = getMqttConnectOptions(userName, passWord);
            System.out.println("Connecting to broker: "+broker);
            //创建服务连接
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            //关注主题,质量等级为2
            sampleClient.subscribe(topic, qos);

            //在另一个线程中发送消息
            Thread thread = new Thread(() -> {
                try {
                    publishMsg(topic, content, qos, sampleClient);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            });
            thread.start();

            thread.join();
            //断开服务连接
            sampleClient.disconnect();
            System.out.println("Disconnected");
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.exit(0);
        }
    }

    private static void publishMsg(String topic, String content, int qos, MqttClient sampleClient) throws MqttException {
        //循环发送10次消息
        for (int times =0 ;times<10; times++) {
            System.out.println(String.format("%d time Publishing message: %s", times, content));
            //创建消息内容
            MqttMessage message = new MqttMessage(content.getBytes());
            //设置质量级别
            message.setQos(qos);
            //发送消息
            sampleClient.publish(topic, message);
            //System.out.println("Message published");
        }
    }

    private static MqttConnectOptions getMqttConnectOptions(String userName, String passWord) {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        //是否清除Session,如果否,重新连接之后会自动关注之前关注的主题
        connOpts.setCleanSession(true);
        connOpts.setUserName(userName);
        connOpts.setPassword(passWord.toCharArray());
        connOpts.setAutomaticReconnect(true);
        // 设置连接超时时间, 单位为秒,默认30
        connOpts.setConnectionTimeout(30);
        // 设置会话心跳时间,单位为秒,默认20
        connOpts.setKeepAliveInterval(20);
        return connOpts;
    }

}

代码实现的功能很简单:该客户订阅主题"MQTT Examples",然后向这个主题连续10次发送消息,服务代理会把发布在该主题的消息在发给定语该主题的用户,也就是客户端自己。

需要重点说明的事为客户端代理设置回调器(MqttCallback ),下面是作者自定义的回调器。

package srx.awesome.code.mqtt.client;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

class MyMqttCallback implements MqttCallback {
    //端看连接之后被调用
    @Override
    public void connectionLost(Throwable arg0) {
        System.out.println("Connection Lost:"+arg0.getMessage());
    }

    //收到消息后被发送
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws MqttException {
        System.out.println(String.format("get Msg: %s from Topic: %s", mqttMessage, s));
    }

    //消息被送到之后被调用
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            if(iMqttDeliveryToken.isComplete()){
            System.out.println(String.format("Delivery a Msg to Topic: %s",iMqttDeliveryToken.getTopics()[0]));
        }
    }
}

通过自定义回调器,就可以设置消息事件到来和发出后的业务逻辑,将通信和业务处理分离开,是一种解耦和的设计。

运行程序,输入如下:

Connecting to broker: tcp://iot.eclipse.org:1883
Connected
0 time Publishing message: Hello MQTT!!!!!
get Msg: 881.0267578289576 from Topic: MQTT Examples
1 time Publishing message: Hello MQTT!!!!!
Delivery a Msg to Topic: MQTT Examples
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
2 time Publishing message: Hello MQTT!!!!!
Delivery a Msg to Topic: MQTT Examples
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
3 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
4 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
5 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
6 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
7 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
8 time Publishing message: Hello MQTT!!!!!
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
9 time Publishing message: Hello MQTT!!!!!
Delivery a Msg to Topic: MQTT Examples
get Msg: Hello MQTT!!!!! from Topic: MQTT Examples
Delivery a Msg to Topic: MQTT Examples
Disconnected

10次消息发送全部成功,客户端也成功收到自己发送的消息。主要注意的是,由于我们设置QoS=2,需要服务器和客户端之间多次通信,耗费了时间,往往是消息已经被发到了,客户端才确定消息真的被发出了。

示例代码:https://github.com/sunrongxin7666/pahoclient

以上就是MQTT协议的简单介绍,更为复杂的功能期待各位读者探索。

感谢参考文献中列出的文章对于作者的帮助。

参考文献

  1. MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用
  2. MQTT入门篇
  3. MQTT协议简记
  4. Mosquitto-Server
  5. Paho-Client
  6. MQTT SERVER 性能测试报告
  7. MQTT安全篇
  8. Introducing the MQTT Security Fundamentals
  9. MQTT V3.1--我的理解
  10. MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
物联网
mqtt实现消息通知
mqtt实现消息通知
18 0
|
5月前
|
小程序 API 开发工具
小程序消息推送入门 (订阅消息推送)
小程序消息推送入门 (订阅消息推送)
181 0
|
11月前
|
网络协议 搜索推荐 Java
消息推送技术
消息推送技术
148 0
|
小程序 开发工具 开发者
WebSocket长连接接入支付宝消息服务,实现消息通知
支付宝开放平台消息服务提供两种通讯协议来接收消息,一种是基于 HTTPS/HTTP,一种是基于 WebSocket长连接。相比之下,WebSocket长连接有更多的优势,所以一般选择使用WebSocket长连接来接收支付宝服务端发来的消息。
246 0
WebSocket长连接接入支付宝消息服务,实现消息通知
|
前端开发 Java Unix
WebSocket实现消息推送
WebSocket实现消息推送
273 0
WebSocket实现消息推送
|
网络协议 前端开发
WebSocket实现长连接实时消息推送
WebSocket实现长连接实时消息推送
WebSocket实现长连接实时消息推送
EMQ
|
传感器 网络协议 JavaScript
MQTT 协议快速体验
本文将通过讲解与演示向读者展示MQTT协议的入门使用流程,开发者可以通过本文以更简单的方式理解MQTT相关概念,快速开始MQTT服务及应用的开发。
EMQ
326 1
MQTT 协议快速体验
|
消息中间件 网络协议 物联网
浅谈物联网开发最热协议—MQTT协议
浅谈物联网开发最热协议—MQTT协议
447 0
浅谈物联网开发最热协议—MQTT协议
|
网络协议 算法 网络性能优化
我的mqtt协议和emqttd开源项目个人理解(15) - MQTT消息推送协议应用数据包超时是否需要重发?
我的mqtt协议和emqttd开源项目个人理解(15) - MQTT消息推送协议应用数据包超时是否需要重发?
316 0
|
消息中间件 JavaScript 物联网
阿里云物联网平台AMQP服务端订阅NetSDK Demo
服务端可以直接订阅产品下所有类型的消息:设备上报消息、设备状态变化通知、网关发现子设备上报、设备生命周期变更、设备拓扑关系变更。配置服务端订阅后,物联网平台会将产品下所有设备的已订阅类型的消息转发至您的服务端。本文主要演示如果使用NET SDK进行AMQP服务端订阅。
1760 0
阿里云物联网平台AMQP服务端订阅NetSDK Demo