[Flink]Flink1.3 Batch指南一 本地运行

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。

Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。本节概述了Flink的本地执行机制。

本地环境和执行器(executors)允许你可以在本地Java虚拟机上运行Flink程序,或者是在正在运行程序的Java虚拟机上(with within any JVM as part of existing programs)。对于大部分示例程序而言,你只需简单的地点击你IDE上的运行(Run)按钮就可以执行。

Flink支持两种不同的本地运行机制: (1) LocalExecutionEnvironment启动完整的Flink运行环境,包括一个JobManager和一个TaskManager。这些包含了内存管理以及在集群模式下运行时所运行的所有内部算法。 (2) CollectionEnvironment在Java集合上运行Flink程序(executing the Flink program on Java collections)。这种模式不会启动完整的Flink运行环境,因此运行开销比较低以及轻量级。例如,DataSet的map转换操作将map()函数应用于Java列表中的所有元素上。

1. 调试

如果你在本地运行Flink程序,还可以像任何其他Java程序一样来调试程序。你可以使用System.out.println()来打印一些内部变量,也可以使用调试器。可以在map(),reduce()以及所有其他方法中设置断点。请参阅Java API文档中的调试部分,来了解如何使用Java API来测试和本地调试程序。

2. Maven

如果你在Maven项目中开发程序,则必须使用下面依赖关系添加flink-clients模块:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.3.2</version>
</dependency>

3. 本地运行环境

LocalEnvironment是本地运行Flink程序的句柄,可以使用它在本地的JVM,独立运行或嵌入其他程序里运行。

本地运行执行环境通过ExecutionEnvironment.createLocalEnvironment()方法实例化。默认情况下,Flink将尽可能使用跟你机器CPU核数一样多的本地线程来执行程序。你可以指定程序你想要的并行度。本地运行环境可以通过enableLogging()/disableLogging()来配置日志的输出。

在大多数情况下,ExecutionEnvironment.getExecutionEnvironment()是一种更好的选择。当程序在本地启动时(不使用命令行接口),该方法返回LocalEnvironment,当程序是通过命令行接口提交时,则该方法会返回为在集群中运行提前配置好的运行环境(pre-configured environment for cluster execution)。

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    DataSet<String> data = env.readTextFile("file:///path/to/file");

    data.filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}

在程序执行结束时会返回JobExecutionResult对象,这个类中包含了程序的运行状态(runtime)和累加器(accumulator)结果。

LocalEnvironment也可以向Flink传入用户自定义配置。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

备注:

本地运行环境不启动任何Web前端来监控运行。

4. 集合运行环境

使用CollectionEnvironment在Java集合上运行,对于运行Flink程序是一种开销比较低的方法。在这种模式中通常用于自动化测试、调试、代码重用等场景。

用户可以使用用于批处理的算法,或者是用于更具交互性的算法(Users can use algorithms implemented for batch processing also for cases that are more interactive)。Flink程序通过稍微修改就可用于处理请求的Java应用服务器。

下面是集合环境的例子:

public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}

flink-examples-batch模块包含一个完整的示例,名称为CollectionExecutionExample

备注:

基于集合的Flink程序仅适用于小数据量,这样可以完全放进JVM堆中。在集合上的运行不是多线程的,只使用一个线程。

备注:

Flink版本为1.3

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/local_execution.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
|
4月前
|
监控 流计算
Flink 运行时日志分析
Flink 运行时日志分析
96 0
|
4月前
|
资源调度 分布式计算 Java
Flink(三)【运行时架构】
Flink(三)【运行时架构】
|
6月前
|
Kubernetes 流计算 容器
Flink on k8s的话,怎么在容器运行前初始化一些脚本?
Flink on k8s的话,怎么在容器运行前初始化一些脚本?
42 1
|
5月前
|
流计算
Flink CDC在运行过程中遇到"Could not upload job files"的问题
Flink CDC在运行过程中遇到"Could not upload job files"的问题
109 1
|
1月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
20 2
|
1月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
34 10
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
85 1
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0