《Storm实时数据处理》一2.5 索引与持久化日志数据

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介:

本节书摘来华章计算机《Storm实时数据处理》一书中的第2章 ,第2.5节,(澳)Quinton Anderson 著 卢誉声 译更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.5 索引与持久化日志数据

我们需要在某些特定时期将日志数据存储起来以便后期利用,而且还要保证这些日志数据能够被检索。为了实现这个目标,本例中将会集成名为Elastic Search的开源产品,它是一个通用并集成RESTful API的集群搜索引擎(http://www.elasticsearch.org/)。

2.5.1 实战

Step01 创建一个继承自BaseRichBolt的IndexerBolt类,并声明org.elasticsearch.client.Client 私有成员变量。你需要在prepare方法中初始化它,代码如下:
image

Step02 然后在Bolt的execute方法中建立LogEntry对象的索引:
ENTRY);
image

Step03 创建这个Bolt的单元测试可能没那么容易,因此值得在这里对此多补充几句。在测试源码目录中的storm.cookbook.log包里新建一个 JUnit 4单元测试,然后添加StoringMatcher私有内部类,代码如下:

image

Step04 然后实现真正的测试逻辑,代码如下:

image

2.5.2 解析

Elastic Search提供了完整的Java客户端API(它本身也是用Java实现的),因此与它集成轻而易举。Bolt的prepare方法会在本地模式或集群模式下创建一个集群节点。集群模式会将根据名称获得的集群和在当前节点上创建的本地存储节点连接起来,这样就可以避免在使用不同传输方式进行写操作时发生的双跃点延迟问题。
Elastic Search本身是一个大型复杂系统,为了更好地理解操作和配置方面的问题,建议你先读一读它所提供的文档。
当Storm处于调试模式时,Elastic Search节点将连同多个在相同JVM中被执行的节点(如果需要的话)运行嵌入式集群。这显然对单元测试来说大有裨益。所有这些操作都会在Bolt的prepare方法中实现。
image

当收到Tuple时,Bolt会获取其中的LogEntry对象,并将其转化成对应的JSON格式内容,然后发送给Elastic Search。
image

接着从Elastic Search集群的响应(response)中获取日志ID,并连同LogEntry一起发送给下游的Bolt。在这个例子中,我们只将这个值用于单元测试。不管怎么说,我们都可以很容易地通过添加下游的Bolt来持久化这个值,进而将该值用于日志统计信息,这么做能为开发用户界面带来极大便利。
image

对这个Bolt进行单元测试十分需要讲究技巧。这是因为对于一般的单元测试来说,我们在执行单元测试之前就已预知输出结果。但在这个示例中,只有在接收到Elastic Search集群的响应后,我们才能知道ID的具体值。这使得我们很难提前指定预期输出结果,更别提验证搜索引擎中的日志了。所以为了实现这个目标,我们使用了一个JMock自定义匹配器。在matches方法中实现了该自定义匹配器的主要逻辑。
image

该方法能够确保返回Values的实例,并将其保存起来。我们会在后续的计算操作用到这个值。有了这个实例,就可以指定以下期望集合:
image

并获取记录ID,根据嵌入式Elastic Search集群来验证它。
image

若想要实现在集群中搜索日志文件的功能,可以从kibna.org下载并安装Kibana—一款优秀的日志搜索前端引擎。本例通过来自logstash的JSON日志结构来维护信息,而Kibana可作为Elastic Search上logstash的前端,所以它能与本例所使用的系统进行无缝集成。除此之外,Kibana还使用了Twitter Bootstrap 框架,因此你可以非常简单地将其集成到分析面板里。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
存储 JSON 监控
可以通过配置Filebeat来将Higress日志持久化到磁盘
【2月更文挑战第10天】可以通过配置Filebeat来将Higress日志持久化到磁盘
28 4
|
4月前
|
消息中间件 存储 Kafka
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
阿里 P7 三面凉凉,kafka Borker 日志持久化没答上来
|
4月前
|
SQL 关系型数据库 MySQL
我使用flinkcdc的sql形式进行全量同步,4张表,有两张表数据没进去,看日志,id怎么是null呢?
我使用flinkcdc的sql形式进行全量同步,4张表,有两张表数据没进去,看日志,id怎么是null呢?
117 40
|
6月前
|
NoSQL Redis 索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
|
1天前
|
机器学习/深度学习 前端开发 数据挖掘
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断(下)
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
32 11
|
6天前
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断2
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
13 0
|
7天前
|
机器学习/深度学习 前端开发 数据挖掘
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
38 0
|
1月前
|
SQL 缓存 关系型数据库
MySQL的万字总结(缓存,索引,Explain,事务,redo日志等)
MySQL的万字总结(缓存,索引,Explain,事务,redo日志等)
66 0
|
7月前
|
SQL 关系型数据库 MySQL
数据库基本概念(SQL,索引,视图,事务,日志等)(二)
数据库基本概念(SQL,索引,视图,事务,日志等)(二)
194 0
|
4月前
|
SQL 关系型数据库 MySQL
⑩⑥ 【MySQL】详解 触发器TRIGGER,协助 确保数据的完整性,日志记录,数据校验等操作。
⑩⑥ 【MySQL】详解 触发器TRIGGER,协助 确保数据的完整性,日志记录,数据校验等操作。
38 0

热门文章

最新文章