使用Fluentd读写OSS

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 前言Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。安装首先下载并安装td-agent,笔者使用的是td-agent-3.

前言

Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。

安装

首先下载并安装td-agent,笔者使用的是td-agent-3.3.0-1.el7.x86_64.rpm,使用rpm命令安装:

[root@apache ~]# rpm -ivh td-agent-3.3.0-1.el7.x86_64.rpm

然后,需要安装Fluentd的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem install fluent-plugin-aliyun-oss

这里请注意,因为我们使用的是td-agent,安装Fluentd plugin时需要使用td-agent的td-agent-gem(/usr/sbin/td-agent-gem)。原因是td-agent有自己的Ruby,你需要将plugin安装到它的Ruby里面,而不是其他的Ruby,否则将会找不到已经安装好的plugin。

具体可以参见Fluentd的官方文档

安装完成后,我们可以查看安装的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem list fluent-plugin-aliyun-oss

*** LOCAL GEMS ***

fluent-plugin-aliyun-oss (0.0.1)

fluent-plugin-aliyun-oss这个plugin包含两部分:

Fluent OSS output plugin
将数据缓存在本地,达到设定的条件后,将缓存的数据(压缩后,如果设置的话)上传到OSS。

Fluent OSS input plugin
首先,OSS的bucket需要配置事件通知,这篇文章介绍了如何设置,得到MNS的Queue与Endpoint。
设置好之后,这个plugin会定时地从MNS拉取消息,从消息中获取上传的Objects,最后从OSS读取这些Objects,再发往下游。

下面将分别介绍如何配置,具体的配置参数说明参见github: https://github.com/aliyun/fluent-plugin-oss

配置(向OSS写数据)

下面是一个例子,将读到的数据,每分钟一个文件写到OSS中。其中endpoint/bucket/access_key_id/access_key_secret是必填项,其他的都是可选项。

<system>
  workers 6
</system>

<match input.*>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  upload_crc_enable false
  path "fluent-oss/logs"
  auto_create_bucket true
  key_format "%{path}/%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}"
  #key_format %{path}/events/ts=%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}
  time_slice_format %Y%m%d-%H
  store_as gzip
  <buffer tag,time>
    @type file
    path /var/log/fluent/oss/${ENV['SERVERENGINE_WORKER_ID']}
    timekey 60 # 1 min partition
    timekey_wait 1s
    # timekey_use_utc true
    flush_thread_count 1
  </buffer>
  <format>
    @type json
  </format>
</match>

我们可以从OSS控制台看到效果:
15_40_01__04_23_2019

配置(从OSS读数据)

下面是一个配置示例,其中endpoint/bucket/access_key_id/access_key_secret和MNS的endpoint/queue是必填项,其他的都是可选项。

<source>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  store_local false
  store_as gzip
  flush_batch_lines 800
  flush_pause_milliseconds 1

  download_crc_enable false
  <mns>
    endpoint <MNS endpoint to connect to, E.g.,{account-id}.mns.cn-zhangjiakou-internal.aliyuncs.com>
    queue <MNS queue>
    poll_interval_seconds 1
  </mns>
  <parse>
    @type json
  </parse>
</source>

我们可以从log中看一下运行状态

2019-04-23 15:38:14 +0800 [info]: #5 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:14 +0800 [info]: #5 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:14 +0800 [info]: #5 read object fluent-oss/logs/20190423-12/events_10_70226640160100.gz, size 4389548 from OSS
2019-04-23 15:38:15 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages?ReceiptHandle=0BC1EA4E51483D4EAC69736941044AAE-MjY5ODkgMTU1NjAwNTAwODMwNSAzNjAwMDA
2019-04-23 15:38:16 +0800 [info]: #1 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:16 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:16 +0800 [info]: #1 read object fluent-oss/logs/20190423-09/events_50_69939581261780.gz, size 6750045 from OSS

参考资料

https://github.com/aliyun/fluent-plugin-oss
https://rubygems.org/gems/fluent-plugin-aliyun-oss
https://www.fluentd.org/
https://docs.fluentd.org/v1.0/articles/quickstart

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
分布式计算 大数据 Hadoop
Apache Hadoop 2.7如何支持读写OSS
背景 2017.12.13日Apache Hadoop 3.0.0正式版本发布,默认支持阿里云OSS对象存储系统,作为Hadoop兼容的文件系统,后续版本号大于等于Hadoop 2.9.x系列也支持OSS。
3736 0
|
存储 缓存 API
云存储网关共享透明读写OSS归档文件
本文通过具体的例子展示了云存储网关NFS共享所支持的对OSS Bucket里面归档文件的透明读写功能。对于SMB共享虽然不能完全支持透明读写功能,但是也能够帮助用户自动执行解冻过程。
3167 0
云存储网关共享透明读写OSS归档文件
|
存储 JSON 安全
STS Python_SDK授权临时用户读写OSS资源
这里将手动定义 授权策略(Policy),将 授权策略 授权给 角色也可授权给RAM子账号,这里不作展示 ,然后子账号(RAM account)通过 扮演角色方法 获取 角色 的 安全令牌即临时身份 对 资源 进行操作. RAM 用户 可以使用 API 扮演 RAM 角色。当 RAM 用户被授予 AliyunSTSAssumeRoleAccess 权限策略 之后,可以使用其访问密钥调用 STS API AssumeRole 接口,以获取某个角色的 安全令牌临时身份,从而使用安全令牌访问资源。
1001 0
|
对象存储 算法框架/工具 TensorFlow
如何使用PAI深度学习TensorFlow读写OSS教程?
Python不支持读取oss的数据, 故所有调用 python Open(), os.path.exist() 等文件, 文件夹操作的函数的代码都无法执行。 如Scipy.misc.imread(), numpy.load() 等。
1684 0
|
SQL 分布式计算 大数据
Spark读写OSS并使用OSS Select来加速查询
Spark读写OSS 基于这篇文章搭建的CDH6以及配置,我们来使Spark能够读写OSS(其他版本的Spark都是类似的做法,不再赘述)。 由于默认Spark并没有将OSS的支持包放到它的CLASSPATH里面,所以我们需要执行如下命令下面的步骤需要在所有的CDH节点执行 进入到$CDH_HO.
4427 0
|
分布式计算 Hadoop Shell
CDH5 Hadoop如何支持读写OSS
CDH和CM(Cloudera Manager) CDH (Cloudera’s Distribution, including Apache Hadoop)是众多Hadoop发行版本中的一种,由Cloudera维护,目前有不少用户使用这个发行版本。
3778 0
CDH5 Hadoop如何支持读写OSS
|
分布式计算 大数据 Hadoop
HDP2.6 Hadoop如何支持读写OSS
HDP和Ambari HDP(Hortonworks Data Platform)是由Hortonworks发行的大数据平台,里面包含了Hadoop、Hive、HBase等很多开源组件,目前有不少用户直接使用HDP版本的Hadoop。
3640 0
|
存储 分布式计算 Java
MaxCompute与OSS非结构化数据读写互通(及图像处理实例)
MaxCompute作为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件,拥有强大的计算能力,随着集团内外大数据业务的不断扩展,新的数据使用场景也在不断产生。在这样的背景下,MaxCompute(ODPS)计算框架持续演化,而原来主要面对内部特殊格式数据的强大计算能力,也正在一步步的通过新增的非结构化数据处理框架,开放给不同的外部数据。
7928 0
|
1月前
|
Java API 开发工具
如何用阿里云 oss 下载文件
阿里云对象存储服务(OSS)提供了多种方式下载文件,以下讲解下各种方式的下载方法
756 1

相关产品

  • 对象存储