twitter storm源码走读(四)

简介: TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。 本文尝试TridentTopology是如何先一步步转换成普通的sto

Trident Topology执行过程分析

TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射。关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多。

从TridentTopology到vanilla topology(普通的topology)由三个层次组成:

  1. 面向最终用户的概念stream, operation
  2. 利用planner将tridenttopology转换成vanilla topology
  3. 执行vanilla topology

本文尝试TridentTopology是如何先一步步转换成普通的storm Topology(即vanila topology), 转换后的topology的执行中有哪些区别?

 

概述

从TridentTopology到基本的Topology有三层,下图给出一个全局的视图。

创建TridentTopology

下面的代码摘自StormStarter中的TridentWordCount.java

 

复制代码
    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
        new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),
        new Count(), new Fields("count")).parallelismHint(16);

    return topology.build();
复制代码

 

上述代码的newStream一行,分两大部分,一是使用newStream来创建一个stream对象,然后针对该Stream进行各种操作,each/shuffle/persistentAggregate等就是各种operation.

用户在使用TridentTopology的时候,只需要熟悉Stream和TridentTopology中的API函数即可。

转换TridentTopology为Vanilla Topology

上一节创建了Stream,但是如何将其与原有的Spout及Bolt联系起来呢?问题的关键就在TridentTopology::build函数和TridentTopologyBuilder::buildTopology

TridentTopology::build

newStream及其后的函数调用创建了一个含有三大类节点的List,利用该List创建了一个有向非循环图(DAG)。这三类节点分别是operation, partition, spout,在build函数将节点分类分别加入到boltNodes或spoutNodes,注意此处的spout或bolt不能等同于普通的spout和bolt.

TridentTopologyBuilder::buildTopology

利用在build函数中创建的boltNodes,spoutNodes及生成的graph来创建vanilla topology所需要的bolt及spout.

在buildTopology中会看到类似的代码片段。

builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
for(String b: c.committerBatches) {
                specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            
            BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);

最终生成的普通Topology,与普通Topology中的Spout相对应的是MasterBatchCoordinator,而在创建TridentTopology使用的spout则成了Bolt,使用于Stream上的各种Operation也存在于多个普通Bolt中。

 TridentTopology的执行

TridentTopology被转换为普通的Topology(vanilla Topology)之后提交到nimbus,它的具体执行过程有什么不同呢?

主要有几点:

  1. MasterBatchCoordinator通过Batch_stream_id来发送通知给TridentSpoutExecutor
  2. TridentSpoutExecutor收到通知发送成批的tuple给下一跳的Bolt
  3. 下一跳的Bolt收到tuple之后,使用TridentBoltExecutor来进行处理
    1. TridentBoltExecutor调用SubtopologyBolt::execute
    2. InitialReceiver::execute被调用
    3. TridentProcessor::execute被调用

MasterBatchCoordinator收到ack之后,会发送success消息给Spout

MasterBatchCoordinator在commit的时候,会发送commit消息给Spout,让Spout将缓存的消息删除

trident topology可靠性分析

本文详细分析TridentTopology的可靠性实现, TridentTopology通过transactional spout与transactional state相结合,能够做到tuple“只被处理一次,不多也不少”。也就是做到事务性处理exactly-once,要么成功,要么失败。

而一般的storm topology是无法保证eactly-once的处理的,它们要么是at-least-once(至少被处理一次,有可能被处理多次);要么是at-most-once(最多被处理一次,这样就存在遗漏的可能).

TridentTopology在设计中借鉴和保留了目前已经过期的transactional topology的设计思想。

Storm Topology的ack机制

 

 在进行TridentTopology的可靠性分析之前,我们先回顾一下在storm topology中的ack机制。ack bolt是在提交到storm cluster中,由系统自动产生的,一般来说一个topology只有一个ack bolt(当然可以通过配置参数指定多个)。

当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack bolt。ack bolt通过简单的异或原理(即同一个数与自己异或结果为零)来判定从spout发出的某一个bolt是否已经被完全处理完毕。如果结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并执行。如果超时,则发送fail消息给spout,spout中的fail函数被调用并执行,spout中的ack和fail的处理逻辑由用户自行填写。

如在github上的kerstel spout就能做到只有当某一个tuple被成功处理之后,它才会从缓存中移除,否则继续放入到处理队列再次进行处理。

TridentTopology的可靠性机制

在“走读之6”一文中分析了一个tridenttopology是如何转换成storm topology的,我想用上面这幅图再次阐述一下转变后的结果。

  • 一个tridenttopoloy会至少引入一个MasterBatchCoordinator,这个MBC就类似于storm topology中的spout
  • newStream时使用的入参spout会裂变成两个bolt,一是TridentSpoutCoordinator,另一个是TridentSpoutExecutor
  • 针对stream的各种操作则被分散到各个Bolt中,它们的执行上下文是TridentBoltExecutor

可以看出使用TridentTopology Api进行操作时,所有的东西其实都运行在bolt context中,而真正的spout是在调用TridentTopologyBuilder.buildTopology()的时候被添加的。

  • MasterBatchCoordinator使用batch_stream发送一个类似于seeder tuple的东西给tridentspoutcoordinator,tridentspoutcoordinator将该信号继续下发给TridentSpoutExecutor, TridentSpout是如何一步步被调用到的呢。
    • TridentBoltExecutor::execute
      •   TridentSpoutExecutor::execute
        •   BatchSpoutExecutor::execute
          •   ITridentSpout::emitBatch

emitBatch是产生真正需要被处理的tuple的,这些tuple会被各个Operation所在的bolt所接收。它们的调用顺序是

  • TridentBoltExecutor::execute
    •   SubtopologyBolt::execute
      •   InitialReceiver::receive
        •   TridentProcessor::execute

处理结束的判断依据

在TridentSpout中是如何判断所有的tuple都已经被处理的呢。

  1. 在每跳中认为自己处理完毕的时候,它都会告诉下一跳,即下游,我给你发送了多少tuple,如果下游将上游发送过来的确认消息与自身确实已经处理的消息比对一致的话,则认为处理都完成,于是发送ack.
  2. 问题的关键变成每一个bolt是如何判断自己已经处理完毕的呢,请看步骤3
  3. 总有一个bolt是没有上游的,即TridentSpoutExecutor,它只会收到启动指令,但不接收真正的业务数据,于是它会告诉下一跳,我发了多少tuple给你。

STREAM

在MasterBatchCoordinator中定义了三种不同的stream,这三种stream分别是

  1. BATCH_STREAM
  2. COMMIT_STREAM
  3. SUCCESS_STREAM

这些stream分别在什么时候被使用呢,下图给出一个大概的时序

简要说明:

  1. masterbatchcoordinator通过batch_stream发送seeder tuple给tridentspoutcoordinator
  2. tridentspoutcoordinator给tridentspoutexecutor继续传递该指令
  3. TridentSpoutExecutor在收到启动指令后,调用ITridentSpout接口的实现类进行emitBatch
  4. TridentSpoutExecutor在发送完一批batch后,finishBatch被调用,通过emitDirect会给下一跳通过coord_stream发送trackedinfo,即我已经发送了多少消息给你
  5. TridentSpoutExecutor紧接着还会给ack bolt发送ack消息,ack bolt将其传达到MasterBatchCoordinator
  6. MasterBatchCoordinator在收到第一个ack后,将状态置为processed
  7. 当MasterBatchCoordinator再次收到ack后,会将状态转为committing,同时通过commit_stream发送tuple给TridentSpoutExecutor
  8. 收到commit_stream上传来的tuple后,TridentSpoutExecutor会调用ITridentSpout中的emmitter, emmitter::commit()被执行,TridentSpoutExecutor会再次ack收到tuple
  9. MasterBatchCoordinator在收到这个tuple之后,会认为针对某一个seeder tuple的处理已经完全实现,于是通过SUCCESS_STREM告知TridentSpoutCoordinator,所有的活都已经都完成了,收工。
  10. 收到Success_stream上传来的信号后,ITridentSpout中的内嵌子类Emmit和Coordinator中相应的success方法会被调用执行。

注意:

  1. 为了描述方便,将TridentTopology进行了简化,认为其在转换成真正的storm topology时,只有一个TridentProcessor所在的bolt。真实的情况可能比这复杂,但消息的传递路径还是差不多的。
  2. 注意在TridentTopology中ack会被多次反复调用,这不同于普通的storm topology

状态机

在MasterBatchCoordinator中,针对每一个seeder tuple,其状态机如下图所示。注意这些状态是会被保存到zookeeper server中的,使用的api定义在TransactionalState中。

总结

通过上面的分析可以看出,TridentTopology实现了一个比较好的框架,但真正要做到exactly-once的处理,还需要用户自己去实现ITridentSpout中的两个重要内嵌类,Emmitter和Coordinator。

具体如何实现该接口,可以查看storm-core/src/jvm/storm/trident/testing目录下的FixedBatchSpout.java和FeederCommitterBatchSpout.java

相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
流计算
|
存储 NoSQL 算法
【Storm】Storm实战之频繁二项集挖掘(附源码)
针对大叔据实时处理的入门,除了使用WordCount示例之外,还需要相对更深入点的示例来理解Storm,因此,本篇博文利用Storm实现了频繁项集挖掘的案例,以方便更好的入门Storm。
81 0
【Storm】Storm实战之频繁二项集挖掘(附源码)
|
消息中间件 Kafka 流计算
storm从入门到放弃(三),放弃使用 StreamId 特性
  序:StreamId是storm中实现DAG有向无环图的重要一个特性,但是从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,我们系统在迭代时会带来整体服务的不可用。   StreamId是storm中实现DAG有向无环图的重要一个特性,官方也提供对应的接口实现让开发者自己灵活化构造自己的ADG图。
1155 0
|
NoSQL Java 测试技术
|
Java 流计算
twitter storm源码走读(二)
storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配。除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色。supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下
2322 0
|
API 流计算 算法
twitter storm源码走读(五)
从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation。在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构图来维护。具体到TridentTop
1688 0