使用DataX进行OTS实例间数据迁移

本文涉及的产品
云服务器 ECS,每月免费额度200元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: 本文主要说明在业务不停服的前提下如何实现对OTS的增量迁移,以及迁移后的数据一致性校验

 表格存储是 NoSQL 的数据存储服务,是基于云计算技术构建的一个分布式结构化和半结构化数据的存储和管理服务。表格存储的数据模型以二维表为中心。表有行和列的概念,但是与传统数据库不一样,表格存储的表是稀疏的,每一行可以有不同的列,可以动态增加或者减少属性列,建表时不需要为表的属性列定义严格的 schema。

1. 概述

 OTS的数据迁移可以使用DataX完成全量数据迁移。但由于部分数据表的数据量较大,无法在指定的窗口内完成全量迁移,且目前DataX只能针对主键值进行范围查询,暂不支持按照属性列范围抽取数据。所以可以按如下两种方式实现全量+增量的数据迁移:

  1. 分区键包含范围信息(如时间信息、自增ID),则以指定range为切分点,分批次迁移;
  2. 分区键不包含范围信息,则可以采用在应用侧双写的模式将数据分批次迁移,写入目标环境同一张业务表。利用OTS的主键唯一性,选择对重复数据执行覆盖原有行的策略来保证数据唯一性;
    本文以应用侧调整为双写模式为例,详细说明OTS数据迁移、校验过程。其中OTS数据迁移流程具体如下所示:

108

1) 预迁移阶段:双写模式中的大表全量迁移。
2) 正式迁移阶段:双写模式中的增量表全量迁移、其余小表的全量迁移。

2. 预迁移阶段

2.1 准备工作

 为保证新老环境的数据一致性,需要在开始数据迁移前,对目标环境的OTS数据表进行数据清空操作,Delete操作是通过Datax工具直接删除表内数据,无需重新建表。具体操作如下:

2.1.1 配置datax任务

 在使用datax执行数据清空前,需配置对应数据表使用datax执行delete任务所需的json文件。在清空数据的配置中,reader与writer均配置目标端的连接信息,且数据写入模式配置DeleteRow即可,具体内容如下。

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "DeleteRow"
                }
            }
        }]
    }
}

2.1.2 执行datax任务

  1. 登录datax所在ECS后,进入datax所在路径
  2. 在对应的工具机分别执行del_pre.sh脚本,即可开始目标环境对应表的数据清空,具体命令如下:
sh del_pre.sh

del_pre.sh脚本内容如下:

#!/bin/bash
nohup python datax.py del_table_1.json --jvm="-Xms16G -Xmx16G" > del_table_1.log &

2.2 数据迁移

 在不停服务的情况下把源环境内数据量较大的数据表全部迁移到目标环境内对应的数据表。

2.2.1 配置datax任务

 在datax对数据表配置相应的json文件,迁移配置的具体内容如下:
需注意,由于OTS本身是NoSQL系统,在迁移数据的配置中,必须配置所有的属性列,否则会缺失对应属性列的值。

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "src_accessId",
                    "accessKey": "src_ accessKey ",
                    "instanceName": " src_instanceName",
                    "table": "tablename",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.zzz.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "PutRow"
                }
            }
        }]
    }
}

2.2.2 执行datax任务

  1. 登录datax所在ECS后,进入datax所在路径
  2. 在对应的工具机分别执行pre_transfer.sh脚本,即可开始专有域OTS到专有云OTS的数据迁移,具体命令如下:
sh pre_transfer.sh
  • pre_transfer.sh脚本内容如下:
#!/bin/bash
nohup python datax.py table_1.json --jvm="-Xms16G -Xmx16G" >table_1.log &

3. 正式迁移阶段

3.1 OTS数据静默

 OTS的数据静默主要是通过观察对应表的数据是否存在变化来判断,校验方式主要包括行数统计、内容统计。

3.1.1 行数统计

 因OTS本身不提供count接口,所以采用在hive创建OTS外部表的方式,读取OTS数据并计算对应数据表的行数,具体操作如下:

  1. 创建外部表
     启动hive,创建上述数据表对应的外部表;为提高统计效率,外部表可以只读取OTS的主键列,建表语句示例如下:
CREATE EXTERNAL TABLE t_oilcard_expenses_old
(h_card_no string)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
"tablestore.columns.mapping"="card_no")
TBLPROPERTIES ("tablestore.endpoint"="$endpoint ","tablestore.instance"="instanceName","tablestore.access_key_id"="ak","tablestore.access_key_secret"="sk","tablestore.table.name"="tableName");
  1. 进入脚本所在路径
    登录Hadoop集群master所在ECS,进入hive所在目录;
  2. 执行行数统计
    执行pre_all_count.sh脚本,即可开始源环境内OTS对应表的行数统计;
nohup sh pre_all_count.sh >pre_all_count.log &
  • pre_all_count.sh脚本内容如下:
#!/bin/bash
  ./bin/hive -e "use ots;select count(h_card_no) from tableName;" >table.rs &

 连续执行两次行数统计,若两次统计结果一致则说明数据已经静默,数据写入以停止;

3.1.2 内容统计

由于部分数据表分区键对应的值比较单一,导致数据全部存储在同一个分区。若采用hive统计行数会耗时太久,所以对于这个表使用datax将OTS数据导入oss的方式进行内容统计,具体操作如下:

  1. 进入脚本所在路径
    登录上述表格对应的ECS,进入datax所在路径;
  2. 执行内容校验
    1) 执行check_table.sh脚本,即可将源环境内OTS数据表导出到OSS;
sh check_table.sh
  • check_table.sh脚本内容如下:
#!/bin/bash
nohup python datax.py table.json --jvm="-Xms8G -Xmx8G">ots2oss01.log &

2) 获取OSS object的ETAG值,写入对应文件table_check01.rs
连续执行两次内容统计,对比两次导出object的ETAG值,若结果一致则说明数据已经静默,数据写入以停止;

3.2 OTS数据迁移

3.2.1 准备工作

 为保证迁移后新老环境数据一致,防止目标环境因测试产生遗留脏数据,在进行数据迁移前,需要将目标环境的OTS的其余全量表进行数据清空。数据清空方式主要有drop、delete,两者的区别如下:

操作方式 优势 劣势 建议
Drop 耗时短 需重新建表
新表分区数需重新扩展
分区数为1的数据表建议选择drop
Delete 分区数保留 耗时较长 需要保留分区数提升迁移速率的表建议选择delete
3.2.1.1 Drop表操作
  1. 登录OTS图形化客户端所在工具机,使用如下信息连接指定OTS实例,并进行对应表的drop操作;
AK: dest_accessId
SK: dest_accessKey
InstanceName: InstanceName
Endpoint:endpoint
  1. 确认删除后,再在客户端重新创建对应的数据;

3.2.1.2 Delete表操作

 Delete操作是通过Datax工具直接删除表内数据,无需重新建表。Datax所需的配置文件参考2.1.1所示。

  1. 登录datax所在ECS后,进入datax所在路径
  2. 在对应的工具机分别执行delete脚本,即可开始目标环境OTS的对应表的数据清空,具体命令如下:
sh del_table_01.sh
  • del_table_01.sh脚本内容如下:
#!/bin/bash
nohup python datax.py del_table.json --jvm="-Xms16G -Xmx16G">del_table.log &

3.2.2 数据迁移

 在源环境停止服务的情况下把双写模式中的增量表全量迁移以及其余小表全部迁移到目标环境内对应的数据表。
具体操作如下:

3.2.2.1 配置datax任务

 在datax对上述数据表配置相应的json文件,迁移配置的具体内容参考2.2.1,在迁移数据的配置中,需要列全所有的属性列。

3.2.2.2 执行datax任务
  1. 登录datax所在ECS后,进入datax所在路径
  2. 在对应的工具机分别执行transfer.sh脚本,即可开始专有域OTS到专有云OTS的数据迁移,具体命令如下:
sh transfer.sh
  • transfer.sh脚本内容如下:
#!/bin/bash
nohup python datax.py Table.json  >Table.log &

3.3 OTS数据校验

 新老环境OTS的数据校验方式均包括行数统计、内容统计,具体如下。

3.3.1 源环境数据统计

 源环境OTS数据表的数据量统计依据数据静默期间最后一次的统计结果即可。

3.3.2 目标环境数据统计

3.3.2.1 行数统计

 因OTS本身不提供count接口,且目标环境ODPS支持创建OTS外部表,所以采用在ODPS创建OTS外部表的方式,读取OTS数据并计算对应数据表的行数,具体操作如下:

  1. 创建外部表
    登录odpscmd,创建上述数据表对应的外部表;
  2. 进入脚本所在路径
    登录odpscmd工具所在ECS,进入odps所在路径;
  3. 执行行数统计
    执行newots_count.sh脚本,即可进行目标环境内OTS对应表的行数统计;
nohup sh newots_count.sh >newots_count.log &
  • newots_count.sh脚本内容如下:
#!/bin/bash
./bin/odpscmd -e "select count(h_card_no) from tableName;" >table.rs &
3.3.2.2 内容统计

由于源环境的部分数据表采用内容统计的方式进行数据校验,为了方便对比数据是否一致,所以目标环境也采用内容统计的方式,具体操作参考3.1.2。

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
相关文章
|
11月前
|
数据采集 关系型数据库 MySQL
大数据数据采集的数据迁移(同步/传输)的Sqoop之DataX
在大数据领域中,数据迁移是一个非常重要的任务。而Sqoop是一款流行且实用的数据迁移工具,但是它对于某些特定场景的数据迁移并不太方便。为了解决这个问题,阿里巴巴集团开发了一款开源的数据集成工具DataX,提供了更多的数据迁移方式和功能。本文将介绍DataX的基本原理和使用方法,希望能够为大家提供一些参考和帮助。
281 0
|
DataX 开发者
DataX在数据迁移中的应用
DataX在数据迁移中的应用
DataX在数据迁移中的应用
|
分布式计算 NoSQL Java
使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南
现在越来越多的技术架构下会组合使用MaxCompute和TableStore,用MaxCompute作大数据分析,计算的结果会导出到TableStore提供在线访问。MaxCompute提供海量数据计算的能力,而TableStore提供海量数据高并发低延迟读写的能力。
5215 0
|
DataX Python Java
OTS数据迁移验证方案
  OTS在业务停写的情况下,可以通过DATAX工具对OTS数据的全量迁移。本文描述了在进行业务割接的情况下,ots数据的全量迁移配置方法,以及数据校验方法。 1 OTS数据迁移方法 1.1 工具环境要求 tablestore客户端工具机:在本地安装表格存储客户端管理工具,提供图形化的操作界面,用于创建、更新和删除数据表。
4605 0
|
NoSQL DataX 开发工具
TableStore: 使用Datax将实例A的数据迁移到实例B中
现在我们需要将数据从一个老的实例A迁移到实例B上面,做一下备份,我们打算使用Datax作为我们的数据传输工具,其中用到了otsreader和otswriter。
6938 0
|
2月前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
7月前
|
存储 DataWorks Unix
Dataworks数据集成之“文本数据”
Dataworks不是支持文本数据导入么?为什么Excel数据不能导入?CSV文件不就是Excel文件么?关于这些问题,我整理了一篇文章进行解释。
774 2
|
4天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
16 0
|
4天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
18 1
|
4天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
17 0