Flink之CEP案例分析-网络攻击检测

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量。

我们将检测的结果分为三个等级:

  • 正常:流量在预设的正常范围内;
  • 警告:某数据中心在10秒内连续两次上报的流量超过认定的正常值;
  • 报警:某数据中心在30秒内连续两次匹配警告;

首先,我们构建source,这里我们选择的是并行source,因此需要继承RichParallelSourceFunction类。所有的数据通过模拟器随机生成,其中数据中心编号为整型且取值范围为[0, 10),数据生成的事件间隔由PAUSE常量指定,默认为100毫秒:

//parallel source
DataStream<MonitorEvent> inputEventStream = env.addSource(
    new MonitorEventSource(
        MAX_DATACENTER_ID,
        STREAM_STD,
        STREAM_MEAN,
        PAUSE
    )
).assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());

下面,我们来构建警告模式,按照我们设定的警告等级,其模式定义如下:

Pattern<MonitorEvent, ?> warningPattern = Pattern.<MonitorEvent>begin("first")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .next("second")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .within(Time.seconds(10));

根据该模式构建模式流:

PatternStream<MonitorEvent> warningPatternStream =
    CEP.pattern(inputEventStream.keyBy("dataCenterId"), warningPattern);

在警告的模式流中筛选出配对的警告事件对,生成警告事件对象流(告警事件对象会算出,前后两个匹配的流量事件的平均值):

DataStream<NetworkStreamWarning> warnings = warningPatternStream.select(
    (Map<String, MonitorEvent> pattern) -> {
        NetworkStreamEvent first = (NetworkStreamEvent) pattern.get("first");
        NetworkStreamEvent second = (NetworkStreamEvent) pattern.get("second");

        return new NetworkStreamWarning(first.getDataCenterId(),
            (first.getStream() + second.getStream()) / 2);
    }
);

按照设定的等级,告警模式定义如下:

Pattern<NetworkStreamWarning, ?> alertPattern = Pattern.<NetworkStreamWarning>
    begin("first").next("second").within(Time.seconds(30));

在警告事件流中应用告警模式,得到告警模式流:

PatternStream<NetworkStreamWarning> alertPatternStream = CEP.pattern(warnings.keyBy
    ("dataCenterId"), alertPattern);

在告警模式流中匹配警告模式对,如果模式对中第一个警告对象的平均流量值小于第二个警告对象的平均流量值,则构建告警对象并输出该对象从而形成告警流:

DataStream<NetworkStreamAlert> alerts = alertPatternStream.flatSelect(
    (Map<String, NetworkStreamWarning> pattern, Collector<NetworkStreamAlert> out) -> {
        NetworkStreamWarning first = pattern.get("first");
        NetworkStreamWarning second = pattern.get("second");

        //first avg < second avg
        if (first.getAverageStream() < second.getAverageStream()) {
            out.collect(new NetworkStreamAlert(first.getDataCenterId()));
        }
    }
);

最终,sink到控制台:

warnings.print();
alerts.print();

从上面的代码段可见,CEP的关键是定义合适的模式。关于模式的相关的API,我们之前已进行过分析。为了节省篇幅,本文只列出了核心代码片段。

需要注意的是,因为包含Java 8的lambdas,当你使用javac作为编译器时,将会得到错误提示:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Map' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to 
use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1331)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1317)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:347)
at org.apache.flink.cep.PatternStream.select(PatternStream.java:81)
at com.diveintoapacheflink.chapter11.NetworkAttackMonitor.main(NetworkAttackMonitor.java:55)
at ...

解决方案是使用Eclipse JDT来编译代码。


原文发布时间为:2017-03-01
本文作者:vinoYang
本文来自云栖社区合作伙伴 CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
机器学习/深度学习 算法 计算机视觉
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真
基于yolov2深度学习网络的火焰烟雾检测系统matlab仿真
|
2月前
|
机器学习/深度学习 编解码 计算机视觉
【APFN】从大佬论文中探索如何分析改进金字塔网络
【APFN】从大佬论文中探索如何分析改进金字塔网络
34 0
|
2月前
|
机器学习/深度学习 分布式计算 资源调度
【社交网络分析】课程考试复盘 + 相关资料补充
【社交网络分析】课程考试复盘 + 相关资料补充
52 0
|
24天前
|
监控 Shell Linux
【Shell 命令集合 网络通讯 】Linux 分析串口的状态 statserial命令 使用指南
【Shell 命令集合 网络通讯 】Linux 分析串口的状态 statserial命令 使用指南
32 0
|
29天前
|
机器学习/深度学习 算法 计算机视觉
基于yolov2深度学习网络的视频手部检测算法matlab仿真
基于yolov2深度学习网络的视频手部检测算法matlab仿真
|
7天前
|
安全 网络安全 网络虚拟化
虚拟网络设备与网络安全:深入分析与实践应用
在数字化时代📲,网络安全🔒成为了企业和个人防御体系中不可或缺的一部分。随着网络攻击的日益复杂和频繁🔥,传统的物理网络安全措施已经无法满足快速发展的需求。虚拟网络设备🖧,作为网络架构中的重要组成部分,通过提供灵活的配置和强大的隔离能力🛡️,为网络安全提供了新的保障。本文将从多个维度深入分析虚拟网络设备是如何保障网络安全的,以及它们的实际意义和应用场景。
|
17天前
|
缓存 网络协议 数据库连接
【底层服务/编程功底系列】「网络通信体系」深入探索和分析TCP协议的运输连接管理的核心原理和技术要点
【底层服务/编程功底系列】「网络通信体系」深入探索和分析TCP协议的运输连接管理的核心原理和技术要点
10 0
|
21天前
|
运维 负载均衡 监控
【软件设计师备考 专题 】网络性能分析
【软件设计师备考 专题 】网络性能分析
39 0
|
1月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
33 3
|
2月前
|
JSON 监控 网络安全
使用Perl编写的上网监控管理软件:网络数据包拦截与分析功能
网络安全一直是互联网时代的重要议题之一。随着网络技术的不断发展,网络攻击和数据泄露等问题也变得日益严重。为了有效监控和管理网络流量,开发了一款基于Perl语言的上网监控管理软件,该软件具有强大的网络数据包拦截与分析功能,能够帮助网络管理员实时监控网络流量,并及时发现和应对各种网络安全威胁。
128 0