Data Lake Analytics,大数据的ETL神器!

本文涉及的产品
对象存储 OSS,20GB 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
对象存储 OSS,恶意文件检测 1000次 1年
简介: 0. Data Lake Analytics(简称DLA)介绍 数据湖(Data Lake)是时下大数据行业热门的概念:https://en.wikipedia.org/wiki/Data_lake。

0. Data Lake Analytics(简称DLA)介绍

数据湖(Data Lake)是时下大数据行业热门的概念:https://en.wikipedia.org/wiki/Data_lake。基于数据湖做分析,可以不用做任何ETL、数据搬迁等前置过程,实现跨各种异构数据源进行大数据关联分析,从而极大的节省成本和提升用户体验。关于Data Lake的概念。

终于,阿里云现在也有了自己的数据湖分析产品:https://www.aliyun.com/product/datalakeanalytics
可以点击申请使用(目前公测阶段还属于邀测模式),体验本教程分析OTS数据之旅。
产品文档:https://help.aliyun.com/product/70174.html

1. ETL介绍

ETL(https://en.wikipedia.org/wiki/Extract,_transform,_load)就是Extract、Transfrom、Load即抽取、转换、加载,是传统数仓和大数据的重要工具。

抽取:就是从源系统抽取需要的数据,这些源系统是同构或异构的:比如Excel表格、XML文件、关系型数据库。
转换:源系统的数据按照分析目的,转换成目标系统要求的格式,或者做数据清洗和数据加工。
加载:把转换后的数据装载到目标数据库,作为联机分析、数据挖掘、数据展示的基础。

整个ETL过程就像是在源系统和目标系统之间构建一个管道,数据在这个管道里源源不断的流动。

2. DLA与ETL

Data Placement Optimization(数据摆放优化)是目前云平台上的业务系统的主流架构方向和思路。架构师们会从读写性能、稳定性、强一致性、成本、易用性、开发效率等方面来考量不同存储引擎给业务上带来的好处,从而实现整个业务系统的完美的平衡状态。

而这种跨异构数据源之间的数据搬迁,却不是一件容易的事情。很多ELT工具基本上属于框架级别,需要自己开发不少的辅助工具;同时表达能力也较弱,无法满足很多场景;另外对异构数据源的抽象和兼容性也不是那么完美。

反观DLA,无论从哪方面来看,DLA都完美的契合ETL的需求场景。下图是DLA的简易架构图,DLA一开始就是基于“MPP计算引擎+存储计算分离+弹性高可用+异构数据集源”等架构原则来设计的,支持各种异构数据源读写是DLA的核心目标!

image.png | left | 706x724

通过连接异构数据源来执行select + join + subQuery等逻辑实现Extract,通过Filter+ Project + Aggregation + Sort + Functions等实现数据流转换和映射Transform,而通过insert实现Load,下面是一个例子:

--基本格式
insert into target_table (col1, col2, col3, ....)  --需要导入的列以及列的顺序
select c1, c2, c3, ....                            --需要与导入列的类型兼容,顺序要确认清楚
from ...                         --可以是任何你想要查询的数据目标
where ...


--下面是一个例子
insert into target_table (id, name, age)  
select s1.pk1, s2.name, s1.age            
from source_table1 s1
join source_table2 s2
on s1.sid = s2.sid
where s1.xxx = 'yyy'

下面我们就尝试往不同的数据源导入数据吧。

3. 实际测试(以TableStore:为例)

  • 准备DLA账号(已有测试账号)

    • 测试集群:上海region;
    • 账号账号:DLA测试账号;
  • 准备两个来源表(两个TPC-H的OSS表,customer和nation),用来做join和数据查询;
  • 准备一个TableStore(https://help.aliyun.com/document_detail/27280.html)的目标表;
  • 执行导入SQL,写入数据后校验结果;

a)两个来源表定义:

mysql> show create database tpch_50x_text;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Database      | Create Database                                                                                                                                                        |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| tpch_50x_text | CREATE DATABASE `tpch_50x_text`
WITH DBPROPERTIES (
    catalog = 'hive',
    location = 'oss://${您的bucket}/datasets/tpch/50x/text_date/'
)
COMMENT '' |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.02 sec)


mysql> show tables;
+------------+
| Table_Name |
+------------+
| customer   |
| nation     |
+------------+
2 rows in set (0.03 sec)

mysql> show create table customer;
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table    | Create Table                                                                                                                                                                                                                                                                                                                                                                           |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| customer | CREATE EXTERNAL TABLE `tpch_50x_text`.`customer` (
    `c_custkey` int,
    `c_name` string,
    `c_address` string,
    `c_nationkey` int,
    `c_phone` string,
    `c_acctbal` double,
    `c_mktsegment` string,
    `c_comment` string
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
STORED AS `TEXTFILE`
LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/customer_text' |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.90 sec)


mysql> show create table nation;
+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table      | Create Table                                                                                                                                                                                                                                    |
+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| nation     | CREATE EXTERNAL TABLE `tpch_50x_text`.`nation` (
    `n_nationkey` int,
    `n_name` string,
    `n_regionkey` int,
    `n_comment` string
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
STORED AS `TEXTFILE`
LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/nation_text' |
+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.73 sec)

b)准备TableStore的库和表

## 建库
mysql> show create database etl_ots_test;
+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Database     | Create Database                                                                                                                                                                |
+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| etl_ots_test | CREATE DATABASE `etl_ots_test`
WITH DBPROPERTIES (
    catalog = 'ots',
    location = 'https://${您的instance}.cn-shanghai.ots-internal.aliyuncs.com',
    instance = '${您的instance}'
)
COMMENT '' |
+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.02 sec)

## 使用库
mysql> use etl_ots_test;
Database changed

## 建表
mysql> show create table test_insert;
+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table       | Create Table                                                                                                                                                                          |
+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| test_insert | CREATE EXTERNAL TABLE `test_insert` (
    `id1_int` int NOT NULL COMMENT '客户id主键',
    `c_address` varchar(20) NULL COMMENT '客户的地址',
    `c_acctbal` double NULL COMMENT '客户的account balance',
    PRIMARY KEY (`id1_int`)
)
COMMENT ''             |
+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.03 sec)

以下是实际数据的截图:

image.png | left | 747x416

image.png | left | 747x601

c)开始导入数据,确保导入字段顺序和类型兼容性

## 检查数据,都是空的
mysql> select * from etl_ots_test.test_insert;
Empty set (0.31 sec)
mysql> use tpch_50x_text;
Database changed

## 查询下nation数据,其中CANADA的nationkey是3,后续要找这个数据
mysql> select n_nationkey, n_name from nation;
+-------------+----------------+
| n_nationkey | n_name         |
+-------------+----------------+
|           0 | ALGERIA        |
|           1 | ARGENTINA      |
|           2 | BRAZIL         |
|           3 | CANADA         |
|           4 | EGYPT          |
|           5 | ETHIOPIA       |
|           6 | FRANCE         |
|           7 | GERMANY        |
|           8 | INDIA          |
|           9 | INDONESIA      |
|          10 | IRAN           |
|          11 | IRAQ           |
|          12 | JAPAN          |
|          13 | JORDAN         |
|          14 | KENYA          |
|          15 | MOROCCO        |
|          16 | MOZAMBIQUE     |
|          17 | PERU           |
|          18 | CHINA          |
|          19 | ROMANIA        |
|          20 | SAUDI ARABIA   |
|          21 | VIETNAM        |
|          22 | RUSSIA         |
|          23 | UNITED KINGDOM |
|          24 | UNITED STATES  |
+-------------+----------------+
25 rows in set (0.37 sec)

## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据
mysql> select count(*) from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING';
+----------+
| count(*) |
+----------+
|    60350 |
+----------+
1 row in set (0.66 sec)

## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据
mysql> select * from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING' order by c_custkey limit 3;
+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
| c_custkey | c_name             | c_address               | c_nationkey | c_phone         | c_acctbal | c_mktsegment | c_comment                                                                                          |
+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
|        13 | Customer#000000013 | nsXQu0oVjD7PM659uC3SRSp |           3 | 13-761-547-5974 |   3857.34 | BUILDING     | ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely |
|        27 | Customer#000000027 | IS8GIyxpBrLpMT0u7       |           3 | 13-137-193-2709 |   5679.84 | BUILDING     |  about the carefully ironic pinto beans. accoun                                                    |
|        40 | Customer#000000040 | gOnGWAyhSV1ofv          |           3 | 13-652-915-8939 |    1335.3 | BUILDING     | rges impress after the slyly ironic courts. foxes are. blithely                                    |
+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+
3 rows in set (0.78 sec)

导入之前我们想清楚需求:把国家是'CANADA'的,客户的market segmentation为'BUILDING'的客户找到,然后对c_custkey排序,选择前10条数据,然后选择他们的c_custkey、c_address、c_acctbal三列,清晰到OTS的test_insert表中,以备后续使用

##先查询下数据,看看有几条数据
mysql> select c.c_custkey, c.c_address, c.c_acctbal 
    -> from tpch_50x_text.customer c
    -> join tpch_50x_text.nation n 
    -> on c.c_nationkey = n.n_nationkey
    -> where n.n_name = 'CANADA' 
    -> and c.c_mktsegment = 'BUILDING' 
    -> order by c.c_custkey
    -> limit 10;
+-----------+--------------------------------+-----------+
| c_custkey | c_address                      | c_acctbal |
+-----------+--------------------------------+-----------+
|        13 | nsXQu0oVjD7PM659uC3SRSp        |   3857.34 |
|        27 | IS8GIyxpBrLpMT0u7              |   5679.84 |
|        40 | gOnGWAyhSV1ofv                 |    1335.3 |
|        64 | MbCeGY20kaKK3oalJD,OT          |   -646.64 |
|       255 | I8Wz9sJBZTnEFG08lhcbfTZq3S     |   3196.07 |
|       430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR |   7905.17 |
|       726 | 4w7DOLtN9Hy,xzZMR              |   6253.81 |
|       905 | f iyVEgCU2lZZPCebx5bGp5        |   -600.73 |
|      1312 | f5zgMB4MHLMSHaX0tDduHAmVd4     |    9459.5 |
|      1358 | t23gsl4TdVXqTZha DioEHIq5w7y   |   5149.23 |
+-----------+--------------------------------+-----------+
10 rows in set (1.09 sec)


##开始导入
mysql> insert into etl_ots_test.test_insert (id1_int ,c_address, c_acctbal)
    -> select c.c_custkey, c.c_address, c.c_acctbal 
    -> from tpch_50x_text.customer c
    -> join tpch_50x_text.nation n 
    -> on c.c_nationkey = n.n_nationkey
    -> where n.n_name = 'CANADA' 
    -> and c.c_mktsegment = 'BUILDING' 
    -> order by c.c_custkey
    -> limit 10;
+------+
| rows |
+------+
|   10 |
+------+
1 row in set (2.14 sec)

## 验证结果,没有问题:
mysql> select * from etl_ots_test.test_insert;
+---------+--------------------------------+-----------+
| id1_int | c_address                      | c_acctbal |
+---------+--------------------------------+-----------+
|      13 | nsXQu0oVjD7PM659uC3SRSp        |   3857.34 |
|      27 | IS8GIyxpBrLpMT0u7              |   5679.84 |
|      40 | gOnGWAyhSV1ofv                 |    1335.3 |
|      64 | MbCeGY20kaKK3oalJD,OT          |   -646.64 |
|     255 | I8Wz9sJBZTnEFG08lhcbfTZq3S     |   3196.07 |
|     430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR |   7905.17 |
|     726 | 4w7DOLtN9Hy,xzZMR              |   6253.81 |
|     905 | f iyVEgCU2lZZPCebx5bGp5        |   -600.73 |
|    1312 | f5zgMB4MHLMSHaX0tDduHAmVd4     |    9459.5 |
|    1358 | t23gsl4TdVXqTZha DioEHIq5w7y   |   5149.23 |
+---------+--------------------------------+-----------+
10 rows in set (0.27 sec)

d)注意点:

虽然有ETL工具快速导入导出,但也有些问题需要注意的,比如:

  • 如果导入任务时间太长,请走异步模式,否则连接断开可能会影响任务正常运行;
  • TableStore目前的insert是根据主键覆盖,主键不会去重判断的,请务必不能对你正常的数据表做插入;
  • 目前DLA和TableStore的事务能力还不够,可能会出现中断,已导入的数据不会清楚,需要自行清理;
  • 列的个数和列的类型,需要自己对齐保障,否则会报错;

4. 其他数据源导入

整个过程是不是很简单?是不是想要导入其他场景的数据源?对DLA而言,底层任何数据源都以相同方式处理,只要确保其他数据源的库、表在DLA中正常创建,就可以正常的读写,实现ETL啦!赶紧试试吧!

其他相关的文档:

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
5月前
|
存储 分布式计算 运维
【2023云栖】刘一鸣:Data+AI时代大数据平台建设的思考与发布
本文根据2023云栖大会演讲实录整理而成,演讲信息如下: 演讲人:刘一鸣 | 阿里云自研大数据产品负责人 演讲主题:Data+AI时代大数据平台应该如何建设
101382 7
|
9月前
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
973 0
|
4月前
|
关系型数据库 MySQL 大数据
程序员小sister的烦恼_快速上手大数据ETL神器Kettle(xls导入mysql)
程序员小sister的烦恼_快速上手大数据ETL神器Kettle(xls导入mysql)
51 0
|
4月前
|
SQL 存储 大数据
手把手教你大数据离线综合实战 ETL+Hive+Mysql+Spark
手把手教你大数据离线综合实战 ETL+Hive+Mysql+Spark
88 0
|
5月前
|
SQL 消息中间件 存储
TuGraph Analytics动态插件:快速集成大数据生态系统
插件机制为GeaFlow任务提供了外部数据源的集成能力扩展,GeaFlow支持从各类Connector中读写数据,GeaFlow将它们都识别为外部表,并将元数据存储在Catalog中。GeaFlow已有一些内置的插件,例如FileConnector,KafkaConnector,JDBCConnector,HiveConnector等。
|
7月前
|
数据采集 存储 大数据
大数据ETL简介
大数据ETL简介
163 0
|
10月前
|
存储 SQL 机器学习/深度学习
MaxCompute(原名ODPS,全称Open Data Processing Service)
MaxCompute(原名ODPS,全称Open Data Processing Service)是阿里云开发的一种云原生数据处理和分析服务。它提供了强大的数据计算和处理能力,支持海量数据的存储、计算、分析和挖掘,并且具有高可靠、高性能、高可扩展、高安全等优势,适用于各种数据处理和分析场景。
573 0
|
10月前
|
XML JSON 大数据
大数据ETL开发之图解Kettle工具
大数据ETL开发之图解Kettle工具
155 0
|
12月前
|
XML SQL JSON
大数据 ETL 处理工具 Kettle 常用输入输出
相比现在流行大数据技术,你可能觉得 Kettle 的使用场景太少了,或者没有必要使用这么个玩意儿,查看了下 github kettle 发现最近也有一些更新,另外,对于没有编程经验的数据使用人员,使用非常简单的 Kettle,通过图形界面设计实现做什么业务,无需写代码去实现,就可以做一些实验,比如:抓取网站上的股票数据、外汇信息等等。 Kettle 支持很多种输入和输出格式,包括文本文件,数据表,以及数据库引擎。总之,Kettle 强大的输入、输出、转换功能让你非常方便的操作数据。
|
12月前
|
SQL JavaScript 前端开发
大数据 ETL 处理工具 Kettle 完成一个作业任务
简单一句话,作业流程,即是对转换流程进行调度,也可以嵌套转换流程和作业流程。