Flink1.9 Sate Processor API 介绍和实例demo

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 功能介绍 Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据 可以使用的场景 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。

功能介绍

Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据

可以使用的场景

  • 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。
  • 脏数据订正,比如有一条脏数据污染了State,就可以用State Processor API对于状态进行修复和订正。
  • 状态迁移,当用户修改了作业逻辑,还想要复用原来作业中大部分的State,或者想要升级这个State的结构就可以用这个API来完成相应的工作。
  • 解决作业冷启动问题,这样就不用从N天前开始重跑整个数据。

一些限制点

  • window state暂时修改不了
  • 每个有状态的算子都必须手动指定uid
  • 无法通过读取savepoint 直接获取到metadata 信息(existing operator ids)

关联的知识点

State 分为: 1: Operator States 2: Keyed States
在读取state的时候需要根据对应的类型选择不同的读取方式

Operator States Keyed States
readListState readKeyedState
readUnionState
readBroadcastState

基于batch 热加载数据生成Savepoint 和 Savepoint state 修改

最后会给出对应的两个demo。
基本流程两者比较类似

  • 基于batch 热加载数据

    1: batch读取数据 --> Dataset (比如读取文本文件)
    2: 编写业务逻辑处理数据 --> 获取转换后的DataSet(处理文本生成一个Tuple2<key, num>
    3: 将数据结果转换为state --> KeyedStateBootstrapFunction
    4: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)
    • Savepoint state 修改
    1: 调用Savepoint.load 加载当前已经存在的Savepoint(注意StateBackend 必须和之前生成的任务一致)
    2: 调用 savepoint.readKeyedState 读取获取到的ExistingSavepoint,结果是一个DataSet数据集
    3:编写Batch 业务逻辑调整生成的DataSet(比如删除某个元素),其结果还算一个DataSet
    4: 自定义 KeyedStateBootstrapFunction 将数据结果转换为state
    5: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)

基于batch 重新构建stream样例

public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //获取外部离线数据源
        DataSource<String> textSource =  env.readTextFile("D:\\sources\\data.txt");
        DataSet<Tuple2<String, Integer>> sourceDataSet = textSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strArr = value.split(",");
                for (String str : strArr) {
                    Tuple2<String, Integer> worldTuple = new Tuple2<>(str, 1);
                    out.collect(worldTuple);
                }
            }
        });

        //计算出需要的历史状态
        DataSet<ReadAndModifyState.KeyedValueState> dataSet = sourceDataSet
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, ReadAndModifyState.KeyedValueState>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<ReadAndModifyState.KeyedValueState> out) throws Exception {

                Iterator iterator = values.iterator();
                Long countNum = 0L;
                String worldkey = null;
                while(iterator.hasNext()){
                    Tuple2<String, Integer> info = (Tuple2<String, Integer>) iterator.next();
                    if(worldkey == null){
                        worldkey = info.f0;
                    }
                    countNum++;
                }

                ReadAndModifyState.KeyedValueState keyedValueState = new ReadAndModifyState.KeyedValueState();
                keyedValueState.key = new Tuple1<>(worldkey);
                keyedValueState.countNum = countNum;

                out.collect(keyedValueState);
            }
        });

        //将历史状态转换为state 并转换为savepoint 写入hdfs上
        BootstrapTransformation<ReadAndModifyState.KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(new KeySelector<ReadAndModifyState.KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(ReadAndModifyState.KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new ReadAndModifyState.KeyedValueStateBootstrapper());

        String uid = "keyby_summarize";
        String savePointPath = "hdfs://ns1/user/xc/savepoint-from-batch";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write(savePointPath);


        env.execute("batch build save point");
        System.out.println("-------end------------");
    }

读取和修改样例

 public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        String savePointPath = "hdfs://ns1/user/xc/savepoint-61b8e1-bbee958b3087";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");

        ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, rocksDBBackEnd);

        //读取
        String uid = "keyby_summarize";
        DataSet<KeyedValueState> keyState = savepoint.readKeyedState(uid, new StateReaderFunc());

        //修改
        DataSet<KeyedValueState> dataSet = keyState.flatMap((FlatMapFunction<KeyedValueState, KeyedValueState>) (value, out) -> {
            value.countNum = value.countNum * 2;
            out.collect(value);
        }).returns(KeyedValueState.class);

        BootstrapTransformation<KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                //注意keyby操作的key一定要和原来的相同
                .keyBy(new KeySelector<KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new KeyedValueStateBootstrapper());

        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write("hdfs://ns1/user/xc/savepoint-after-modify3");


        bEnv.execute("read the list state");
        System.out.println("-----end------------");
    }

    public static class StateReaderFunc extends KeyedStateReaderFunction<Tuple1<String>, KeyedValueState> {

        private static final long serialVersionUID = -3616180524951046897L;
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class);
            state = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void readKey(Tuple1<String> key, Context ctx, Collector<KeyedValueState> out) throws Exception {
            System.out.println(key.f0 +":" + state.value());

            KeyedValueState keyedValueState = new KeyedValueState();
            keyedValueState.key = new Tuple1<>(key.f0);
            keyedValueState.countNum = state.value();

            out.collect(keyedValueState);
        }
    }

    public static class KeyedValueState {
        Tuple1<String> key;
        Long countNum;
    }

    private static class KeyedValueStateBootstrapper extends KeyedStateBootstrapFunction<Tuple1<String>, KeyedValueState>{

        private static final long serialVersionUID = 1893716139133502118L;
        private ValueState<Long> currentCount = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class, 0L);
            currentCount = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void processElement(KeyedValueState value, Context ctx) throws Exception {
            currentCount.update(value.countNum);
        }
    }
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
|
3月前
|
JSON API 数据格式
您可以在钉钉开放平台的API文档中找到对应的API接口来创建审批实例
您可以在钉钉开放平台的API文档中找到对应的API接口来创建审批实例【1月更文挑战第20天】【1月更文挑战第96篇】
43 2
|
3月前
|
JSON API 数据格式
您可以在钉钉开放平台的API文档中找到对应的API接口来创建审批实例
您可以在钉钉开放平台的API文档中找到对应的API接口来创建审批实例【1月更文挑战第9天】【1月更文挑战第41篇】
129 2
|
3月前
|
API 网络安全
调用钉钉的API获取审批实例ID列表时返回的结果为
调用钉钉的API获取审批实例ID列表时返回的结果为【1月更文挑战第5天】【1月更文挑战第21篇】
36 1
|
30天前
|
SQL 分布式计算 测试技术
Flink API的4个层次
【2月更文挑战第28天】
|
1月前
|
消息中间件 SQL Kafka
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。
282 0
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
|
1月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
56 1
|
3月前
|
API 流计算
flink读写hudi的demo
flink读写hudi的demo
|
3月前
|
JSON 关系型数据库 MySQL
这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
【1月更文挑战第17天】【1月更文挑战第84篇】这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
34 1
|
4月前
|
存储 API 数据安全/隐私保护
2021年最新最全Flink系列教程__Flink高级API(四)
2021年最新最全Flink系列教程__Flink高级API(四)
35 0

热门文章

最新文章