Hadoop数据迁移MaxCompute最佳实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文向您详细介绍如何通过使用DataWorks数据同步功能,将Hadoop数据迁移到阿里云MaxCompute大数据计算服务上。您也可以参考本文进行MaxCompute数据到Hadoop的反向迁移。

本文向您详细介绍如何通过使用DataWorks数据同步功能,将Hadoop数据迁移到阿里云MaxCompute大数据计算服务上。

1.  环境准备

1.1 Hadoop集群搭建

进行数据迁移前,您需要保证自己的Hadoop集群环境正常。本文使用阿里云EMR服务自动化搭建Hadoop集群,详细过程请参见https://help.aliyun.com/document_detail/35223.html?spm=a2c4g.11186623.6.557.20e219306ZJC9i

本文使用的EMR Hadoop版本信息如下:

EMR版本: EMR-3.11.0

集群类型: HADOOP

软件信息: HDFS2.7.2 / YARN2.7.2 / Hive2.3.3 / Ganglia3.7.2 / Spark2.2.1 / HUE4.1.0 / Zeppelin0.7.3 / Tez0.9.1 / Sqoop1.4.6 / Pig0.14.0 / ApacheDS2.0.0 / Knox0.13.0

Hadoop集群使用经典网络,区域为华东1(杭州),主实例组ECS计算资源配置公网及内网IP,高可用选择为否(非HA模式),具体配置如下所示。

1.2 MaxCompute

请参考https://help.aliyun.com/document_detail/58226.html?spm=a2c4g.11174283.6.570.64d4590eaIbcxH

开通MaxCompute服务并创建好项目,本文中在华东1(杭州)区域创建项目bigdata_DOC,同时启动DataWorks相关服务,如下所示。

2.  数据准备

2.1 Hadoop集群创建测试数据

进入EMR Hadoop集群控制台界面,使用交互式工作台,新建交互式任务doc。本例中HIVE建表语句:

CREATE TABLE IF NOT EXISTS hive_doc_good_sale(

   create_time timestamp,

   category STRING,

   brand STRING,

   buyer_id STRING,

   trans_num BIGINT,

   trans_amount DOUBLE,

   click_cnt BIGINT

   )

   PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n'

选择运行,观察到Query executed successfully提示则说明成功在EMR Hadoop集群上创建了测试用表格hive_doc_good_sale,如下图所示。

插入测试数据,您可以选择从OSS或其他数据源导入测试数据,也可以手动插入少量的测试数据。本文中手动插入数据如下:

insert into hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','外套','品牌A','lilei',3,500.6,7),('2018-08-22','生鲜','品牌B','lilei',1,303,8),('2018-08-22','外套','品牌C','hanmeimei',2,510,2),(2018-08-22,'卫浴','品牌A','hanmeimei',1,442.5,1),('2018-08-22','生鲜','品牌D','hanmeimei',2,234,3),('2018-08-23','外套','品牌B','jimmy',9,2000,7),('2018-08-23','生鲜','品牌A','jimmy',5,45.1,5),('2018-08-23','外套','品牌E','jimmy',5,100.2,4),('2018-08-24','生鲜','品牌G','peiqi',10,5560,7),('2018-08-24','卫浴','品牌F','peiqi',1,445.6,2),('2018-08-24','外套','品牌A','ray',3,777,3),('2018-08-24','卫浴','品牌G','ray',3,122,3),('2018-08-24','外套','品牌C','ray',1,62,7) ;

完成插入数据后,您可以使用select * from hive_doc_good_sale where pt =1;语句检查Hadoop集群表中是否已存在数据可用于迁移。

2.2 利用DataWorks新建目标表

在管理控制台,选择对应的MaxCompute项目,点击进入数据开发页面,点击新建表,如下所示。

在弹框中输入SQL建表语句,本例中使用的建表语句如下:

CREATE TABLE IF NOT EXISTS hive_doc_good_sale(

   create_time string,

   category STRING,

   brand STRING,

   buyer_id STRING,

   trans_num BIGINT,

   trans_amount DOUBLE,

   click_cnt BIGINT

   )

   PARTITIONED BY (pt string) ;

在建表过程中,需要考虑到HIVE数据类型与MaxCompute数据类型的映射,当前数据映射关系可参见 https://help.aliyun.com/document_detail/54081.html?spm=a2c4e.11153940.blogcont630231.16.587919b0KIgCfQ 

由于本文使用DataWorks进行数据迁移,而DataWorks数据同步功能当前暂不支持timestamp类型数据,因此在DataWorks建表语句中,将create_time设置为string类型。

上述步骤同样可通过odpscmd命令行工具完成,命令行工具安装和配置请参考https://help.aliyun.com/document_detail/27804.html?spm=a2c4g.11186623.6.572.e66b5a41WP2nyj ,执行过程如下所示。

注意:考虑到部分HIVE与MaxCompute数据类型的兼容问题,建议在odpscmd客户端上执行以下命令。

set odps.sql.type.system.odps2=true;set odps.sql.hive.compatible=true;

完成建表后,可在DataWorks数据开发>表查询一栏查看到当前创建的MaxCompute上的表,如下所示。

3.  数据同步

3.1 新建自定义资源组

由于MaxCompute项目所处的网络环境与Hadoop集群中的数据节点(data node)网络通常不可达,我们可通过自定义资源组的方式,将DataWorks的同步任务运行在Hadoop集群的Master节点上(Hadoop集群内Master节点和数据节点通常可达)。

3.1.1 查看Hadoop集群datanode

在EMR控制台上首页/集群管理/集群/主机列表页查看,如下图所示,通常非HA模式的EMR上Hadoop集群的master节点主机名为 emr-header-1,datanode主机名为emr-worker-X。

您也可以通过点击上图中Master节点的ECS ID,进入ECS实例详情页,通过点击远程连接进入ECS,通过 hadoop dfsadmin –report命令查看datenode,如下图所示。

由上图可以看到,在本例中,datanode只具有内网地址,很难与DataWorks默认资源组互通,所以我们需要设置自定义资源组,将master node设置为执行DataWorks数据同步任务的节点。

3.1.2 新建自定义资源组

进入DataWorks数据集成页面,选择资源组,点击新增资源组,如下图所示。关于新增调度资源组的详细信息,请参考新增调度资源

在添加服务器步骤中,需要输入ECS UUID和机器IP等信息(对于经典网络类型,需输入服务器名称,对于专有网络类型,需输入服务器UUID。目前仅DataWorks V2.0 华东2区支持经典网络类型的调度资源添加,对于其他区域,无论您使用的是经典网络还是专有网络类型,在添加调度资源组时都请选择专有网络类型),机器IP需填写master node公网IP(内网IP可能不可达)。ECS的UUID需要进入master node管理终端,通过命令dmidecode | grep UUID获取(如果您的hadoop集群并非搭建在EMR环境上,也可以通过该命令获取),如下所示。

完成添加服务器后,需保证master node与DataWorks网络可达,如果您使用的是ECS服务器,需设置服务器安全组。如果您使用的内网IP互通,可参考https://help.aliyun.com/document_detail/72978.html?spm=a2c4g.11186623.6.580.6a294f79DmqM6f设置。如果您使用的是公网IP,可直接设置安全组公网出入方向规则,本文中设置公网入方向放通所有端口(实际应用场景中,为了您的数据安全,强烈建议设置详细的放通规则),如下图所示。

完成上述步骤后,按照提示安装自定义资源组agent,观察到当前状态为可用,说明新增自定义资源组成功。

如果状态为不可用,您可以登录master node,使用tail –f/home/admin/alisatasknode/logs/heartbeat.log命令查看DataWorks与master node之间心跳报文是否超时,如下图所示。

3.2 新建数据源

关于DataWorks新建数据源详细步骤,请参见https://help.aliyun.com/knowledge_list/72788.html?spm=a2c4g.11186623.6.573.7d882b59eqdXhi

DataWorks新建项目后,默认设置自己为数据源odps_first。因此我们只需添加Hadoop集群数据源:在DataWorks数据集成页面,点击数据源>新增数据源,在弹框中选择HDFS类型的数据源。

在弹出窗口中填写数据源名称及defaultFS。对于EMR Hadoop集群而言,如果Hadoop集群为HA集群,则此处地址为hdfs://emr-header-1的IP:8020,如果Hadoop集群为非HA集群,则此处地址为hdfs://emr-header-1的IP:9000。在本文中,emr-header-1与DataWorks通过公网连接,因此此处填写公网IP并放通安全组。

完成配置后,点击测试连通性,如果提示“测试连通性成功”,则说明数据源添加正常。

注意:如果EMR Hadoop集群设置网络类型为专有网络,则不支持连通性测试。

3.3 配置数据同步任务

在DataWorks数据集成页面点击同步任务,选择新建>脚本模式,在导入模板弹窗选择数据源类型如下:

完成导入模板后,同步任务会转入脚本模式,本文中配置脚本如下,相关解释请参见https://help.aliyun.com/document_detail/74304.html?spm=a2c4g.11186631.6.576.2c506aaczJB2i7

在配置数据同步任务脚本时,需注意DataWorks同步任务和HIVE表中数据类型的转换如下:

在Hive表中的数据类型

DataX/DataWorks 内部类型

TINYINT,SMALLINT,INT,BIGINT

Long

FLOAT,DOUBLE,DECIMAL

Double

String,CHAR,VARCHAR

String

BOOLEAN

Boolean

Date,TIMESTAMP

Date

Binary

Binary

详细代码如下:

{

  "configuration": {

    "reader": {

      "plugin": "hdfs",

      "parameter": {

        "path": "/user/hive/warehouse/hive_doc_good_sale/",

        "datasource": "HDFS1",

        "column": [

          {

            "index": 0,

            "type": "string"

          },

          {

            "index": 1,

            "type": "string"

          },

          {

            "index": 2,

            "type": "string"

          },

          {

            "index": 3,

            "type": "string"

          },

          {

            "index": 4,

            "type": "long"

          },

          {

            "index": 5,

            "type": "double"

          },

          {

            "index": 6,

            "type": "long"

          }

        ],

        "defaultFS": "hdfs://121.199.11.138:9000",

        "fieldDelimiter": ",",

        "encoding": "UTF-8",

        "fileType": "text"

      }

    },

    "writer": {

      "plugin": "odps",

      "parameter": {

        "partition": "pt=1",

        "truncate": false,

        "datasource": "odps_first",

        "column": [

          "create_time",

          "category",

          "brand",

          "buyer_id",

          "trans_num",

          "trans_amount",

          "click_cnt"

        ],

        "table": "hive_doc_good_sale"

      }

    },

    "setting": {

      "errorLimit": {

        "record": "1000"

      },

      "speed": {

        "throttle": false,

        "concurrent": 1,

        "mbps": "1",

        "dmu": 1

      }

    }

  },

  "type": "job",

  "version": "1.0"

}

其中,path参数为数据在Hadoop集群中存放的位置,您可以在登录master node后,使用hdfs dfs –ls /user/hive/warehouse/hive_doc_good_sale命令确认。对于分区表,您可以不指定分区,DataWorks数据同步会自动递归到分区路径,如下图所示。

完成配置后,点击运行。如果提示任务运行成功,则说明同步任务已完成。如果运行失败,可通过复制日志进行进一步排查。

4.  验证结果

在DataWorks数据开发/表查询页面,选择表hive_doc_good_sale后,点击数据预览可查看HIVE数据是否已同步到MaxCompute。您也可以通过新建一个table查询任务,在任务中输入脚本select * FROM   hive_doc_good_sale where pt =1;后,点击运行来查看表结果,如下图所示。

当然,您也可以通过在odpscmd命令行工具中输入select * FROM   hive_doc_good_sale where pt =1;查询表结果。


5.  MaxCompute数据迁移到Hadoop

如果您想实现MaxCompute数据迁移到Hadoop。步骤与上述步骤类似,不同的是同步脚本内的reader和writer对象需要对调,具体实现脚本举例如下。

{
  "configuration": {
    "reader": {
      "plugin": "odps",
      "parameter": {
      "partition": "pt=1",
      "isCompress": false,
      "datasource": "odps_first",
      "column": [
        "create_time",
        "category",
        "brand",
      "buyer_id",
      "trans_num",
      "trans_amount",
      "click_cnt"
    ],
    "table": "hive_doc_good_sale"
    }
  },
  "writer": {
    "plugin": "hdfs",
    "parameter": {
    "path": "/user/hive/warehouse/hive_doc_good_sale",
    "fileName": "pt=1",
    "datasource": "HDFS_data_source",
    "column": [
      {
        "name": "create_time",
        "type": "string"
      },
      {
        "name": "category",
        "type": "string"
      },
      {
        "name": "brand",
        "type": "string"
      },
      {
        "name": "buyer_id",
        "type": "string"
      },
      {
        "name": "trans_num",
        "type": "BIGINT"
      },
      {
        "name": "trans_amount",
        "type": "DOUBLE"
      },
      {
        "name": "click_cnt",
        "type": "BIGINT"
      }
    ],
    "defaultFS": "hdfs://47.99.162.100:9000",
    "writeMode": "append",
    "fieldDelimiter": ",",
    "encoding": "UTF-8",
    "fileType": "text"
    }
  },
  "setting": {
    "errorLimit": {
      "record": "1000"
  },
  "speed": {
    "throttle": false,
    "concurrent": 1,
    "mbps": "1",
    "dmu": 1
  }
  }
},
"type": "job",
"version": "1.0"
}

您需要参考配置HDFS Writer在运行上述同步任务前对Hadoop集群进行设置,在运行同步任务后手动拷贝同步过去的文件。


相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
7天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
28 2
|
7天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
2月前
|
分布式计算 关系型数据库 数据库连接
MaxCompute数据问题之数据迁移如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
31 0
|
9天前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
40 1
|
26天前
|
消息中间件 SQL 分布式计算
大数据Hadoop生态圈体系视频课程
熟悉大数据概念,明确大数据职位都有哪些;熟悉Hadoop生态系统都有哪些组件;学习Hadoop生态环境架构,了解分布式集群优势;动手操作Hbase的例子,成功部署伪分布式集群;动手Hadoop安装和配置部署;动手实操Hive例子实现;动手实现GPS项目的操作;动手实现Kafka消息队列例子等
20 1
大数据Hadoop生态圈体系视频课程
|
4月前
|
分布式计算 资源调度 搜索推荐
《PySpark大数据分析实战》-02.了解Hadoop
大家好!今天为大家分享的是《PySpark大数据分析实战》第1章第2节的内容:了解Hadoop。
44 0
《PySpark大数据分析实战》-02.了解Hadoop
|
4月前
|
SQL 分布式计算 大数据
大数据技术之集群数据迁移
大数据技术之集群数据迁移
52 0
|
4月前
|
存储 搜索推荐 算法
【大数据毕设】基于Hadoop的音乐推荐系统的设计和实现(六)
【大数据毕设】基于Hadoop的音乐推荐系统的设计和实现(六)
159 0
|
4月前
|
分布式计算 Hadoop Java
【大数据实训】基于Hadoop的2019年11月至2020年2月宁波天气数据分析(五)
【大数据实训】基于Hadoop的2019年11月至2020年2月宁波天气数据分析(五)
52 1
|
4月前
|
存储 分布式计算 搜索推荐
【大数据毕设】基于Hadoop的音乐管理系统论文(三)
【大数据毕设】基于Hadoop的音乐管理系统论文(三)
92 0

相关产品

  • 云原生大数据计算服务 MaxCompute