Flink - FLIP


小旋风柴进, 2017-04-07 09:37:00



FLIP-1: Fine Grained Recovery from Task Failures


When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.







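Each node caches the Intermediate Result it sends downstream. When a task on a node fails, the upstream node only needs to resend its Intermediate Result, which avoids restarting from the source.
How the Intermediate Result is cached (in memory, on disk, or elsewhere) is simply a choice between different approaches.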

Caching Intermediate Result

This type of data stream caches all elements since the latest checkpoint, possibly spilling them to disk, if the data exceeds the memory capacity.

When a downstream operator restarts from that checkpoint, it can simply re-read that data stream without requiring the producing operator to restart. Applicable to both batch (bounded) and streaming (unbounded) operations. When no checkpoints are used (batch), it needs to cache all data.
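A minimal Java sketch of the caching idea, assuming one producer/consumer pair and an in-memory cache only (spilling to disk is left out). All names here (CachingIntermediateResult, emitBarrier, onCheckpointComplete, replay) are hypothetical, not Flink API. The cache remembers where each checkpoint barrier was emitted, so completing a checkpoint drops only the elements that precede that barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, in-memory-only sketch of FLIP-1's caching intermediate result
// (not Flink code; spilling to disk is omitted).
final class CachingIntermediateResult<T> {
    private final Deque<T> cache = new ArrayDeque<>();
    // checkpointId -> number of elements emitted before that checkpoint's barrier
    private final TreeMap<Long, Long> barrierPositions = new TreeMap<>();
    private long emittedCount = 0; // total elements ever emitted
    private long droppedCount = 0; // elements already discarded from the front

    // The producing task records every element it sends downstream.
    void emit(T element) {
        cache.addLast(element);
        emittedCount++;
    }

    // Remember where the barrier for this checkpoint sits in the stream.
    void emitBarrier(long checkpointId) {
        barrierPositions.put(checkpointId, emittedCount);
    }

    // Elements before a completed checkpoint's barrier are covered by that
    // checkpoint and no longer need to be replayed.
    void onCheckpointComplete(long checkpointId) {
        Long position = barrierPositions.get(checkpointId);
        if (position == null) return;
        barrierPositions.headMap(checkpointId, true).clear(); // older checkpoints are subsumed
        while (droppedCount < position) {
            cache.removeFirst();
            droppedCount++;
        }
    }

    // A restarted consumer re-reads everything since the latest completed checkpoint.
    List<T> replay() {
        return new ArrayList<>(cache);
    }
}
```

Tracking the barrier position rather than clearing everything on completion matters because checkpoint completion is acknowledged asynchronously: elements emitted after the barrier but before the acknowledgement must survive.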

Memory-only caching Intermediate Result

Similar to the caching intermediate result, but discards sent data once the memory buffering capacity is exceeded. It acts as a "best effort" helper for recovery, which will bound recovery when checkpoints are frequent enough to hold the data between checkpoints in memory. On the other hand, it comes for free: it simply uses memory that would otherwise go unused.
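The best-effort variant can be sketched the same way, again with hypothetical names. For brevity this sketch assumes a checkpoint covers everything emitted before onCheckpointComplete() is called (it skips the barrier bookkeeping). Once the capacity is exceeded within a checkpoint interval, replay is reported as impossible and recovery has to fall back to restarting the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of FLIP-1's memory-only caching intermediate result.
// Simplification: a checkpoint is assumed to cover exactly what was emitted
// before onCheckpointComplete() is called.
final class MemoryOnlyIntermediateResult<T> {
    private final int capacity;
    private final List<T> buffer = new ArrayList<>();
    private boolean overflowed = false;

    MemoryOnlyIntermediateResult(int capacity) {
        this.capacity = capacity;
    }

    void emit(T element) {
        if (overflowed) return;          // this interval is already incomplete
        if (buffer.size() == capacity) { // memory budget exceeded: give up on this interval
            buffer.clear();
            overflowed = true;
            return;
        }
        buffer.add(element);
    }

    void onCheckpointComplete() {
        buffer.clear();
        overflowed = false;
    }

    // Empty means the cache cannot help and the producer must be restarted.
    Optional<List<T>> replay() {
        return overflowed ? Optional.empty() : Optional.of(new ArrayList<>(buffer));
    }
}
```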

Blocking Intermediate Result

This is applicable only to bounded intermediate results (batch jobs). It means that the consuming operator starts only after the entire bounded result has been produced. This bounds the cancellations/restarts downstream in batch jobs.



FLIP-2: Extending Window Function Metadata

Right now, in Flink a WindowFunction does not get a lot of information when a window fires. 

The signature of WindowFunction is this:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    // Called when a window fires, with the key, the window, and all elements in the window.
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out);
}

That is, the user code only has access to the key for which the window fired, the window that fired, and the data in that window. In the future, we might like to extend the information available to the user function. We initially propose the following additional information:

  • Why/when did the window fire? Did it fire on time, i.e. when the watermark passed the end of the window? Did it fire early because of a speculative early trigger, or did it fire on late-arriving data?

  • How many times has the current window fired before? This would probably be an increasing index, so that each firing of a window can be uniquely identified.

The information currently exposed to window functions is insufficient; more should be provided, such as why and when the window fired.
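To make the proposal concrete, here is a hypothetical sketch of the two pieces of metadata listed above: the firing timing (early, on time, late) and a per-window firing index. None of these types (FiringTiming, FiringInfo, FiringTracker, WindowFunctionWithInfo) exist in Flink, and the classification rule (early while the watermark is before the window end, on time for the first firing at or after it, late afterwards) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for FLIP-2's proposal; none of these exist in Flink.
enum FiringTiming { EARLY, ON_TIME, LATE }

// Extra metadata handed to the window function on each firing.
final class FiringInfo {
    final FiringTiming timing;
    final int firingIndex; // 0 for the first firing of a window, then 1, 2, ...

    FiringInfo(FiringTiming timing, int firingIndex) {
        this.timing = timing;
        this.firingIndex = firingIndex;
    }
}

// Per-window bookkeeping that produces a FiringInfo for every firing.
final class FiringTracker {
    private int firings = 0;
    private boolean firedOnTime = false;

    // Assumed rule: early before the watermark reaches windowEnd, on time for
    // the first firing at or after it, late for every firing after that.
    FiringInfo nextFiring(long windowEnd, long watermark) {
        FiringTiming timing;
        if (watermark < windowEnd) {
            timing = FiringTiming.EARLY;
        } else if (!firedOnTime) {
            timing = FiringTiming.ON_TIME;
            firedOnTime = true;
        } else {
            timing = FiringTiming.LATE;
        }
        return new FiringInfo(timing, firings++);
    }
}

// A window-function shape receiving the metadata (Flink's Collector replaced by a List).
interface WindowFunctionWithInfo<IN, OUT, KEY> {
    void apply(KEY key, FiringInfo info, Iterable<IN> input, List<OUT> out);
}
```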

FLIP-3: Organization of Documentation


FLIP-4: Enhance Window Evictor

Right now, the capabilities of the Window Evictor are limited:

  • The Evictor is called only before the WindowFunction. (There can be use cases where elements have to be evicted after the WindowFunction is applied.)
  • Elements are evicted only from the beginning of the Window. (There can be cases where elements need to be evicted from anywhere within the Window, according to the eviction logic the user wishes to implement.)

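Currently the Evictor runs only before the WindowFunction; could it also run after the WindowFunction?

The current interface only evicts from the beginning of the Window; could eviction start from an arbitrary position instead?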
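Both requests can be illustrated with a hypothetical evictor shape that has a hook before and a hook after the window function, and that operates on the whole list of window elements so it can remove items at any position. The names FlexibleEvictor, PredicateEvictor and WindowFiring are illustrative only, not Flink's Evictor interface, and a plain java.util.function.Function stands in for the WindowFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical evictor shape for FLIP-4's two requests (not Flink's Evictor API).
interface FlexibleEvictor<T> {
    default void evictBefore(List<T> elements) {}
    default void evictAfter(List<T> elements) {}
}

// Removes every element matching a predicate, wherever it sits in the window.
final class PredicateEvictor<T> implements FlexibleEvictor<T> {
    private final Predicate<T> shouldEvict;
    private final boolean afterFunction; // true: evict after the window function has run

    PredicateEvictor(Predicate<T> shouldEvict, boolean afterFunction) {
        this.shouldEvict = shouldEvict;
        this.afterFunction = afterFunction;
    }

    @Override
    public void evictBefore(List<T> elements) {
        if (!afterFunction) elements.removeIf(shouldEvict);
    }

    @Override
    public void evictAfter(List<T> elements) {
        if (afterFunction) elements.removeIf(shouldEvict);
    }
}

final class WindowFiring {
    // Order of calls when a window fires: evictBefore, window function, evictAfter.
    static <T, R> R fire(List<T> contents, FlexibleEvictor<T> evictor, Function<List<T>, R> windowFunction) {
        evictor.evictBefore(contents);
        R result = windowFunction.apply(contents);
        evictor.evictAfter(contents);
        return result;
    }
}
```

With eviction before the function, the evicted elements never reach the computation; with eviction after it, they contribute to this firing's result but are gone for the next firing of the same window.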


FLIP-5: Only send data to each taskmanager once for broadcasts


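We observe an unexpected increase in the data sent over the network for broadcasts as the number of slots per TaskManager grows: the broadcast data is shipped to every consuming task rather than once per TaskManager, so a TaskManager with n slots receives n copies of the same data.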
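A back-of-the-envelope model of the effect, assuming every slot runs one task that consumes the broadcast set and that every copy crosses the network (BroadcastTraffic and its methods are hypothetical helpers):

```java
// Hypothetical traffic model for FLIP-5. Assumption: each slot runs one
// consuming task, and each copy of the broadcast set crosses the network.
final class BroadcastTraffic {
    // Current behaviour: one copy per consuming task (i.e. per slot).
    static long perSlotTransferMb(long dataSetMb, int taskManagers, int slotsPerTaskManager) {
        return dataSetMb * taskManagers * slotsPerTaskManager;
    }

    // Proposed behaviour: one copy per TaskManager, shared by its slots.
    static long perTaskManagerTransferMb(long dataSetMb, int taskManagers) {
        return dataSetMb * taskManagers;
    }
}
```

For a 100 MB broadcast set on 10 TaskManagers with 8 slots each, this gives 8,000 MB today versus 1,000 MB with the proposal; the overhead factor is exactly the number of slots per TaskManager.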








FLIP-6: Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.


Two new components are added: the ResourceManager and the Dispatcher.

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

  • YARN

  • Mesos

  • Standalone-multi-job (Standalone mode)

  • Self-contained-single-job (Docker/Kubernetes)



As a result, the relationship between the JobManager, the TaskManagers, and the ResourceManager changes.




In addition, the JobManager has a slot pool that holds the slots it has acquired.

The SlotPool is a modification of what is currently the InstanceManager.
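The role of the slot pool can be sketched as follows. This is a hypothetical illustration, not Flink's SlotPool API: slots are plain strings, and a Supplier stands in for the request to the ResourceManager. The pool reuses slots the JobManager already holds and only asks the ResourceManager when it has none left; idle slots can be handed back so containers can be released.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical sketch of a JobManager-side slot pool (not Flink's SlotPool API).
final class SimpleSlotPool {
    private final Deque<String> availableSlots = new ArrayDeque<>();
    private final Supplier<String> requestFromResourceManager; // stand-in for the RM request
    private int resourceManagerRequests = 0;

    SimpleSlotPool(Supplier<String> requestFromResourceManager) {
        this.requestFromResourceManager = requestFromResourceManager;
    }

    // Prefer a slot already held by the JobManager; otherwise ask the ResourceManager.
    String allocate() {
        String slot = availableSlots.pollFirst();
        if (slot != null) return slot;
        resourceManagerRequests++;
        return requestFromResourceManager.get();
    }

    // A finished task gives its slot back to the pool, not to the ResourceManager.
    void release(String slot) {
        availableSlots.addFirst(slot);
    }

    // Hand idle slots back so the underlying containers can be freed.
    int returnIdleSlots() {
        int n = availableSlots.size();
        availableSlots.clear();
        return n;
    }

    int resourceManagerRequests() {
        return resourceManagerRequests;
    }
}
```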



The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.


In the future, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications


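First, the dispatcher outlives individual jobs and clusters and runs a long-lived web dashboard. For example, if a cluster or JobManager later fails, the dispatcher can simply spawn the job on another one.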


The concrete architecture for each cluster manager is as follows.


Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted

  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader

  • Containers are requested as needed and will be released when not used any more

  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators





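You no longer need to start a Flink cluster first and then submit the job; you simply submit the job directly. The YARN ResourceManager first launches the ApplicationMaster, which contains the Flink ResourceManager and the JobManager. Whenever the Flink ResourceManager then needs resources, it requests them from the YARN ResourceManager, which creates containers, each running a TaskManager.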





Mesos-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon.




The Standalone Setup should remain compatible with current Standalone Setups.

The role of the long-running JobManager is now a "local dispatcher" process that spawns JobManagers with Jobs internally. The ResourceManager lives across jobs and handles TaskManager registration.

For highly available setups, there are multiple dispatcher processes competing for leadership, similar to how the JobManagers currently do.



Component Design and Details




FLIP-7: Expose metrics to WebInterface

With the introduction of the metric system it is now time to make it easily accessible to users. As the WebInterface is the first stop for users for any details about Flink, it seems appropriate to expose the gathered metrics there as well.

The changes can be roughly broken down into 4 steps:

  1. Create a data-structure on the Job-/TaskManager containing a metrics snapshot
  2. Transfer this snapshot to the WebInterface back-end
  3. Store the snapshot in the WebRuntimeMonitor in an easily accessible way
  4. Expose the stored metrics to the WebInterface via REST API
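The four steps can be sketched end to end in plain Java. Everything here is hypothetical (MetricSnapshot, MetricStore, the component and metric names); metrics are modelled as Suppliers, the transfer in step 2 is just a method call, and the JSON rendering is naive (no escaping) and stands in for what a REST handler might return.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Step 1 (hypothetical): capture the current values of registered metrics.
final class MetricSnapshot {
    static Map<String, String> take(Map<String, Supplier<Object>> registry) {
        Map<String, String> snapshot = new TreeMap<>(); // sorted for stable output
        registry.forEach((name, metric) -> snapshot.put(name, String.valueOf(metric.get())));
        return snapshot;
    }
}

// Steps 2-4 (hypothetical): the web back-end receives snapshots, keeps the
// latest one per component, and renders it for a REST response.
final class MetricStore {
    private final Map<String, Map<String, String>> latest = new TreeMap<>();

    // Step 2/3: a transferred snapshot replaces the previous one for that component.
    void update(String component, Map<String, String> snapshot) {
        latest.put(component, snapshot);
    }

    // Step 4: naive JSON rendering (no escaping) of the stored snapshot.
    String toJson(String component) {
        Map<String, String> metrics = latest.getOrDefault(component, new TreeMap<>());
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : metrics.entrySet()) {
            if (sb.length() > 1) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        return sb.append('}').toString();
    }
}
```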


FLIP-8: Rescalable Non-Partitioned State

The problem to solve is how to handle state when the job is scaled dynamically.




In Flink, state falls into three categories: operator state, function state, and key-value state.

The solution for key-value state is relatively simple; see https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#


For key-value state, the idea is to introduce the concept of key groups and to use the key group as the unit of checkpointing.

In order to efficiently distribute key-value states across the cluster, they should be grouped into key groups. Each key group represents a subset of the key space and is checkpointed as an independent unit. The key groups can then be re-assigned to different tasks if the DOP changes.

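When the operator's parallelism is increased or decreased, whole key groups only need to be reassigned to the new operator instances, each of which then restores the corresponding key groups from the checkpoint.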
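A minimal sketch of key-group assignment, assuming a fixed maximum parallelism (the number of key groups) and contiguous ranges of key groups per operator instance. KeyGroups is a hypothetical helper; Flink's real implementation additionally scrambles hashCode() with a murmur hash, and plain hashCode() is used here for clarity.

```java
// Hypothetical sketch of key groups. A key's group depends only on
// maxParallelism, so it never changes on rescaling; only the mapping
// from key groups to operator instances changes.
final class KeyGroups {
    // Every key belongs to exactly one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to operator instances.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With 128 key groups and parallelism 2, groups 0 to 63 go to instance 0 and 64 to 127 to instance 1. After scaling to parallelism 4, group 64 moves to instance 2 as a single unit, together with its checkpointed state, without rehashing any individual key.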


Next, how is this problem solved for non-partitioned operator and function state?



After scaling down, only task S1 remains. It loads only its own checkpoint, so the state that S2 had checkpointed is left unclaimed.


After scaling up, a similar problem appears: S1 and S2 load their original checkpoints, even though partitions 3 and 4 are no longer assigned to S2.
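One way to avoid both problems, sketched here under the assumption that each task's non-partitioned state can be expressed as a list of independent items (for example, per-partition offsets), is to gather the items from all old tasks and redistribute them round-robin over the new tasks. ListStateRedistribution is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: redistribute list-shaped, non-partitioned state on rescale.
final class ListStateRedistribution {
    // Gather the items checkpointed by all old tasks and deal them round-robin to the new tasks.
    static <T> List<List<T>> redistribute(List<List<T>> oldTaskStates, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> taskState : oldTaskStates) {
            for (T item : taskState) {
                result.get(next).add(item);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }
}
```

Scaling down from S1 = [p1, p2], S2 = [p3, p4] to one task gives it all four partitions, so nothing is left unclaimed; scaling up to four tasks gives each task exactly one partition, so p3 and p4 are no longer tied to S2.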






FLIP-9: Trigger DSL

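The currently supported trigger mechanisms are not flexible enough, and late elements can only be dropped. A more flexible and well-designed DSL is needed to describe trigger policies.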


FLIP-10: Unify Checkpoints and Savepoints

Currently checkpoints and savepoints are handled in slightly different ways with respect to storing and restoring them. The main differences are that savepoints 1) are manually triggered, 2) persist checkpoint meta data, and 3) are not automatically discarded.

With this FLIP, I propose to unify checkpoints and savepoints by allowing savepoints to be triggered automatically.


FLIP-11: Table API Stream Aggregations

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

  • Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

  • Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.

Since time-windowed aggregates will be the first operation that requires a definition of time, this FLIP also discusses how the Table API handles time characteristics, timestamps, and watermarks.
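The semantics of a keyed group-window aggregate can be illustrated outside the Table API with a plain-Java tumbling-window sum. This is not Table API code: TumblingWindowSum is hypothetical, windows are aligned to multiples of the window size, and emitting results when the watermark passes the window end is omitted.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a keyed group-window aggregate (tumbling time
// window, SUM). The window bounds the infinite stream into finite groups.
final class TumblingWindowSum {
    private final long size;
    // Group = (key, windowStart), encoded as "key@windowStart" for brevity.
    private final Map<String, Long> sums = new TreeMap<>();

    TumblingWindowSum(long size) {
        this.size = size;
    }

    void add(String key, long timestamp, long value) {
        long windowStart = timestamp - Math.floorMod(timestamp, size);
        sums.merge(key + "@" + windowStart, value, Long::sum);
    }

    Map<String, Long> results() {
        return sums;
    }
}
```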


FLIP-12: Asynchronous I/O Design and Implementation

I/O access is, in most cases, time-consuming, which makes the TPS of a single operator much lower than with in-memory computation. This matters particularly for streaming jobs, where low latency is a major concern for users. Starting multiple threads may be an option to handle this problem, but the drawbacks are obvious: the programming model for end users becomes more complicated, as they have to implement a threading model in the operator. Furthermore, they have to take care to coordinate with checkpointing.




AsyncFunction: Async I/O will be triggered in AsyncFunction.

AsyncWaitOperator: A StreamOperator which will invoke AsyncFunction.

AsyncCollector: For each input streaming record, an AsyncCollector will be created and passed into user's callback to get the async i/o result.

AsyncCollectorBuffer: A buffer to keep all AsyncCollectors.

Emitter Thread: A worker thread in AsyncCollectorBuffer that is signalled when some AsyncCollectors have finished their async I/O, and that emits their results to the downstream operators.









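Watermarks also need to be considered: a watermark may only be emitted after all data that precedes it has been successfully emitted. Only then is consistency guaranteed.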
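The watermark rule can be sketched with a buffer of CompletableFutures. This is a hypothetical, single-threaded illustration, not Flink's AsyncCollectorBuffer: records between two watermarks may be emitted in completion order, but a watermark is released only after every earlier record has been emitted, and later records wait behind it. A real emitter thread would need synchronization, and failed futures are not handled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of watermark-respecting emission (not Flink's classes).
final class AsyncResultBuffer {
    // Each segment holds the pending records that arrived between two watermarks.
    private final Deque<List<CompletableFuture<String>>> segments = new ArrayDeque<>();
    private final Deque<Long> watermarks = new ArrayDeque<>(); // watermark closing each segment
    private final List<String> emitted = new ArrayList<>();     // stands in for the downstream output

    AsyncResultBuffer() {
        segments.add(new ArrayList<>());
    }

    // A record was handed to the async function; the future completes with its result.
    void addRecord(CompletableFuture<String> result) {
        segments.peekLast().add(result);
    }

    // A watermark closes the current segment; later records go into a new one.
    void addWatermark(long watermark) {
        watermarks.addLast(watermark);
        segments.addLast(new ArrayList<>());
    }

    // What the emitter thread would do when signalled.
    void drain() {
        while (true) {
            List<CompletableFuture<String>> head = segments.peekFirst();
            Iterator<CompletableFuture<String>> it = head.iterator();
            while (it.hasNext()) {
                CompletableFuture<String> f = it.next();
                if (f.isDone()) {
                    emitted.add(f.join());
                    it.remove();
                }
            }
            // The watermark may pass only once every earlier record has been emitted.
            if (!head.isEmpty() || watermarks.isEmpty()) return;
            emitted.add("WM(" + watermarks.pollFirst() + ")");
            segments.pollFirst();
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

In the usage below, "c" completes early but is held back until WM(100) has been emitted, which in turn waits for the slow record "a".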

