Storm的BaseBasicBolt源码解析ack机制

简介: 我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。

 

为了方便看源代码,我先上我们的继承类:

public class SplitSentenceBolt extends BaseBasicBolt {  public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }
    
  //5:执行我们自己的逻辑处理方法,接收传入的参数。
  
public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }

 



通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序

public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
    
    private IBasicBolt _bolt;
    private transient BasicOutputCollector _collector;
    //1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。
    public BasicBoltExecutor(IBasicBolt bolt) {
        _bolt = bolt;
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。
    }
   //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _bolt.prepare(stormConf, context);
        _collector = new BasicOutputCollector(collector);
    }
  //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
    public void execute(Tuple input) {
        _collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。
        try {
            _bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。
            _collector.getOutputter().ack(input);//这个地方就是响应
        } catch(FailedException e) {
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);//这个地方就是响应
        }
    }
    public void cleanup() {
        _bolt.cleanup();
    }
    public Map<String, Object> getComponentConfiguration() {
        return _bolt.getComponentConfiguration();
    }
}

 

 

public class BasicOutputCollector implements IBasicOutputCollector {
    private OutputCollector out;
    private Tuple inputTuple;
    public BasicOutputCollector(OutputCollector out) {
        this.out = out;
    }
//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
   public void setContext(Tuple inputTuple) {
        this.inputTuple = inputTuple; }
   //6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple保持tracker链路
而这个anchor tuple就是bolt接收到转换前的源tuple数据。

  public List<Integer> emit(List<Object> tuple) {
     return emit(Utils.DEFAULT_STREAM_ID, tuple);
   }
public List<Integer> emit(String streamId, List<Object> tuple) { return out.emit(streamId, inputTuple, tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { out.emitDirect(taskId, streamId, inputTuple, tuple); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } protected IOutputCollector getOutputter() { return out; } public void reportError(Throwable t) { out.reportError(t); } }

这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。

这个

 

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
8天前
yolo-world 源码解析(六)(2)
yolo-world 源码解析(六)
18 0
|
8天前
yolo-world 源码解析(六)(1)
yolo-world 源码解析(六)
12 0
|
8天前
yolo-world 源码解析(五)(4)
yolo-world 源码解析(五)
19 0
|
8天前
yolo-world 源码解析(五)(1)
yolo-world 源码解析(五)
31 0
|
8天前
yolo-world 源码解析(二)(2)
yolo-world 源码解析(二)
21 0
|
8天前
Marker 源码解析(二)(3)
Marker 源码解析(二)
13 0
|
8天前
Marker 源码解析(一)(4)
Marker 源码解析(一)
12 0
|
8天前
Marker 源码解析(一)(2)
Marker 源码解析(一)
17 0
|
8天前
PokéLLMon 源码解析(六)(4)
PokéLLMon 源码解析(六)
12 0

推荐镜像

更多