使用embulk从Oracle抽取数据到trafodion

简介:

转自:http://blog.csdn.net/post_yuan/article/details/77856320

Embulk提供很多plugins,包括Input plugins、Output plugins、File parser plugins等,详细可以参考http://www.embulk.org/plugins/。embulk不仅可实现数据库到数据库的抽取,也可以实现csv文件/csv.gz文件到数据库的抽取。(相关yml文件的配置可以参考官网)

用户也可以开发自己的plugins并上传,如EsgynDB就基于trafodion自己开发一款基于trafodion的upsert using load实现的批量加载插件,如下图,

这里写图片描述

本文在此利用上述的trafodion output插件及oracle input插件通过实例描述如何使用embulk实现oracle到trafodion的数据加载。 
1 下载安装Embulk

1
2
3
4
5
wget https: //dl .embulk.org /embulk-latest .jarmkdir ~/.embulk /bin
mv  embulk-latest.jar ~/.embulk /bin/embulk
chmod  +x ~/.embulk /bin/embulk
echo  'export PATH="$HOME/.embulk/bin:$PATH"'  >> ~/.bashrc
source  ~/.bashrc123456

2 验证Embulk下载安装成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@n12 ~] # embulk gem list
2017-09-05 20:19:51.184 +0800: Embulk v0.8.31
 
*** LOCAL GEMS ***
 
did_you_mean (default: 1.0.1)
jar-dependencies (default: 0.3.5)
jruby-openssl (0.9.17 java)
json (1.8.3 java)
minitest (default: 5.4.1)
net-telnet (default: 0.1.1)
power_assert (default: 0.2.3)
psych (2.0.17 java)
racc (1.4.14 java)
rake (default: 10.4.2)
rdoc (default: 4.2.0)
test -unit (default: 3.1.1)

3 安装oracle input插件及trafodion output插件

embulk gem install embulk-input-oracle embulk gem install embulk-output-trafodion12

4 验证以上插件安装成功

root@n12 ~]# embulk gem list 2017-09-05 20:22:57.146 +0800: Embulk v0.8.31 *** LOCAL GEMS *** did_you_mean (default: 1.0.1) embulk-input-oracle (0.8.5) embulk-output-trafodion (0.1.1) jar-dependencies (default: 0.3.5) jruby-openssl (0.9.17 java) json (1.8.3 java) minitest (default: 5.4.1) net-telnet (default: 0.1.1) power_assert (default: 0.2.3) psych (2.0.17 java) racc (1.4.14 java) rake (default: 10.4.2) rdoc (default: 4.2.0) test-unit (default: 3.1.1)

5 准备oracle jar包,用于读取oracle数据库

[root@n12 ~]# ll /opt/drivers/ total 2676 -rw-rw-r-- 1 trafodion trafodion 2739670 Sep  5 20:38 ojdbc6.jar123

6 编辑YAML文件oracle_to_trafodion.yml 
注:Embulk使用YAML文件来定义数据批量加载的方式,YAML文件格式定义可参考 http://www.embulk.org/docs/built-in.html#embulk-configuration-file-format

exec:   max_threads: 8   min_output_tasks: 4in:  type: oracle   driver_path: /opt/drivers/ojdbc6.jar   url: "jdbc:oracle:thin:@10.10.11.16:1521:esgyn"   user: system   password: system12345   query: |    select * from test_tbl out:  type: trafodion   url: "jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase"   user: trafodion   password: traf123   default_timezone: 'Asia/Shanghai'   table: TEST_TBL   mode:  insert_direct12345678910111213141516171819

7 创建Oracle测试表并插入测试数据

SQL> create table test_tbl(a char(10),b char(10)); SQL> insert into test_tbl values('a','b'); SQL> insert into test_tbl values('c','d'); SQL> insert into test_tbl values('e','f'); SQL> select * from test_tbl; A          B ---------- ---------- a          b c          d e          f

8 创建Trafodion目标表

>>create table test_tbl(a char(10),b char(10)); --- SQL operation complete. >>select * from test_tbl; --- 0 row(s) selected.123456

9 运行Embulk任务

--预览embulk preview ~/oracle_to_trafodion.yml [root@n12 ~]# embulk preview ~/oracle_to_trafodion.yml 2017-09-05 20:41:24.093 +0800: Embulk v0.8.31 2017-09-05 20:41:27.303 +0800 [INFO] (0001:preview): Loaded plugin embulk-input-oracle (0.8.5) 2017-09-05 20:41:27.662 +0800 [INFO] (0001:preview): Using JDBC Driver 11.2.0.4.0 2017-09-05 20:41:27.977 +0800 [INFO] (0001:preview): SQL: select * from test_tbl2017-09-05 20:41:27.999 +0800 [INFO] (0001:preview): > 0.02 seconds +------------+------------+ |   A:string |   B:string | +------------+------------+ | a          | b          | | c          | d          | | e          | f          | +------------+------------+ --运行 embulk run     ~/oracle_to_trafodion.yml [root@n12 ~]# embulk run     ~/oracle_to_trafodion.yml 2017-09-05 21:01:26.419 +0800: Embulk v0.8.31 2017-09-05 21:01:30.260 +0800 [INFO] (0001:transaction): Loaded plugin embulk-input-oracle (0.8.5) 2017-09-05 21:01:30.298 +0800 [INFO] (0001:transaction): Loaded plugin embulk-output-trafodion (0.1.1) 2017-09-05 21:01:30.807 +0800 [INFO] (0001:transaction): Using JDBC Driver 11.2.0.4.0 2017-09-05 21:01:30.995 +0800 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4 -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase 2017-09-05 21:01:31.019 +0800 [INFO] (0001:transaction): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@1cc41b77 2017-09-05 21:01:36.271 +0800 [INFO] (0001:transaction): Using insert_direct mode [WARN] Plugin uses deprecated constructor of org.embulk.spi.time.TimestampFormatter. [WARN] Report plugins in your config at:   2017-09-05 21:01:40.345 +0800 [INFO] (0001:transaction): {done:  0 / 1, running: 0} -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase 2017-09-05 21:01:40.389 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@4ee9df382017-09-05 21:01:40.970 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?) -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase 2017-09-05 21:01:47.001 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@785aae0e2017-09-05 21:01:52.935 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?) -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase 2017-09-05 21:02:01.036 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@38f4c2382017-09-05 21:02:03.872 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?) -----------------------------------jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase 2017-09-05 21:02:10.364 +0800 [INFO] (0026:task-0000): Connecting to jdbc:t4jdbc://192.168.1.93:23400/:schema=seabase options {user=trafodion, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000} connection------------------------------------:org.trafodion.jdbc.t4.TrafT4Connection@6c47ecb4 2017-09-05 21:02:17.565 +0800 [INFO] (0026:task-0000): Prepared SQL: UPSERT USING LOAD INTO "TEST_TBL" ("A", "B") VALUES (?, ?) 2017-09-05 21:02:36.756 +0800 [INFO] (0026:task-0000): SQL: select * from test_tbl 2017-09-05 21:02:36.777 +0800 [INFO] (0026:task-0000): > 0.02 seconds2017-09-05 21:02:36.783 +0800 [INFO] (0026:task-0000): Loading 3 rows 2017-09-05 21:02:36.821 +0800 [INFO] (0026:task-0000): > 0.04 seconds (loaded 3 rows in total) 2017-09-05 21:02:37.653 +0800 [INFO] (0001:transaction): {done:  1 / 1, running: 0} 2017-09-05 21:02:37.665 +0800 [INFO] (main): Committed. 2017-09-05 21:02:37.665 +0800 [INFO] (main): Next config diff: {"in":{},"out":{}}

10 检查Embulk任务执行成功

SQL>select * from test_tbl; A          B ---------- ---------- a          b c          d e          f --- 3 row(s) selected.

至此,使用Embulk从Oracle抽取数据到Trafodion演示完毕!

相关文章
|
18天前
|
SQL Oracle 关系型数据库
不小心删除表或数据后,如何利用Oracle的闪回进行恢复
不小心删除表或数据后,如何利用Oracle的闪回进行恢复
|
13天前
|
DataWorks Oracle 关系型数据库
DataWorks操作报错合集之尝试从Oracle数据库同步数据到TDSQL的PG版本,并遇到了与RAW字段相关的语法错误,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
30 0
|
14天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在 DataWorks 中,使用Oracle作为数据源进行数据映射和查询,如何更改数据源为MaxCompute或其他类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
28 1
|
24天前
|
SQL Oracle 关系型数据库
Oracle spool格式化数据命令
在这个示例中,通过设置不同的 `SET`命令参数,你可以控制输出的格式,包括每页行数、每行字符数、列分隔符等。你也可以使用其他的 `SET`命令参数来进一步定制输出格式。
14 0
|
24天前
|
Oracle 安全 关系型数据库
Oracle数据守卫(DG):数据的“守护者”与“时光机”
【4月更文挑战第19天】Oracle Data Guard保障数据安全,通过实时维护备库实现故障切换,保证业务连续性。它使用日志传输和应用保持数据同步,如同“时光机”,借助闪回技术能恢复误操作数据。此外,它还提供数据压缩、加密和故障转移等功能,提升数据库安全性与性能。作为数据管理员,理解并善用Data Guard是确保企业数据安全的关键。
|
24天前
|
SQL Oracle 关系型数据库
Oracle 12c的TOP N语句:数据排名的“快速通道”
【4月更文挑战第19天】Oracle 12c的TOP N语句是用于快速获取数据集排名前N的记录的SQL查询方法,特别适合寻找最具代表性的数据。通过指定排序条件和数量,TOP N能高效筛选出所需信息,例如最高销售额产品或最大访问量网页。在Oracle 12c中,查询优化器对TOP N查询进行了优化,保证快速返回结果,并提供丰富的排序和过滤选项。基本用法如`SELECT ... ORDER BY ... FETCH FIRST N ROWS ONLY`,还可结合`OFFSET`进行分页查询或用`WITH TIES`保持结果完整性。掌握TOP N语句能提升数据分析效率,助力企业决策。
|
24天前
|
存储 Oracle 关系型数据库
Oracle 12c的临时UNDO:数据的“临时保镖”
【4月更文挑战第19天】Oracle 12c引入的临时UNDO为数据安全提供新保障。它为临时操作和特定事务提供独立UNDO空间,避免共享UNDO带来的性能瓶颈和管理复杂性。临时UNDO随事务开始分配,记录修改历史,事务结束后自动释放。优点包括提高性能、简化管理及保证数据一致性。但需注意手动配置、监控和优化,以防长时间占用资源。了解其工作原理和最佳实践是提升数据库性能的关键。
|
SQL Oracle 关系型数据库
oracle数据库带或不带数据导入导出操作大全
oracle数据库带或不带数据导入导出操作大全
|
Oracle 关系型数据库 数据库
|
Oracle 关系型数据库 数据库