开发者社区> 问答> 正文

自定义AllWindowFunction类数字

我有一个自定义AllWindowFunction类,它有一个负责数据库插入的数据。与数据库的连接是持久的,并在构造期间打开。

问题是,AllWindowFunction创建/打开连接的实例与在apply事件上调用的实例不同。解决方法是一个静态数据,但我想知道这是否是唯一的解决方法?

示例代码:

public class CustomWindowFunction implements AllWindowFunction {

private static Connection database;

CustomWindowFunction() {
    database = new Connection();
}

@Override
public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
    // process data
    database.save(data);
    out.collect(data.toString());
}

}
我找不到任何关于这种机制的内容,我所知道的是构造函数中的对象ID与调用的对象ID不同apply。

展开
收起
flink小助手 2018-12-10 11:38:34 2821 0
2 条回答
写回答
取消 提交回答
  • 借楼问下, open方法在什么时候被调用?
    RichAllWindowFunction 方法在apply应该怎样被是使用?

    'var filteredstream = inputStream

      .flatMap(buildHbaseMsg(_))
      .filter(b => b.actType match {
        case 0|1|2|3 => true
        case _ => false
      })
      .countWindowAll(2L).apply(new CustomWindowFunction ())'   

    这样吗?


    {
    stream.apply((window, input, out: Collector[Unit]) => new CustomWindowFunction ().apply(window, input, out))
    }

    这样调用并不会触发open方法

    应该怎么使用自定义的function类

    2019-07-17 23:19:09
    赞同 展开评论 打赏
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    这是因为每个函数都必须被序列化以分布在集群的节点上。您可以尝试使用RichAllWindowFunction,这就是所谓的“Rich”版本,您可以在其中使用open()方法,该方法将在每个并行运算符的开头调用。在此方法中,您可以创建连接

    public class CustomWindowFunction implements RichAllWindowFunction {

    private Connection database;
    
    @Override
    public void open(Configuration parameters) {
        database = new Connection();
    }
    
    @Override
    public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
        // process data
        database.save(data);
        out.collect(data.toString());
    }

    }

    2019-07-17 23:19:09
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
继承与功能组合 立即下载
重新定义计算的边界 立即下载
低代码开发师(初级)实战教程 立即下载