基于开源Java MQTT Client的阿里云物联网平台RRPC功能测试

简介: MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。

概述

MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能。本文主要基于开源Java MQTT Client,分别针对系统Topic和自定义Topic,演示阿里云物联网平台RRPC的实现。

RRPC原理

_

  • 1、物联网平台收到来自用户服务器的RRPC调用,下发一条RRPC请求消息给设备。消息体为用户传入的数据,Topic为物联网平台定义的Topic,其中含有唯一的RRPC消息ID。
  • 2、设备收到下行消息后,按照指定Topic格式(包含之前云端下发的唯一的RRPC消息ID)回复一条RRPC响应消息给云端,云端提取出Topic中的消息ID,和之前的RRPC请求消息匹配上,然后回复给用户服务器。
  • 3、如果调用时设备不在线,云端会给用户服务器返回设备离线的错误;如果设备没有在超时时间内(8秒内)回复RRPC响应消息,云端会给用户服务器返回超时错误。

更多原理介绍可以参考阿里云官方文档


实验测试

准备工作

1、创建产品和设备

参考:阿里云物联网平台Qucik Start 创建产品和设备部分。

2、创建自定义Topic

_

3、开源SDK的使用

参考:基于开源JAVA MQTT Client连接阿里云IoT

4、服务端RRpc API的调用:调用该接口向指定设备发送请求消息,并同步返回响应。

这里我们使用Open API Explorer快速测试调用,RRPC测试地址

_


系统Topic测试

参数:

RRPC调用的系统Topic格式如下:

  • RRPC请求消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/${messageId}
  • RRPC响应消息Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/response/${messageId}
  • RRPC订阅Topic:/sys/${YourProductKey}/${YourDeviceName}/rrpc/request/+

1、设备端Code

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTRRPCForSysTopic {

    // 设备三元组信息
    public static String productKey = "******";
    public static String deviceName = "******";
    public static String deviceSecret = "********";
    public static String regionId = "cn-shanghai";

    // RRPC 系统Topic
    private static String subTopic = "/sys/" + productKey + "/" + deviceName+ "/rrpc/request/+";
    private static MqttClient mqttClient;

    public static void main(String [] args) {

        initAliyunIoTClient();

        // RRPC订阅Topic
        try {
            mqttClient.subscribe(subTopic);
        } catch (MqttException e) {
            e.printStackTrace();
        }

        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost:" + cause.getMessage());
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("message: " + new String(message.getPayload()));

                // 根据RRPC请求消息Topic,构建RRPC响应消息Topic
                String responseTopic = topic.replace("request","response");
                MqttMessage message1 = new MqttMessage("resonse demo".getBytes("utf-8"));
                mqttClient.publish(responseTopic,message1);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
//                System.out.println("IMqttDeliveryToken: " + token);
            }
        });
    }

    /***
     * 初始化Client
     */
    private static void initAliyunIoTClient() {

        try {
            String clientId = "java" + System.currentTimeMillis();

            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);

            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }
}

2、服务端下发消息

_

3、设备端订阅情况

_


自定义Topic测试

参数

RRPC调用自定义Topic的格式如下:

  • RRPC请求消息Topic:/ext/rrpc/${messageId}/${topic}
  • RRPC响应消息Topic:/ext/rrpc/${messageId}/${topic}
  • RRPC订阅Topic:/ext/rrpc/+/${topic}

1、设备端Code

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTRRPCForPersonalTopic {

    // 设备三元组信息
    public static String productKey = "******";
    public static String deviceName = "******";
    public static String deviceSecret = "********";
    public static String regionId = "cn-shanghai";

    // RRPC 自定义Topic
    private static String personalTopic = "/" + productKey + "/" + deviceName + "/user/rrpcdemo";
    private static String subTopic = "/ext/rrpc/+" + personalTopic;
    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();

        try {
            mqttClient.subscribe(subTopic);
        } catch (MqttException e) {
            e.printStackTrace();

            System.out.println(e.getMessage());
        }

        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("connectionLost");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("topic: " + topic);
                System.out.println("message: " + new String(message.getPayload()));

                // 设置响应 Topic 及 message
                MqttMessage msg = new MqttMessage("demo".getBytes());
                mqttClient.publish(topic,msg);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
//                System.out.println("IMqttDeliveryToken: " + token);
            }
        });
    }

    /***
     * 初始化Client
     */
    private static void initAliyunIoTClient() {

        try {
            String clientId = "java" + System.currentTimeMillis();

            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);

            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            // 从云端下发自定义格式Topic的RRPC调用命令到设备端时,设备端必须在进行MQTT CONNECT协议设置时,在clientId中增加ext=1参数。
            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + ",ext=1|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }
}

注意

从云端下发自定义格式Topic的RRPC调用命令到设备端时,设备端必须在进行MQTT CONNECT协议设置时,在clientId中增加ext=1参数。

2、服务端下发消息

_

3、设备端订阅情况

_


参考链接

调用系统Topic
调用自定义Topic

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
2月前
|
监控 安全 物联网
Java基于物联网技术的智慧工地解决方案源代码
应用先进的大数据、物联网、云计算等数字化技术,融合施工运营管理规范和技术标准,建构支撑施工和运营的一体化平台是投资、施工和运营单位能力建设的关键。应用企业架构、设计思维和软件工程方法,深入分析施工和运营技术特性与管理体系,研究开发基于大数据技术的智慧工地信息一体化平台,智慧工地管理平台是依托物联网、互联网建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。
79 2
EMQ
|
存储 网络协议 IDE
如何在 Java 中使用 MQTT
本文主要介绍如何在Java项目中使用MQTT,实现MQTT客户端与服务器的连接、订阅和收发消息等功能。
EMQ
1168 0
如何在 Java 中使用 MQTT
|
1月前
|
Java Maven Android开发
java如何连接mqtt
java如何连接mqtt
36 0
|
1月前
|
存储 监控 安全
Java基于物联网技术的智慧工地云管理平台源码 依托丰富的设备接口标准库,快速接入工地现场各类型设备
围绕施工安全、质量管理主线,通过物联感知设备全周期、全覆盖实时监测,将管理动作前置,实现从事后被动补救到事前主动预防的转变。例如塔吊运行监测,超重预警,升降机、高支模等机械设备危险监控等,通过安全关键指标设定,全面掌握现场安全情况,防患于未然。
147 5
|
1月前
|
小程序 Java 数据安全/隐私保护
Java中心校智慧校园智慧班牌物联网平台源码
班牌首页用于展示班级风采信息,所以,班牌其他页面没有操作的情况下,2分钟后会自动回到首页展示。
58 5
|
4月前
|
人工智能 数据可视化 安全
Java带可视化数据大屏的物联网智慧工地系统源码
通过现场AI智能视频监控、临时设施动态管理,实时检测场地空间、资源、设施的运行状况,及时发现场地安全隐患,确保为工人营造一个安全、文明的场地作业环境。
60 0
|
4月前
|
传感器 小程序 物联网
JAVA基于物联网技术的智慧校园电子班牌原生微信小程序源码
系统架构:Java+vue2+springboot+MySQL+ elmentui+Quartz+jpa+jwt
70 0
|
4月前
|
数据采集 监控 Java
工业物联网的java应用
工业物联网的java应用
|
4月前
|
人工智能 监控 物联网
基于物联网、大数据、云计算、人工智能等技术的智慧工地源码(Java+Spring Cloud +UniApp +MySql)
基于物联网、大数据、云计算、人工智能等技术的智慧工地源码(Java+Spring Cloud +UniApp +MySql)
631 0
|
3月前
|
人工智能 监控 安全
【物联网+JAVA 】智慧工地源码
【物联网+JAVA 】智慧工地源码
53 0

相关产品

  • 物联网平台