小规模的流处理框架.Part 2: RxJava 1.x/2.x

简介:

part 1: thread pools中,我们设计并实现了一个相对简单的实时处理events的系统。在阅读本文之前你应该确保已经读懂了Part1的那篇文章,这里重新阐述一遍系统的设计要求:

系统能够每秒处理1000个任务,每一个Event至少有2个属性:

  • clientId-我们希望每一秒有多个任务是在同一个客户端下处理的(译者:不同的clientId对应不同的ClientProjection,即对应不同的一系列操作)
  • UUID-全局唯一的

消费一个任务要花费10毫秒,为这样的流设计一个消费者:

  1. 能够实时的处理任务
  2. 和同一个客户端有关的任务应该被有序地处理,例如你不能对拥有同一个clientId的任务序列使用并行处理
  3. 如果10秒内出现了重复的UUID,丢弃它。假设10秒后不会重复

到目前为止我们提出了线程池和共享缓存结合的设计,而在这篇文章中我们会使用RxJava进行实现。开始之前,我从没有提到EventStream是如何实现的,仅仅是给出了API:

1 interface EventStream {
2  
3     void consume(EventConsumer consumer);
4  
5 }

事实上为了能够进行测试,我建立了一个RxJava流,它所有的行为都符合设计要求:

01 @Slf4j
02 class EventStream {
03  
04     void consume(EventConsumer consumer) {
05         observe()
06             .subscribe(
07                 consumer::consume,
08                 e -> log.error("Error emitting event", e)
09         );
10     }
11  
12     Observable<Event> observe() {
13         return Observable
14                 .interval(1, TimeUnit.MILLISECONDS)
15                 .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
16                 .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
17                 .flatMap(this::occasionallyDuplicate, 100)
18                 .observeOn(Schedulers.io());
19     }
20  
21     private Observable<Event> occasionallyDuplicate(Event x) {
22         final Observable<Event> event = Observable.just(x);
23         if (Math.random() >= 0.01) {
24             return event;
25         }
26         final Observable<Event> duplicated =
27                 event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
28         return event.concatWith(duplicated);
29     }
30  
31 }

虽然你们没必要明白这个流模拟器是怎么工作的,但它的工作过程相当有趣。首先我们使用interval()产生一个每毫秒输出一个Long型值(0,1,2)的稳定流(这是因为设计要求系统每秒能处理1000个event)。然后我们使用delay()对每个event进行0到1000微秒内的随机延迟,在这之后events出现的时机就变得不可预测,就更符合真实情况。最终我们使用map()将每个Long型值映射到一个随机的Event上,每个Event都包含一个1000到1100(inclusive-exclusive)之间的clientId。
最后一点就有趣了,我们想模拟随机的重复事件。为了做到这点,我们使用flatMap()将每个event映射到自身(99%情况下)。然而在剩下的1%情况中,我们将event返回两次,第二次出现的时间延迟了10ms到5s。实际应用时,重复的event与第一次出现的event之间会相隔几百个其他的event,这就使得流的行为更加符合真实情况。
有两种方法可以与EventStream进行交互-基于回调的consume()和基于流的observer()。我们可以利用Observable快速地建立处理管道,这种方法的功能和part1中的非常的像但更加简单。

Missing backpressure

首先利用RxJava实现最初的方案非常简短:

01 EventStream es = new EventStream();
02 EventConsumer clientProjection = new ClientProjection(
03         new ProjectionMetrics(
04                 new MetricRegistry()));
05  
06 es.observe()
07         .subscribe(
08                 clientProjection::consume,
09                 e -> log.error("Fatal error", e)
10         );

(ClientProjection,ProjectionMetrics等来自于part1).使用以上的代码几乎会立刻抛出MissingBackpressureException,这也是预料之中的。你们记得吗,我们在part1中最初的方案会运行的越来越慢是因为处理event的潜伏期越来越长。RxJava会尽量避免这种情况,而且也会避免队列溢出。之所以会抛出MissingBackpressureException是因为消费者(ClientProjection)没有能力实时地处理event。这是一个fail-fast机制。聪明的做法就是将处理的过程移到一个独立的线程池,就像之前那样,但这次要使用RxJava来实现:

01 EventStream es = new EventStream();
02 EventConsumer clientProjection = new FailOnConcurrentModification(
03         new ClientProjection(
04                 new ProjectionMetrics(
05                         new MetricRegistry())));
06  
07 es.observe()
08         .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
09         .window(1, TimeUnit.SECONDS)
10         .flatMap(Observable::count)
11         .subscribe(
12                 c -> log.info("Processed {} events/s", c),
13                 e -> log.error("Fatal error", e)
14         );

EventConsumer中添加了一个辅助方法,它能够利用提供的Scheduler异步地处理event:

01 @FunctionalInterface
02 interface EventConsumer {
03     Event consume(Event event);
04  
05     default Observable<Event> consume(Event event, Scheduler scheduler) {
06         return Observable
07                 .fromCallable(() -> this.consume(event))
08                 .subscribeOn(scheduler);
09     }
10  
11 }

使用flatMap()在一个独立的Scheduler.io()中处理event,这样每一个消费过程都是异步调用的。这次event的处理已经符合实时性的要求了,但还有一个更大的问题。我使用FailOnConcurrentModification对ClientProjection进行包装是有原因的。events的处理都是彼此独立的,所以对于同一个clientId有可能会并发地处理两个event,这样并不好。幸运的是比起使用线程来说,用RxJava解决这个问题要更加简单:

01 es.observe()
02         .groupBy(Event::getClientId)
03         .flatMap(byClient -> byClient
04                 .observeOn(Schedulers.io())
05                 .map(clientProjection::consume))
06         .window(1, TimeUnit.SECONDS)
07         .flatMap(Observable::count)
08         .subscribe(
09                 c -> log.info("Processed {} events/s", c),
10                 e -> log.error("Fatal error", e)
11         );

上面的代码改动的地方只有一点点。首先我们依据clientId对event进行分组,将单一的Observable流分割成多个流,每个名为byClient的子流都代表着拥有相同clientId的event。现在如果我们对子流进行映射,我们能够确定有相同clientId的event是绝不会并发地被处理的。输出流是惰性的,所以我们必须对流调用subscribe。与其对每一个event单独地调用subscribe,我们选择将每一秒内处理的event收集起来并对其计数。这样一来每秒我们接收到的就是一个Integer类型的event,它代表着每秒内我们处理的event数量。

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state

现在我们必须除去重复的UUID,最简单也是最笨的做法就是利用全局状态。我们能够简单地利用filter()在cache中查找重复的event:

01 final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
02         .expireAfterWrite(10, TimeUnit.SECONDS)
03         .build();
04  
05 es.observe()
06         .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
07         .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
08         .subscribe(
09                 clientProjection::consume,
10                 e -> log.error("Fatal error", e)
11         );

如果你想要监控上面代码的效果可以简单的加入一个度量器:

01 Meter duplicates = metricRegistry.meter("duplicates");
02  
03 es.observe()
04         .filter(e -> {
05             if (seenUuids.getIfPresent(e.getUuid()) != null) {
06                 duplicates.mark();
07                 return false;
08             else {
09                 return true;
10             }
11         })

在操作符内部访问全局的、尤其是可变的状态时是非常危险的,并且这样会破坏RxJava唯一的目的-简单并发。虽然我们使用的是Guava中线程安全的Cache,但在很多情况下你很容易会忘记这个全局共享的可变状态是可以被多个线程访问的,如果你发现你在操作符链中修改外部的一些变量的话,那就要非常小心了。

Custom distinct() operator in RxJava 1.x

RxJava 1.x有一个distinct()运算函数,它大概可以做如下的工作:

1 es.observe()
2         .distinct(Event::getUuid)
3         .groupBy(Event::getClientId)

不幸的是distinct()会在内部将所有的UUID都存储在一个不断增长的HashSet里面,但我们只关心10s内的重复事件。通过复制粘贴DistinctOperator的实现,我创造了DistinctEvent操作符,它利用了Guava的cache仅仅只存储10s内的UUID。我故意将Event硬编码在这个操作符内而不是将它写成一般性的就是为了让代码更易懂:

01 class DistinctEvent implements Observable.Operator<Event, Event> {
02     private final Duration duration;
03  
04     DistinctEvent(Duration duration) {
05         this.duration = duration;
06     }
07  
08     @Override
09     public Subscriber<? super Event> call(Subscriber<? super Event> child) {
10         return new Subscriber<Event>(child) {
11             final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
12                     .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
13                     .<UUID, Boolean>build().asMap();
14  
15             @Override
16             public void onNext(Event event) {
17                 if (keyMemory.put(event.getUuid(), true) == null) {
18                     child.onNext(event);
19                 else {
20                     request(1);
21                 }
22             }
23  
24             @Override
25             public void onError(Throwable e) {
26                 child.onError(e);
27             }
28  
29             @Override
30             public void onCompleted() {
31                 child.onCompleted();
32             }
33  
34         };
35     }
36 }

自定义的操作符使用起来非常简单,实现如下:

01 es.observe()
02         .lift(new DistinctEvent(Duration.ofSeconds(10)))
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .window(1, TimeUnit.SECONDS)
09         .flatMap(Observable::count)
10         .subscribe(
11                 c -> log.info("Processed {} events/s", c),
12                 e -> log.error("Fatal error", e)
13         );

事实上如果我们跳过每秒的logging实现可以变得更加简单:

01 es.observe()
02         .lift(new DistinctEvent(Duration.ofSeconds(10)))
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .subscribe(
09                 e -> {},
10                 e -> log.error("Fatal error", e)
11         );

这个方案比之前的基于线程池和装饰者模式的要更加简短,其中唯一麻烦的部分就是在自定义的操作符中当存储了太多的UUID之后会造成内存泄漏,幸好RxJava 2能解决这个问题。

RxJava 2.x and more powerful built-in distinct()

distinct()允许使用自定义的Collection而不必使用内置的HashSet(感觉2.x中可以使用自定义的数据结构后,1.x中的DistinctEvent就完全没必要了)。不管你是否相信,依赖倒置不仅仅只出现在Spring框架或者Java EE中。当一个库允许你提供它内部数据结构的自定义实现时,这就已经是依赖反转。首先我创造了一个辅助方法,它能够建立Set,Set由Map提供依赖,而Map则由Cache提供依赖。这就像委托一样!

1 private Set<UUID> recentUuids() {
2     return Collections.newSetFromMap(
3             CacheBuilder.newBuilder()
4                     .expireAfterWrite(10, TimeUnit.SECONDS)
5                     .<UUID, Boolean>build()
6                     .asMap()
7     );
8 }

有了这个方法之后,我们就能利用以下的代码实现整个任务:

01 es.observe()
02         .distinct(Event::getUuid, this::recentUuids)
03         .groupBy(Event::getClientId)
04         .flatMap(byClient -> byClient
05                 .observeOn(Schedulers.io())
06                 .map(clientProjection::consume)
07         )
08         .subscribe(
09                 e -> {},
10                 e -> log.error("Fatal error", e)
11         );

这段代码是如此的优雅、简单、清晰!它的大致流程如下:

  • observe一个event流
  • 消除重复的UUID
  • 依据clientId对event分组
  • 对每一个client有序地处理event

希望你能喜欢这些方案,并能将之运用到你的日常生活中去。

See also:

相关文章
|
14天前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
30 0
|
13天前
|
机器学习/深度学习 分布式计算 大数据
一文读懂Apache Beam:统一的大数据处理模型与工具
【4月更文挑战第8天】Apache Beam是开源的统一大数据处理模型,提供抽象化编程模型,支持批处理和流处理。它提倡"一次编写,到处运行",可在多种引擎(如Spark、Dataflow、Flink)上运行。Beam的核心特性包括抽象化概念(PCollection、PTransform和PipelineRunner)、灵活性(支持多种数据源和转换)和高效执行。它广泛应用在ETL、实时流处理、机器学习和大数据仓库场景,助力开发者轻松应对数据处理挑战。
17 1
|
1月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
60 1
|
3月前
|
消息中间件 Dubbo Java
Simple RPC - 01 框架原理及总体架构初探
Simple RPC - 01 框架原理及总体架构初探
51 0
|
SQL 存储 缓存
Apache Calcite 框架 50 倍性能优化实践
某天临时被当成壮丁拉去参加一个非常牛逼的应用监控平台(后续会开源),然后大佬就给我派了一个任务,要将项目中的查询性能优化 50 倍以上,大佬对我如此地寄予厚望,我怎么能让大佬失望呢(虽然我内心瑟瑟发抖)?于是我就开始了这段性能优化之旅。
1285 0
Apache Calcite 框架 50 倍性能优化实践
|
存储 分布式计算 资源调度
Spark框架深度理解三:运行架构、核心数据集RDD
Spark框架深度理解三:运行架构、核心数据集RDD
318 0
Spark框架深度理解三:运行架构、核心数据集RDD
|
存储 消息中间件 传感器
超越Storm,SparkStreaming——Flink如何实现有状态的计算
超越Storm,SparkStreaming——Flink如何实现有状态的计算
168 0
超越Storm,SparkStreaming——Flink如何实现有状态的计算
|
SQL 分布式计算 算法
Flink 的编程模型与其他框架比较
我们在讲解 Flink 程序的编程模型之前,先来了解一下 Flink 中的 Streams、State、Time 等核心概念和基础语义,以及 Flink 提供的不同层级的 API。
220 0
Flink 的编程模型与其他框架比较
|
机器学习/深度学习 并行计算 算法
阿里开源 支持10万亿模型的自研分布式训练框架EPL(Easy Parallel Library)
最近阿里云机器学习PAI平台和达摩院智能计算实验室一起发布“低碳版”巨模型M6-10T,模型参数已经从万亿跃迁到10万亿,规模远超业界此前发布的万亿级模型,成为当前全球最大的AI预训练模型。同时做到了业内极致的低碳高效,使用512 GPU在10天内即训练出具有可用水平的10万亿模型。
|
消息中间件 SQL 分布式计算
Spark Sreaming实战(二)-小试流式处理
Spark Sreaming实战(二)-小试流式处理
83 0
Spark Sreaming实战(二)-小试流式处理