基于Pub/Sub模式的阿里云IoT同步调用详解

简介: 1.同步调用场景 1.1 背景 MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。 IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在Timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端

1.同步调用场景

1.1 背景

MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。
IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在Timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端的响应结果。
 

具体流程如下:

1536135338761-5846db43-86cb-42b4-bdde-6d
 

1.2 Topic格式约定

请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
$表示变量,每个设备不同
messageId为IoT平台生成的消息ID,设备端回复responseTopic里的messageId要与requestTopic一致

示例:

设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在timeout时间内回复Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
 
 

2.同步调用RRPC示例

2.1 设备端代码

 
 

const mqtt = require('aliyun-iot-mqtt');
//设备属性
const options = require("./iot-device-config.json");
//建立连接
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {
    
    if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
    	handleRrpc(topic, message)
    }
})

function handleRrpc(topic, message){
	topic = topic.replace('/request/','/response/');
	console.log("topic=" + topic)
	//普通Rrpc,响应payload自定义
    const payloadJson = {code:200,msg:"handle ok"};
	client.publish(topic, JSON.stringify(payloadJson));
}
 

2.2 服务端POP调用Rrpc

 
 
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1.初始化client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2017-04-20'
});

const payload = {
  "msg": "hello Rrpc"
};

//2.构建request
const params = {
    ProductKey:"a1gMu82K4m2",
    DeviceName:"h5@nuwr5r9hf6l@1532088166923",
    RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),
    Timeout:3000
};

co(function*() {
    //3.发起API调用
    const response = yield client.request('Rrpc', params);

    console.log(JSON.stringify(response));
});
 
rrpc响应:
 
 
{
    "MessageId": "1037292594536681472",
    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
    "Success": true,
    "RrpcCode": "SUCCESS"
}

// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
 
 

3.物模型-服务同步调用InvokeThingService示例

注意:物模型 服务调用 接口InvokeThingService,不是Rrpc

3.1 物模型-同步服务定义

1536145308148-48813953-b0b0-4949-b591-1c
 

3.2 设备端实现

 
 
 
const mqtt = require('aliyun-iot-mqtt');
//设备属性
const options = require("./iot-device-config.json");
//建立连接
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {
    
    if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){
    	handleRrpc(topic, message)
    }
})
/*
* 如果存在多个同步调用服务,需要通过payload里的method区分
*/
function handleRrpc(topic, message){

	topic = topic.replace('/request/','/response/');
	console.log("topic=" + topic)
	//物模型 同步服务调用,响应payload结构:
    const payloadJson = {
        id: Date.now(),
        code:200,
        data: {
            currentMode: Math.floor((Math.random() * 20) + 10)
        }
    }

	client.publish(topic, JSON.stringify(payloadJson));
}
注意:设备端响应的payload要满足物模型定义的出参结构

3.3 服务端POP 接口InvokeThingService

 
 
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1.初始化client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2018-01-20'
});

const params = {
    ProductKey: "a1gMu82K4m2",
    DeviceName: "h5@nuwr5r9hf6l@1532088166923",
    Args: JSON.stringify({ "mode": "1" }),
    Identifier: "thing.service.setMode"
};

co(function*() {
    try {
        //3.发起API调用
        const response = yield client.request('InvokeThingService', params);

        console.log(JSON.stringify(response));
    } catch (err) {
        console.log(err);
    }
});
 
调用结果:
 
 
{
    "Data":{
        "Result": "{\"currentMode\":12}",
        "MessageId": "1536145625658"
    },
    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
    "Success": true
}
 
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
27天前
|
域名解析 移动开发 负载均衡
阿里云DNS常见问题之DNS负载均衡调加权模式失败如何解决
阿里云DNS(Domain Name System)服务是一个高可用和可扩展的云端DNS服务,用于将域名转换为IP地址,从而让用户能够通过域名访问云端资源。以下是一些关于阿里云DNS服务的常见问题合集:
|
4月前
|
弹性计算 关系型数据库 MySQL
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
238 0
|
7月前
|
canal
阿里云DTS同步binlog实战
阿里云DTS同步binlog实战
|
4月前
|
监控 关系型数据库 MySQL
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
227 0
|
2月前
|
弹性计算
阿里云服务器“带宽计费模式”详细说明_2024固定带宽和流量详解
阿里云服务器“带宽计费模式”详细说明_2024固定带宽和流量详解,按固定带宽是指直接购买多少M带宽,比如1M、5M、10M、100M等,阿里云直接分配用户所购买的带宽值,根据带宽大小先付费再使用;按使用流量是先设置一个带宽峰值,然后根据实际公网产生的出流量来计算费用,先使用后付费
|
2月前
|
弹性计算
阿里云服务器的带宽计费模式是什么意思?咋收费的?
阿里云服务器带宽计费模式分为“按固定带宽”和“按使用流量”,有什么区别?按固定带宽是指直接购买多少M带宽,比如1M、5M、10M、100M等,阿里云直接分配用户所购买的带宽值,根据带宽大小先付费再使用;按使用流量是先设置一个带宽峰值,然后根据实际公网产生的出流量来计算费用,先使用后付费。阿里云百科分享阿里云服务器“带宽计费模式”详细区别、计费及注意事项
|
2月前
|
弹性计算
阿里云带宽计费模式是怎么回事?
阿里云带宽计费模式是怎么回事?带宽计费模式分为“按固定带宽”和“按使用流量”,有什么区别?按固定带宽是指直接购买多少M带宽,比如1M、5M、10M、100M等,阿里云直接分配用户所购买的带宽值,根据带宽大小先付费再使用;按使用流量是先设置一个带宽峰值,然后根据实际公网产生的出流量来计算费用,先使用后付费
|
2月前
|
弹性计算
阿里云带宽计费模式怎么设置?
阿里云带宽计费模式怎么设置?带宽计费模式分为“按固定带宽”和“按使用流量”,有什么区别?按固定带宽是指直接购买多少M带宽,比如1M、5M、10M、100M等,阿里云直接分配用户所购买的带宽值,根据带宽大小先付费再使用;按使用流量是先设置一个带宽峰值,然后根据实际公网产生的出流量来计算费用,先使用后付费。阿里云百科分享阿里云服务器“带宽计费模式”详细区别、计费及注意事项
|
2月前
|
弹性计算
阿里云服务器“带宽计费模式”是什么意思?
阿里云服务器“带宽计费模式”是什么意思?阿里云服务器带宽计费模式分为“按固定带宽”和“按使用流量”,有什么区别?按固定带宽是指直接购买多少M带宽,比如1M、5M、10M、100M等,阿里云直接分配用户所购买的带宽值,根据带宽大小先付费再使用;按使用流量是先设置一个带宽峰值,然后根据实际公网产生的出流量来计算费用,先使用后付费
|
4月前
|
SQL 运维 关系型数据库
阿里云DTS踩坑经验分享系列|如何使用DTS进行MySQL->ClickHouse同步
在使用阿里云DTS 进行MySQL->ClickHouse同步时,从准备工作,到创建任务,再到后期运维处理,新手可能会感到茫然和不知所措。为了帮助新手顺利过渡,本文将介绍使用阿里云DTS在进行MySQL到ClickHouse迁移时的最佳实践以及常见踩坑问题, 我们希望通过这篇文章,让您能无忧使用阿里云DTS进行数据迁移,享受ClickHouse带来的高效数据分析体验。
98311 12
阿里云DTS踩坑经验分享系列|如何使用DTS进行MySQL->ClickHouse同步

热门文章

最新文章