阿里云物联网平台数据转发到函数计算示例

简介: 使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。

概述

使用物联网平台规则引擎的数据流转功能,可将Topic中的数据消息转发至其他Topic或其他阿里云产品进行存储或处理。本文主要演示通过规则引擎将设备上行消息流转到函数计算,并通过函数计算发送消息到钉钉机器人。

Step By Step

产品及设备准备

1、创建产品
_

2、定义物模型
_

3、添加设备
_

_

4、使用SDK 上行消息,参考链接:基于开源JAVA MQTT Client连接阿里云IoT

import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IoTDemoPubSubDemo {

    // 设备三元组信息
    public static String productKey = "a16MX********";
    public static String deviceName = "device1";
    public static String deviceSecret = "YGLHxUr40E1JaWhk3IVAm0uk********";
    public static String regionId = "cn-shanghai";

    // 物模型-属性上报topic
    private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
    // 自定义topic,在产品Topic列表位置定义
    private static String subTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post_reply";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());

        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,5, TimeUnit.SECONDS);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("Sub message");
                System.out.println("Topic : " + s);
                System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
//        connOpts.setCleanSession(true);
        connOpts.setCleanSession(false);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties() {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值");
            String payloadJson = "{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}";
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(1);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

5、运行状态查看
_


函数计算创建与配置

1、创建应用
_

_

2、应用下面添加函数

_

_

3、编辑脚本

const https = require('https');

const accessToken = '填写accessToken,即钉钉机器人webhook的accessToken';
module.exports.handler = function(event, context, callback) {
var eventJson = JSON.parse(event.toString());
console.log(event.toString());
//钉钉消息格式
const postData = JSON.stringify({
"msgtype": "markdown",
"markdown": {
"title": "设备温湿度传感器",
"text": "#### 温湿度传感器上报\n" +
"> 设备名称:" + eventJson.deviceName+ "\n\n" +
"> 实时温度:" + eventJson.Temperature + "℃\n\n" +
"> 相对湿度:" + eventJson.Humidity + "%\n\n" +
"> ###### " + eventJson.time + " 发布 by [物联网平台](https://www.aliyun.com/product/iot) \n"
},
"at": {
"isAtAll": false
}
});
const options = {
hostname: 'oapi.dingtalk.com',
port: 443,
path: '/robot/send?access_token=' + accessToken,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
}
};
const req = https.request(options, (res) => {
res.setEncoding('utf8');
res.on('data', (chunk) => {});
res.on('end', () => {
callback(null, 'success');
});
});
// 异常返回
req.on('error', (e) => {
callback(e);
});
// 写入数据
req.write(postData);
req.end();
};

钉钉机器人webhook的accessToken获取参考链接:阿里云IoT Studio服务开发定时关灯功能示例Demo: 2.3 钉钉机器人Webhook获取 部分。

4、快速测试
_

_


规则引擎配置

1、创建规则引擎
_

2、配置处理数据

_

SQL字段

deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time

3、配置转发数据

_

4、启动设备端SDK,周期性上行消息,钉钉群查看通知

_

5、上行日志查看

_

参考链接

温湿度计上报数据到钉钉群机器人

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
22天前
|
物联网 Serverless
MQTT常见问题之通过mqtt控制台查询不到设备轨迹如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
数据挖掘 API
物联网平台云端调用ListAnalyticsData接口
物联网平台云端调用ListAnalyticsData接口
147 0
物联网平台云端调用ListAnalyticsData接口
|
物联网
《阿里云物联网平台属性,事件,服务详解》电子版地址
阿里云物联网平台属性,事件,服务详解
163 0
《阿里云物联网平台属性,事件,服务详解》电子版地址
|
存储 物联网 机器人
物联网平台数据流转到函数计算|学习笔记
快速学习物联网平台数据流转到函数计算
174 0
物联网平台数据流转到函数计算|学习笔记
|
物联网 开发工具 数据安全/隐私保护
物联网平台中连接参数
物联网平台中连接参数
166 0
物联网平台中连接参数
|
数据可视化 网络协议 安全
如何使用C LinkSDK(4.x)快速接入阿里云物联网平台?
如何使用C LinkSDK(4.x)快速接入阿里云物联网平台?
574 0
如何使用C LinkSDK(4.x)快速接入阿里云物联网平台?
|
Java 物联网 API
阿里云物联网平台RRPC同步方式下行推送请求
该接口向指定设备发送请求消息,并同步返回响应。
449 1
阿里云物联网平台RRPC同步方式下行推送请求
|
SQL 网络协议 Java
阿里云物联网平台之利用云平台流转如何实现同一款产品下任意俩个设备的通信?
大部分同学应该都知道这种正常的基于云平台流转的M2M设备间通信,可以指定A产品的某个设备的消息流转到B产品的某个设备,或者从B产品的某个设备流转到A产品的某个设备,具有一定的指向性。 那么如何才能够体现任意俩个字呢?假设把产品下每个产品都当成群成员,如何才能让他们之间自由的发言呢? 也就是说我如何只配置一条规则,就可以实现这个产品下的设备进行自由通信,而不是A->B要配置一条规则,B->C还要配置一条规则,甚至B->A也要配置一条规则。
1154 0
阿里云物联网平台之利用云平台流转如何实现同一款产品下任意俩个设备的通信?
|
安全 网络协议 物联网
阿里云物联网平台C-SDK 4.x版本网关子设备如何上报物模型?
物联网设备很多都是微型嵌入式级别的,集成SDK一方面耗费资源、影响设备性能、增加成本,另一方面对于商家来说新入网一款设备带来极大的不便,集成SDK要进行开发(设备厂家要进行额外的定制),需要更新固件程序。所以阿里云物联网平台提供了一种网关-子设备的入网模式,为商家新入网设备提供便利。网关设备属于和物联网平台直连设备,子设备不和物联网平台连接,那么子设备又是如何进行物模型上报(物模型上报指的是设备端将物模型数据发送给物联网平台)呢?尤其是C-SDK(网关集成SDK),官方没有现成的Demo。 此篇文章将阐述C-SDK4.X版本网关子设备上报物模型的业务流程原理和Demo源码。
3126 0
阿里云物联网平台C-SDK 4.x版本网关子设备如何上报物模型?
|
物联网
物联网平台网关设备+子设备接入平台演示
子设备不直接连接物联网平台,而是通过网关接入物联网平台。本示例介绍如何实现子设备通过网关接入物联网平台。 官方文档参考https://help.aliyun.com/document_detail/123984.html?spm=a2c4g.11186623.6.972.4417272dhkseDs
1069 0
物联网平台网关设备+子设备接入平台演示

相关产品

  • 物联网平台