通过Fluentd实时上传数据到DataHub实践

简介: 本文把我通过Flunetd,把数据上传到DataHub的配置过程记录下来,希望对大家在配置中能有帮助。

本文把我通过Fluentd,把数据上传到DataHub的配置过程记录下来,希望对大家在配置中能有帮助。

安装

前序准备

本文使用一台CentOS 6.8的ECS来做测试的,机器上已经有yum。用户测试的时候,需要有一台能连接上公网的Linux机器。

  1. 安装依赖包

    `yum -y install gcc gcc-c++ openssl* readline* ncurses* zlib* libxml* libjpeg* libpng* libxslt* libtool* `
  2. 下载并安装包
wget http://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/fluentd-with-datahub-0.12.23.tar.gz&file=fluentd-with-datahub-0.12.23.tar.gz
tar -xzvf fluentd-with-datahub-0.12.23.tar.gz
cd fluentd-with-datahub

这个wget的下载地址,是从文档里拿到的最新的安装包。以后版本可能会更新,用户也可以直接从文档上拿最新的安装包。

  1. 安装

    目前安装包里只包含了ruby和插件的安装,没有提供fluentd的安装部分的脚本,这里可以做修改
    
vi install.sh
##修改里面的gem install --local fluent-plugin-datahub-0.0.1.gem,改成
gem install --local string-scrub-0.0.5.gem
gem install --local thread_safe-0.3.5.gem
gem install --local tzinfo-1.2.2.gem
gem install --local tzinfo-data-1.2016.4.gem
gem install --local sigdump-0.2.4.gem
gem install --local http_parser.rb-0.6.0.gem
gem install --local cool.io-1.2.3.gem 
gem install --local yajl-ruby-1.2.1.gem
gem install --local msgpack-0.5.12.gem
gem install --local fluentd-0.12.23.gem
gem install --local fluent-plugin-datahub-0.0.2.gem
##注意以上版本号都是一键安装包里已经有的,以后如果版本更新,这里的版本号也从dependency_gem里看看当时提供的版本是多少

##wq保存退出
sudo sh install.sh

最终结果是
b4

配置

DataHub配置

在DataHub的控制台里创建一个Project,然后创建一个Topic(Topic对应到数据库是一个表的概念)
b1
因为只是测试,我这里就3个字段。

测试

我们在ECS上,用touch /tmp/test.log生成一个文件。我们通过fluentd监控这个文件的变化,并把变化的内容解析后传到datahub里。

配置Fluentd

这里,我们的datahub配置文件fluentd.conf的内容为

<source>
  @type tail
  path /tmp/test.log
  tag test
  format /(?<request_time>\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s\[(?<level>.+)\]\s(?<content>.+)/
</source>
<match test>
  @type datahub
  access_id your_access_id
  access_key your_access_key
  endpoint http://dh-cn-hangzhou.aliyuncs.com
  project_name your_project_name
  topic_name fluentd
  column_names ["request_time", "level", "content"]
</match>

这里我把我的project_name和access id/key信息给隐藏了。endpoint 的地址看这里,我纯测试,就用了公网地址。
配置文件生成好了后,使用bin/fluentd -c fluentd.conf启动fluentd

测试

我们通过date "+%Y-%m-%d %H:%M:%S [Info] login success" >> /tmp/test.log往/tmp/test.log里写日志,然后在fluentd 上,我们可以看到日志为:
b2
来到datahub上可以看到新写进去的日志:
b3
至此大功告成,/tmp/test.log上的新增数据会被实时推送到DataHub里了。

相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
目录
相关文章
|
1月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
5月前
|
大数据
元数据治理平台Datahub学习
元数据治理平台Datahub学习
116 0
|
分布式计算 MaxCompute
《零基础实现Flume收集网站日志数据到MaxCompute》电子版地址
零基础实现Flume收集网站日志数据到MaxCompute
70 0
《零基础实现Flume收集网站日志数据到MaxCompute》电子版地址
|
存储 消息中间件 SQL
DataHub——实时数据治理平台
DataHub——实时数据治理平台
4438 0
DataHub——实时数据治理平台
|
消息中间件 存储 SQL
Kafka数据入湖OSS实践
本质上,Kafka提供的是消息队列的能力,为消息从生产者流向消费中提供稳定、高效、可靠的渠道。但Kafka本身并不提供海量数据存储的能力,这意味着重读kafka中历史数据将不可能。同时,Kafka没有提供开箱即用的数据处理工具(尽管你可以采用kafka streams或者flink等,但这需要你自己写代码逻辑),使得对原始数据进行加工处理成本较高。我们知道,阿里云OSS提供了灵活、海量、高性价比的
Kafka数据入湖OSS实践
|
Web App开发 弹性计算 缓存
SLS 数据加工 vs 自建 Logstash
阿里云 SLS 是云上一站式大数据处理、分析平台。SLS 数据加工功能则是一个可托管、高可用、可扩展的数据处理服务,广泛适用于数据的规整、富化、分发、汇总、重建索引等场景。
973 0
SLS 数据加工 vs 自建 Logstash
|
Java 测试技术 Ruby
基于LogStash插件采集数据到阿里云Datahub
DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点,原生支持对接阿里云的多项服务,相关功能特点与Kafka类似。本身主要介绍如何使用LogStash采集数据写入Datahub。
927 0
基于LogStash插件采集数据到阿里云Datahub
|
消息中间件 分布式计算 运维
Dataworks实时数据同步(Kafka -> maxcompute)
DataWorks支持实时同步数据,本文主要演示如果将Kafka中的数据同步到maxcompute中。
1987 1
Dataworks实时数据同步(Kafka -> maxcompute)
DataHub常见问题之限制篇
介绍DataHub因为参数限制而出现的错误
4080 0
|
分布式计算 MaxCompute
DataHub常见问题之同步篇
介绍DataHub同步的常见问题
3927 0
DataHub常见问题之同步篇