Flume数据导入ODPS方法

简介: Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。 ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原

一、简介

 Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。
 ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原有功能特性,支持ODPS表自定义分区、且可以自动创建分区。

二、环境要求
1、JDK(1.6以上,推荐1.7)
2、Flume-NG 1.x

三、插件部署
1、下载ODPS Sink插件并解压:aliyun-odps-flume-plugin
2、Flume-NG 1.x下载:https://flume.apache.org/download.html
(1)下载 apache-flume-1.6.0-bin.tar.gz
(2)下载apache-flume-1.6.0-src.tar.gz
3、Flume的安装
(1) 解压apache-flume-1.6.0-src.tar.gz和apache-flume-1.6.0-bin.tar.gz
(2) 将apache-flume-1.6.0-src中的文件复制到apache-flume-1.6.0-bin中
4、部署ODPS Sink插件:将文件夹odps_sink移动到Apache Flume安装目录下:
$ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d
$mv odps_sink/ { YOUR_APACHE_FLUME_DIR }/plugins.d/
移动后,核验ODPS Sink插件是否已经在相应目录:
$ ls { YOUR_APACHE_FLUME_DIR}/plugins.d
odps_sink
部署完成后,只需要在Flume的配置文件中将sink的type字段配置为:
com.aliyun.odps.flume.sink.OdpsSink
即可使用

四、配置示例
例:将日志文件中的结构化数据进行解析,并上传到ODPS表中
需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

test_basic.log

some,log,line1
some,log,line2
...
第一步、在ODPS 的 project创建ODPS Datahub表
建表语句如下所示:
CREATE TABLE hub_table_basic (col1 STRING, col2 STRING)

PARTITIONED BY (pt STRING)
INTO 1 SHARDS
HUBLIFECYCLE 1;

第二步、创建Flume作业配置文件:
在Flume安装目录的conf/文件夹下创建名为odps_basic.conf的文件,并输入内容如下:

odps_basic.conf

A single-node Flume configuration for ODPS

Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

Describe the sink

a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

第三步:启动Flume
启动Flume并指定agent的名称和配置文件路径,-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。
$ cd { YOUR_APACHE_FLUME_DIR}
$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console
写入成功,显示日志如下:
...
Write success. Event count: 2
...
在ODPS Datahub表中即可查到数据;

多数据源上传到ODPS
多个数据上传到odps,只需要配置对应的source和channel,可以有一下几种上传方式:
(1) 多个source和一个channel和一个sink
screenshot

(2) 多个source和多个channel和一个sink
screenshot

(3) 多个source,多个channel和多个sink,输出到多个地方存储
screenshot

(4)多个agent的复杂情况:
screenshot

下面给出(1)中情况的配置:

odps_basic.conf

A single-node Flume configuration for ODPS

Name the components on this agent

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

source2的配置

a1.sources.r2.type = exec
a1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log

Describe the sink

a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSink
a1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}
a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}
a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/api
a1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.com
a1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}
a1.sinks.k1.odps.table = hub_table_basic
a1.sinks.k1.odps.partition = 20150814
a1.sinks.k1.batchSize = 100
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = col1,,col2
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
a1.sinks.k1.autoCreatePartition = true

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

source2的channel

a1.sources.r2.channels = c2

可能遇到的问题:
1、在数据sink阶段报错,数据无法传递
screenshot
这个错误是由于数据的最上面加了一行注释,它默认读取改行导致数据的行数与配置文件中 配置的行数不一致,所以报上面这个错,删出上面的注释行问题就解决了。

2、OOM 问题:
flume 报错:
java.lang.OutOfMemoryError: GC overhead limit exceeded
或者:
java.lang.OutOfMemoryError: Java heap space
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space
Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数:

JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
然后在启动 agent 的时候一定要带上 -c conf 选项,否则 flume-env.sh 里配置的环境变量不会被加载生效

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用合集之MaxCompute读取外部表的速度较慢,有什么方法来提升读取速度
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2月前
|
SQL DataWorks 关系型数据库
DataWorks报错问题之dataX数据导入报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
DataWorks报错问题之dataX数据导入报错如何解决
|
2月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
4月前
|
机器学习/深度学习 数据采集 算法
大数据分析技术与方法探究
在当今信息化时代,数据量的增长速度远快于人类的处理能力。因此,如何高效地利用大数据,成为了企业和机构关注的焦点。本文将从大数据分析的技术和方法两个方面进行探究,为各行业提供更好的数据应用方向。
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
大数据分析的技术和方法:从深度学习到机器学习
大数据时代的到来,让数据分析成为了企业和组织中不可或缺的一环。如何高效地处理庞大的数据集并且从中发现潜在的价值是每个数据分析师都需要掌握的技能。本文将介绍大数据分析的技术和方法,包括深度学习、机器学习、数据挖掘等方面的应用,以及如何通过这些技术和方法来解决实际问题。
58 2
|
5月前
|
存储 SQL 分布式计算
数据计算MaxCompute读取外部表(数据在oss gz压缩)速度非常慢,有什么方法可以提升效率么?
数据计算MaxCompute读取外部表(数据在oss gz压缩)速度非常慢,有什么方法可以提升效率么?
49 1
|
5天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之可以使用什么方法将MySQL的数据实时同步到MaxCompute
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
11天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
103465 1
|
2月前
|
机器学习/深度学习 存储 分布式计算
机器学习PAI常见问题之将MaxCompute方法设置成永久如何解决
PAI(平台为智能,Platform for Artificial Intelligence)是阿里云提供的一个全面的人工智能开发平台,旨在为开发者提供机器学习、深度学习等人工智能技术的模型训练、优化和部署服务。以下是PAI平台使用中的一些常见问题及其答案汇总,帮助用户解决在使用过程中遇到的问题。
|
7月前
|
存储 数据采集 机器学习/深度学习
克服大数据障碍的三种方法
克服大数据障碍的三种方法

热门文章

最新文章