阿里云物联网平台自定义Topic脚本解析功能演示

简介: 之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。

概述

之前物联网平台自定义Topic均做消息的直接透传,不做类似物模型模式的数据脚本解析。平台最新推出的自定义Topic脚本解析功能,设备通过携带解析标记(?_sn=default)的自定义Topic上报数据,物联网平台收到数据后,调用您在控制台提交的数据解析脚本,将自定义格式数据转换为JSON结构体,再流转给后续业务系统。本文主要演示该功能的具体功能实现。

Step By Step

1、创建产品和设备

_

_

2、添加脚本

_

示例脚本

3、设备端通过自定义Topic模拟上行数据

import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 透传类设备测试
public class IoTDemoPubSubDemoForPersonalTopic {

    // 设备三元组信息
    public static String productKey = "a1wJG******";
    public static String deviceName = "Device1";
    public static String deviceSecret = "40YEyiGzXmvhDdpvbUVFCHjC********";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+ productKey + "/" + deviceName  + "/user/update?_sn=default";//自定义Topic做脚本解析
    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient(); // 初始化Client
        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());

        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            System.out.println("自定义Topic上报属性值");
            //0x000000000100320100000000
            String hexString = "000000000100320100000000";
            byte[] payLoad = hexToByteArray(hexString);
            MqttMessage message = new MqttMessage(payLoad);
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * hex字符串转byte数组
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte数组结果
     */
    public static byte[] hexToByteArray(String inHex){
        int hexlen = inHex.length();
        byte[] result;
        if (hexlen % 2 == 1){
            //奇数
            hexlen++;
            result = new byte[(hexlen/2)];
            inHex="0"+inHex;
        }else {
            //偶数
            result = new byte[(hexlen/2)];
        }
        int j=0;
        for (int i = 0; i < hexlen; i+=2){
            result[j]=hexToByte(inHex.substring(i,i+2));
            j++;
        }
        return result;
    }

    /**
     * Hex字符串转byte
     * @param inHex 待转换的Hex字符串
     * @return  转换后的byte
     */
    public static byte hexToByte(String inHex) {
        return (byte) Integer.parseInt(inHex, 16);
    }
}

参考链接:LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

4、上行消息查看

_

_

5、注意事项

1、在物联网平台创建自定义Topic时按正常Topic定义,不添加该解析标记;
2、仅解析设备上报云端的数据,不解析云端下行数据;
3、仅解析上报数据的Payload,并返回解析后的Payload;
4、解析前后,数据所在Topic不变。例如,设备发送到/${productKey}/${deviceName}/user/update的数据,解析后仍在该Topic中。

参考链接

自定义数据解析Topic概述

LoRaWAN设备数据解析及开源MQTT SDK设备端模拟

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
14天前
|
弹性计算 运维 监控
每天解析一个脚本(53)
【4月更文挑战第26天】shell脚本解析及训练(53)
27 5
|
12天前
|
运维 网络协议 安全
Serverless 应用引擎产品使用之阿里云函数计算中添加自定义域名进行域名DNS验证如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
20 1
|
12天前
|
弹性计算 运维 Shell
每天解析一个shell脚本(82)
【4月更文挑战第28天】shell脚本解析及训练(82)
8 1
|
12天前
|
弹性计算 运维 Shell
每天解析一个shell脚本(68)
【4月更文挑战第28天】shell脚本解析及训练(68)
6 0
|
14天前
|
弹性计算 运维 Shell
每天解析一个shell脚本(61)
【4月更文挑战第26天】shell脚本解析及训练(61)
19 3
|
14天前
|
弹性计算 运维 Shell
每天解析一个shell脚本(58)
【4月更文挑战第26天】shell脚本解析及训练(58)
83 0
|
14天前
|
弹性计算 Shell 数据安全/隐私保护
每天解析一个shell脚本(56)
【4月更文挑战第26天】shell脚本解析及训练(56)
18 0
|
14天前
|
弹性计算 运维 监控
每天解析一个脚本(52)
【4月更文挑战第26天】shell脚本解析及训练(52)
28 3
|
14天前
|
运维 监控 Shell
每天解析一个脚本(51)
【4月更文挑战第26天】shell脚本解析及训练(51)
15 4
|
15天前
|
数据采集 安全 API
阿里云大学考试python中级题目及解析-python高级
阿里云大学考试python中级题目及解析-python高级

相关产品

  • 物联网平台
  • 推荐镜像

    更多