Flink CEP来自Esper CEP引擎。正如您可能(或不知道)所知,在Esper使用他们的语法(EPL)时,您可以轻松地创建一个batch或一个slide窗口,在这些窗口中对事件进行分组,并允许您将这些事件与函数(avg,max,min,...)一起使用。
例如,使用以下模式,您可以创建5秒的批处理窗口,并计算在指定窗口中收到price的所有Stock事件的属性的平均值。
select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何实现这一点Flink CEP。我知道,可能的目标或方法可能Flink CEP不同,因此实现这一目标的方式可能并不像in中那么简单Esper CEP。
我已经看过关于时间窗口的文档,但我无法实现这个窗口Flink CEP。因此,给出以下代码:
DataStream stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern pattern = Pattern.begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
*/
DataStream result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
如何创建该窗口,从第一个窗口开始,我开始计算5秒内后续事件的平均值。例如:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5秒后的平均值为1.5,并将触发警报。我该如何编码呢?
使用Flink的CEP库时,此行为无法表达。我建议使用Flink DataStream或Table API来计算平均值。基于此,您可以再次使用CEP生成其他事件。
final DataStream input = env
.fromElements(
new Stock(1L, 1.0),
new Stock(2L, 2.0),
new Stock(3L, 1.0),
new Stock(4L, 2.0))
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Stock element) {
return element.getTimestamp();
}
});
final DataStream windowAggregation = input
.timeWindowAll(Time.milliseconds(2))
.aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
@Override
public Tuple2<Integer, Double> createAccumulator() {
return Tuple2.of(0, 0.0);
}
@Override
public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
}
@Override
public Double getResult(Tuple2<Integer, Double> accumulator) {
return accumulator.f1 / accumulator.f0;
}
@Override
public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。