Hadoop数据迁到MaxCompute

本文涉及的产品
云服务器 ECS,每月免费额度200元 3个月
云服务器ECS,u1 2核4GB 1个月
简介:

通过最佳实践帮助您实现上述案例效果

Step1:数据准备

接下来,我们需要准备好一张表及数据集;

  • Hive表名:hive_dplus_good_sale;
  • 是否分区表:分区表,分区名为pt;
  • hdfs文件数据列分隔符:英文逗号;
  • 表数据量:100条。

源hive表建表语句

CREATE TABLE IF NOT EXISTS hive_dplus_good_sale(
   create_time timestamp,
   good_cate STRING,
   brand STRING,
   buyer_id STRING,
   trans_num BIGINT,
   trans_amount DOUBLE,
   click_cnt BIGINT,
   addcart_cnt BIGINT,
   collect_cnt BIGINT
   )
   PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n' ;

数据预览如下:

TB1eZQQOVXXXXcDXFXXXXXXXXXX-934-193.png

TB1j_NePXXXXXXDXpXXXXXXXXXX-1048-422.png
Hdfs&datax类型对照表

在Hive表中的数据类型 DataX 内部类型
TINYINT,SMALLINT,INT,BIGINT Long
FLOAT,DOUBLE,DECIMAL Double
String,CHAR,VARCHAR String
BOOLEAN Boolean
Date,TIMESTAMP Date
Binary Binary

Step2:MaxCompute目标表准备

1.1 开通MaxCompute
阿里云实名认证账号访问https://www.aliyun.com/product/odps ,开通MaxCompute,选择按量付费进行购买。
TB1IKEAOVXXXXcRaXXXXXXXXXXX-1158-487.png
TB1hTApOVXXXXbWaFXXXXXXXXXX-1243-351.png
TB1IHkOOVXXXXa5XVXXXXXXXXXX-1208-337.png

1.2 数加上创建MaxCompute project

操作步骤:

步骤1:进入数加管理控制台,前面开通MaxCompute成功页面,点击管理控制台,或者导航产品->大数据(数加)->MaxCompoute(https://www.aliyun.com/product/odps) 点击管理控制台。

TB1SNkTOVXXXXbiXFXXXXXXXXXX-1281-473.png

步骤2:创建项目。付费模式选择I/O后付费,输入项目名称:

TB1BLoEOVXXXXaRapXXXXXXXXXX-1361-506.png

步骤3:创建MaxCompute表。进入大数据开发套件的数据开发页面:

TB1ZFhePXXXXXcDXXXXXXXXXXXX-1023-211.png

新建表

TB18IACOVXXXXb_XVXXXXXXXXXX-1044-336.png

对应建表语句:

CREATE TABLE IF NOT EXISTS mc_dplus_good_sale (
   create_time DATETIME COMMENT '时间',
   good_cate STRING COMMENT '商品种类',
   brand STRING COMMENT '品牌',
   buyer_id STRING COMMENT '用户id',
   trans_num BIGINT COMMENT '交易量',
   trans_amount DOUBLE COMMENT '金额',
   click_cnt BIGINT COMMENT '点击次数',
   addcart_cnt BIGINT COMMENT '加入购物车次数',
   collect_cnt BIGINT COMMENT '加入收藏夹次数'
)
PARTITIONED BY (pt STRING);

Step3:通过Datax进行数据迁移

MaxCompute中的结构化数据是以表的形式存在,我们可以通过工具或者API将数据直接写入表中。

网络传输

  • datax: https://github.com/alibaba/DataX 本case我们选用datax方式进行数据迁移。
    datax是阿里开源的一款工具,可以适配常见的数据源,包括关系数据库、文本文件等,这是一款单机的软件,适用于中小数据量的传输,传输效率按照单机网卡 50%(约50MB/s)计算,一天可以传输的数据量为

50MB/s 60 60 * 24 / 1024/1024 ≈ 4T
具体的传输效率会取决于网络环境(机房出口带宽、本地网络),机器性能,文件格式与数量等多个因素。
使用datax传输数据到ODPS的部署结构如下,在ECS机器或一台物理机器上部署好dataX,此机器要能够同时连通数据源与MaxCompute(原ODPS)服务。

TB108QROVXXXXcNXFXXXXXXXXXX-1290-128.png

磁盘导入

当数据量比较大(PB级别以上)时,可以考虑通过磁盘将数据导入ODPS,需要磁盘导入请联系相应的大数据业务架构师。

API: Tunnel SDK

当数据源较特殊,无法使用以上现有的工具的时候,可以用SDK编写适配的工具,从数据源读出并写入ODPS中,Tunnel是数据上传的底层接口,可以同时接受大量客户端并行的写入。
https://help.aliyun.com/document_detail/27837.html?spm=5176.doc34614.6.144.RyeirI

ECS机器部署

前提准备:获取ECS的主机名和IP:

TB16pI2OVXXXXbkXpXXXXXXXXXX-972-297.png

操作步骤:

步骤1:增加调度资源。大数据开发套件->项目管理->调度资源管理->增加调度资源,输入名称和标识hive2mc:
TB12kdlPXXXXXXnXXXXXXXXXXXX-1352-292.png

步骤2:添加配置ECS服务器:
TB1kdMTOVXXXXXDaXXXXXXXXXXX-1216-375.png
输入前提准备中获取到的ECS服务器名称和IP,点击添加。

步骤3:初始化ECS服务器:
TB1GnozOVXXXXcxapXXXXXXXXXX-1200-318.png

按照弹框提示,登录ECS服务器(登录外网IP)在root用户下执行两个命令,即部署相关datax等服务。

命令执行成功后,回到“管理服务器”页面,点击刷新按钮可以看到服务器状态为“正常”状态:
TB19nhaPXXXXXXFXXXXXXXXXXXX-1090-152.png

此时该项目的调度资源组就有两个:默认资源组、刚添加成功的资源组,后面执行datax任务即使用刚添加成功的资源组进行调度运行。

任务配置

前提准备:查看hive表对应数据文件地址
TB1rJZmOVXXXXXXaVXXXXXXXXXX-645-410.png

操作步骤:

步骤1:前面ECS机器部署介绍的资源组配置好后,导航上点击进入“数据开发”页面进行任务开发。
TB18LwJOVXXXXXoaXXXXXXXXXXX-879-324.png

步骤2:创建工作流,命名为hive2MC_demo,选择一次性调度(本case是做一次性迁移,若需要每日生产调度则选择周期调度):
TB1lkIuOVXXXXbqapXXXXXXXXXX-1076-405.png

步骤3:在工作流中创建shell节点:
TB1G7cqOVXXXXa9aFXXXXXXXXXX-1084-515.png

步骤4:代码配置。Shell节点创建后,双击节点图,进入代码编辑器进行Shell代码编辑。

代码中进行datax配置,reader为hdfsreader,writer为odpswriter,具体代码如下,只需要修改reader和writer的配置:

shell_datax_home='/home/admin/shell_datax'
mkdir -p ${shell_datax_home}
shell_datax_config=${shell_datax_home}/${ALISA_TASK_ID}
echo '''
{
   "job": {
       "setting": {
           "speed": {
               "channel": 3
           }
       },
       "content": [
           {
               "reader": {
                   "name": "hdfsreader",
                   "parameter": {
                       "path": "/user/hive/warehouse/hive_table_name/*",
                       "defaultFS": "hdfs://xxx:port",
                       "column": [
                              {
                               "index": 0, 
                               "type": "date"
                              },
                              {
                               "index": 1,
                               "type": "STRING"
                              },
                              {
                               "index":2,
                               "type": "STRING"
                              },
                              {
                               "index":3,
                               "type": "STRING"
                              },
                              {
                               "index": 4,
                               "type": "long"
                              }
                              ,
                              {
                               "index": 5,
                               "type": "DOUBLE"
                              },
                              {
                               "index": 6,
                               "type": "long"
                              },
                              {
                               "index": 7,
                               "type": "long"
                              },
                              {
                               "index": 8,
                               "type": "long"
                              }
                       ],
                       "fileType": "text",
                       "encoding": "UTF-8",
                       "fieldDelimiter": ","
                   }

               },
               "writer": {
                   "name": "odpswriter",
                       "parameter": {
                         "project": "base_case", 
                         "table": "mc_dplus_good_sale", 
                         "partition":"pt=20160618", 
                         "column": ["create_time","good_cate","brand","buyer_id","trans_num","trans_amount","click_cnt","addcart_cnt","collect_cnt"],
                         "accessId": "xxxxxxxxxx",
                         "accessKey": "xxxxxxxxxxx",
                         "truncate": true,
                         "odpsServer": "http://service.odps.aliyun.com/api",
                         "tunnelServer": "http://dt.odps.aliyun.com",
                         "accountType": "aliyun"
                      }
               }
           }
       ]
   }
}''' > ${shell_datax_config}

echo "`date '+%Y-%m-%d %T'` shell datax config: ${shell_datax_config}"

/home/admin/datax3/bin/datax.py ${shell_datax_config}
shell_datax_run_result=$?

rm ${shell_datax_config}

if [ ${shell_datax_run_result} -ne 0 ]
then
   echo "`date '+%Y-%m-%d %T'` shell datax ended failed :("
   exit -1
fi
echo "`date '+%Y-%m-%d %T'` shell datax ended success~"

hdfsreader

  • path:hive表hdfs文件路径(注意格式);
  • defaultFS: hdfs文件系统namenode节点地址(注意格式);
  • index:hdfs数据文本第几列,以0开始。
  • Type:hdfs数据对应datax的类型,请看hdsf&datax类型对照表;
  • fileType: hdfs数据文件类型;
  • encoding: hdfs数据文件的编码;
  • fieldDelimiter: hdfs数据文件字段分隔符。
    Odpswriter
  • project: MaxCompute目标表所属项目名称;
  • table: MaxCompute目标表名称;
  • partition:表分区值,此case直接用固定值;
  • column:目标表列,注意顺序与reader的列顺序一一对应;
  • accessId:写入目标表所用的云账号access id;
  • accessKey:写入目标表所用的云长access key;
  • truncate:写入前是否清空当前表/分区数据,需要清空就true,需要保留就false。

步骤5:保存shell节点,提交工作流。
TB1JhEpOVXXXXasaFXXXXXXXXXX-998-323.png

步骤6:指派执行资源组。这里将给shell任务指定执行的机器即我们前面部署的ECS。
TB14HwROVXXXXcrXFXXXXXXXXXX-1356-508.png

hive2mc资源组即为《ECS机器部署》章节中配置的资源组。资源组配置好后,后面就可以执行shell任务进行数据迁移。

Step4:执行数据迁移任务

接前面的步骤,点击shell节点的操作->流程:

TB1JhEpOVXXXXasaFXXXXXXXXXX-998-323.png

TB1KnQuOVXXXXb_apXXXXXXXXXX-1265-310.png

进入该节点的任务管理视图界面,右击节点图,选择测试节点,即可把该阶段调度执行起来:

TB1Tj4tPXXXXXb5XXXXXXXXXXXX-1055-510.png

示例名称和业务日期采用默认,直接点击生成并运行,然后点击“前往查看运行结果”。

TB1m8szOVXXXXbtaXXXXXXXXXXX-585-286.png
进入测试实例视图,可以看到节点示例执行状态,如下图变成绿色打勾的符号说明执行成功,右击节点图选择 查看运行日志 查看具体执行日志。

TB1cy..OVXXXXatXXXXXXXXXXXX-810-339.png
查看日志中读出100条,写入失败0条,说明成功写入100条数据;

TB1WxQ4OVXXXXXnXpXXXXXXXXXX-1283-493.png

Step5:校验MaxCompute表数据

可以到数据开发中创建一个脚本文件

TB1fLoFOVXXXXaNaXXXXXXXXXXX-768-308.png
运行代码:

select create_time,good_cate,brand,buyer_id,trans_num,trans_amount,click_cnt,addcart_cnt,collect_cnt from mc_dplus_good_sale limit 10; 

查看数据是否正常。

TB1Zp31OVXXXXaMXpXXXXXXXXXX-1323-545.png
也可以执行

select count(*) from mc_dplus_good_sale where pt=20160618;

查看是否导入的数据是否是100条。

TB1IOUKOVXXXXX1XVXXXXXXXXXX-767-345.png

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
5天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
19 0
|
5天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在 DataWorks 中,使用Oracle作为数据源进行数据映射和查询,如何更改数据源为MaxCompute或其他类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
22 1
|
6天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
21 1
|
6天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
19 0
|
6天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,MaxCompute创建外部表,MaxCompute和DataWorks的数据一直保持一致如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
15 0
DataWorks产品使用合集之在DataWorks中,MaxCompute创建外部表,MaxCompute和DataWorks的数据一直保持一致如何解决
|
6天前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之在DataWorks中,从Elasticsearch同步数据到ODPS时同步_id字段的如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
21 0
|
6天前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之dataworks 同步es数据到maxcompute 遇到报错:获取表列信息失败如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
15 0
|
23天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
49 2
|
23天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
11天前
|
分布式计算 Hadoop 大数据
[大数据] mac 史上最简单 hadoop 安装过程
[大数据] mac 史上最简单 hadoop 安装过程

热门文章

最新文章