推荐一款数据同步工具:FlinkX

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FlinkX是基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移

FlinkX

1 什么是FlinkX

  • FlinkX是基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
template

2 工作原理

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
diagram

3 快速起步

3.1 运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式

3.2 执行环境

  • Java: JDK8及以上
  • Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
  • 操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。

3.3 打包

进入项目根目录,使用maven打包:

mvn clean package -Dmaven.test.skip

打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包

3.4 启动

3.4.1 命令行参数选项

  • model

    • 描述:执行模式,也就是flink集群的工作模式

      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
    • 必选:否
    • 默认值:local
  • job

    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • plugin

    • 描述:插件根目录地址,也就是打包后产生的plugins目录。
    • 必选:是
    • 默认值:无
  • flinkconf

    • 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
    • 必选:否
    • 默认值:无
  • yarnconf

    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无

3.4.2 启动数据同步任务

  • 以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -plugin /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
  • 以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -plugin /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
  • 以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -plugin /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

4 数据同步任务模版

从最高空俯视,一个数据同步的构成很简单,如下:

{
    "job": {
        "setting": {...},
        "content": [...]
    }
}

数据同步任务包括一个job元素,而这个元素包括setting和content两部分。

  • setting: 用于配置限速、错误控制和脏数据管理
  • content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)

4.1 setting

    "setting": {
        "speed": {...},
        "errorLimit": {...},
        "dirty": {...}
    }

setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息

4.1.1 speed

            "speed": {
                 "channel": 3,
                 "bytes": 0
            }
  • channel: 任务并发数
  • bytes: 每秒字节数,默认为 Long.MAX_VALUE

4.1.2 errorLimit

            "errorLimit": {
                "record": 10000,
                "percentage": 100
            }
  • record: 出错记录数超过record设置的条数时,任务标记为失败
  • percentage: 当出错记录数超过percentage百分数时,任务标记为失败

4.1.3 dirty

        "dirty": {
                "path": "/tmp",
                "hadoopConfig": {
                    "fs.default.name": "hdfs://ns1",
                    "dfs.nameservices": "ns1",
                    "dfs.ha.namenodes.ns1": "nn1,nn2",
                    "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
                    "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
                    "dfs.ha.automatic-failover.enabled": "true",
                    "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "fs.hdfs.impl.disable.cache": "true"
                }
            }
  • path: 脏数据存放路径
  • hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

4.1.4 restore

"restore": {

        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      }

restore配置请参考断点续传

4.2 content

        "content": [
            {
               "reader": {
                    "name": "...",
                    "parameter": {
                        ...
                    }
                },
               "writer": {
                    "name": "...",
                    "parameter": {
                         ...
                     }
                }
            }
        ]
  • reader: 用于读取数据的插件的信息
  • writer: 用于写入数据的插件的信息

reader和writer包括name和parameter,分别表示插件名称和插件参数

4.3 数据同步任务例子

详见flinkx-examples子工程

5. 数据同步插件

5.1 读取插件

5.2 写入插件

断点续传和实时采集功能介绍

数据源开启Kerberos

统计指标说明

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
421 0
|
4月前
|
消息中间件 SQL 分布式计算
一篇文章搞定数据同步工具SeaTunnel
一篇文章搞定数据同步工具SeaTunnel
523 0
|
4月前
|
存储 关系型数据库 MySQL
DataX: 阿里开源的又一款高效数据同步工具
DataX 是由阿里巴巴集团开源的一款大数据同步工具,旨在解决不同数据存储之间的数据迁移、同步和实时交换的问题。它支持多种数据源和数据存储系统,包括关系型数据库、NoSQL 数据库、Hadoop 等。 DataX 提供了丰富的数据读写插件,可以轻松地将数据从一个数据源抽取出来,并将其加载到另一个数据存储中。它还提供了灵活的配置选项和高度可扩展的架构,以适应各种复杂的数据同步需求。
|
存储 文件存储 对象存储
S3存储服务间数据同步工具Rclone迁移教程
目前大多项目我们都会使用各种存储服务,例如oss、cos、minio等。当然,因各种原因,可能需要在不同存储服务间进行数据迁移工作,所以今天就给大家介绍一个比较通用的数据迁移工具Rclone。
S3存储服务间数据同步工具Rclone迁移教程
|
7月前
|
canal SQL 关系型数据库
大数据同步工具Canal 2
大数据同步工具Canal
195 0
|
7月前
|
canal 消息中间件 关系型数据库
大数据同步工具Canal 1
大数据同步工具Canal
364 0
|
9月前
|
算法 Linux
Linux系统【文件传输】rsync命令 – 远程数据同步工具
rsync命令来自于英文词组“remote sync”的缩写,其功能是用于远程数据同步。rsync命令能够基于网络(含局域网和互联网)快速的实现多台主机间的文件同步工作,并与scp或ftp发送完整文件不同,rsync有独立的文件内容差异算法,会在传送前对两个文件进行比较,只传送两者内容间的差异部分,因此速度更快。
149 2
|
12月前
|
存储 SQL JSON
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下
|
12月前
|
存储 JavaScript 小程序
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!上
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!上
|
存储 SQL JSON
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!(2)
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!
324 0

热门文章

最新文章