StreamingPro 简化流式计算配置

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。
前言
前些天可以让批处理的配置变得更优雅 StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。

另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。


配置

首先需要配置源:

{
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      }

我们配置了一个Kafka流,一个普通的CSV文件。目前StreamingPro只允许配置一个Kafka流,但是支持多个topic,按逗号分隔即可。你可以配置多个其他非流式源,比如从MySQL,Parquet,CSV同时读取数据并且映射成表。

之后你就可以写SQL进行处理了。

{
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },

我这里做了简单的join。

{
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
然后把数据追加到Mysql里去。其实你也可以配置多个输出。


完整配置

{
  "example": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
你可以在 StreamingPro-0.4.11 下载到包,然后用命令启动:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
目录
相关文章
|
存储 缓存 资源调度
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
637 0
想了解流计算,你必须得看一眼,实现Flink on Yarn的三种部署方式,并运行wordcount
|
SQL 分布式计算 大数据
统一批处理流处理——Flink批流一体实现原理
统一批处理流处理——Flink批流一体实现原理
1485 0
统一批处理流处理——Flink批流一体实现原理
|
3月前
|
分布式计算 Hadoop 关系型数据库
Sqoop与Spark的协作:高性能数据处理
Sqoop与Spark的协作:高性能数据处理
Sqoop与Spark的协作:高性能数据处理
|
3月前
|
存储 分布式计算 数据处理
请描述一下MapReduce的工作流程。
请描述一下MapReduce的工作流程。
16 0
|
3月前
|
消息中间件 分布式计算 Java
流计算与批处理的区别是什么?请举例说明。
流计算与批处理的区别是什么?请举例说明。
33 0
|
6月前
|
资源调度 分布式计算 调度
Fink--3、Flink运行时架构(并行度、算子链、任务槽、作业提交流程)
Fink--3、Flink运行时架构(并行度、算子链、任务槽、作业提交流程)
|
8月前
|
监控 大数据 Java
|
消息中间件 资源调度 Oracle
对Flink流处理模型的抽象
对Flink流处理模型的抽象
对Flink流处理模型的抽象
|
消息中间件 存储 大数据
实时流处理框架之Storm的安装与部署
实时流处理框架之Storm的安装与部署
196 0
实时流处理框架之Storm的安装与部署
|
canal 消息中间件 SQL
简化ETL工作,编写一个Canal胶水层(下)
这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。
522 0