使用阿里云InfluxDB®和Spark Streaming实时处理时序数据

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。

本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
image
在大数据处理中,一个主要的趋势是人们希望看到metric是如何随着时间变化发展。这使得管理和处理时序数据(数值随时间变化的数据)成为数据科学家非常重要的研究方向。目前,已经有非常多的时序处理数据库产品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因为完整的生态、类SQL的查询语言以及简单快捷的布署而非常受用户喜爱,居于DBEngine时序数据排列首位。阿里云已经将其进行开源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流处理报警组件。

阿里云InfluxDB®

关于时序数据的一些重要概念和如何购买阿里云InfluxDB®可以参考之前的文章<阿里云InfluxDB®教你玩转A股数据>和官方文档。这里补充一下阿里云InfluxDB®提供的实例规格和管理帐号的创建。
image
当前,阿里云InfluxDB®大致提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大致6种规格,每种规格的读写能力参考如上图所示。阿里云InfluxDB®开放了开源版的几乎全部功能,用户可以在控制台创建管理员帐号,该帐号可以通过客户端和SDK进行所有的操作。
image

Writing Data From Spark

Spark是目前大数据处理领域中最流行、最高效的开源工具,通过spark structured streaming写数据到InfluxDB的开源适配器主要有chroniclerreactive-influx。chronicler与reactive-influx的区别是,在写入数据点之前,chronicler必须要将数据格式转换成influxdb行协议,在处理大量field字段和字符串值时会变得相当棘手,相较而言reactive-influx比较方便。
在sbt项目中引入reactive-influx:

libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)

InfluxDB entry 配置,其中阿里云InfluxDB®内网和公网的URL可以在控制台上找到:

reactiveinflux {
  url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
  spark {
    batchSize = 1000 // No of records to be send in each batch
  }
}

扩展spark foreach writer, enable spark stuctured streaming 向阿里云InfluxDB®写数据的伪代码如下:

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
        
    var  db:SyncReactiveInfluxDb = _
    implicit val awaitAtMost = 1.second
        
    // Define the database connection here
    def open(partitionId: Long, version: Long): Boolean = {
    val syncReactiveInflux =
    SyncReactiveInflux(ReactiveInfluxConfig(None))
    db = syncReactiveInflux.database(dbName);
    db.create() // create the database 
    true
  }
  
    // Write the process logic, and database commit code here
    def process(value: org.apache.spark.sql.Row): Unit = {
    val point = Point(
      time = time,  // system or event time 
      measurement = "measurement1",
      tags = Map(
        "t1" -> "A", 
        "t2" -> "B"
      ),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    
    db.write(point)
  }
  
  // Close connection here
  def close(errorOrNull: Throwable): Unit = {
  }
}

引入Writer:

val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
                                    .writeStream
                                    .foreach(influxWriter)
                                    .outputMode("append")
                                    .start()

可视化

数据写入InfluxDB之后,便可以利用各种工具进行数据可视化,如Grafana,Chronograf等。一个简单的可视化展示如下:
image
当前阿里云InfluxDB®已经自带Grafana数据可视化,用户只需要在控制台一键开通既可,具体可以参考<5分钟快速完成监控系统搭建之实践篇>

总结

目前InfluxDB已经在阿里云完全托管,被用户广泛使用。随着商业化时间的发展,我们在提高稳定性和性能的同时,功能也一步步丰富起来。当前已经提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全兼容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆盖的业务场景包括DevOps监控、车联网、智慧交通、金融和IOT传感器数据采集,欢迎大家试用并提供意见。
阿里云InfluxDB®为用户提供7*24小时服务,欢迎加入下面的钉钉群咨询。
image

参考文献

  1. Processing Time Series Data in Real-Time with InfluxDB and Structured Streaming
  2. 阿里云InfluxDB®教你玩转A股数据
  3. chronicler-spark
  4. reactiveinflux
目录
相关文章
|
2月前
|
关系型数据库 MySQL 数据挖掘
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
DTS 作为阿里云核心的数据交互引擎,以其高效的实时数据流处理能力和广泛的数据源兼容性,为用户构建了一个安全可靠、可扩展、高可用的数据架构桥梁。阿里云数据库 SelectDB 通过与 DTS 联合,为用户提供了简单、实时、极速且低成本的事务数据分析方案。用户可以通过 DTS 数据传输服务,一键将自建 MySQL / RDS MySQL / PolarDB for MySQL 数据库,迁移或同步至阿里云数据库 SelectDB 的实例中,帮助企业在短时间内完成数据迁移或同步,并即时获得深度洞察。
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
|
12天前
|
关系型数据库 Apache 流计算
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
5天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2月前
|
机器学习/深度学习 存储 分布式计算
机器学习PAI常见问题之DLC的数据写入到另外一个阿里云主账号的OSS中如何解决
PAI(平台为智能,Platform for Artificial Intelligence)是阿里云提供的一个全面的人工智能开发平台,旨在为开发者提供机器学习、深度学习等人工智能技术的模型训练、优化和部署服务。以下是PAI平台使用中的一些常见问题及其答案汇总,帮助用户解决在使用过程中遇到的问题。
|
5天前
|
运维 数据管理 数据库
数据管理DMS产品使用合集之在阿里云DMS中,想对数据精度进行校验,有什么方法
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
|
5天前
|
NoSQL 数据管理 MongoDB
数据管理DMS产品使用合集之如何通过阿里云的数据管理服务(DMS)导出MongoDB数据
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
|
8天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
21 0
|
16天前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
2月前
|
分布式计算 运维 大数据
阿里云 EMR Serverless Spark 版免费邀测中
阿里云 EMR Serverless Spark 版,以 Spark Native Engine 为基础,旨在提供一个全托管、一站式的数据开发平台。诚邀您参与 EMR Serverless Spark 版免费测试,体验 100% 兼容 Spark 的 Serverless 服务:https://survey.aliyun.com/apps/zhiliao/iscizrF54
405 0
阿里云 EMR Serverless Spark 版免费邀测中
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
123 1