[Flink] Flink 1.3 Guide 4: Command-Line Interface

Flink provides a command-line interface (CLI) to run programs packaged as JAR files and to control their execution. The CLI is part of any Flink installation, whether a local single-node setup or a distributed deployment. It is started by the flink script in the $FLINK_HOME/bin directory and, by default, connects to the running Flink master (JobManager) that was started from the same installation directory.

A prerequisite for using the CLI is a running JobManager or a Flink YARN environment. The JobManager can be started with one of the following commands:

$FLINK_HOME/bin/start-local.sh

$FLINK_HOME/bin/start-cluster.sh

1. Examples

(1) Run an example program without arguments:

./bin/flink run ./examples/batch/WordCount.jar

(2) Run an example program with arguments for input and output files:

./bin/flink run ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(3) Run an example program with arguments for input and output files and a parallelism of 16:

./bin/flink run -p 16 ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(4) Run an example program, suppressing Flink's logging output to standard out:

./bin/flink run -q ./examples/batch/WordCount.jar

(5) Run an example program in detached mode:

./bin/flink run -d ./examples/batch/WordCount.jar

(6) Run an example program on a specific JobManager:

./bin/flink run -m myJMHost:6123 ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(7) Run an example program, specifying the program entry class (the class containing the main method):

./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(8) Run an example program on a per-job YARN cluster with 2 TaskManagers:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input hdfs:///xiaosi/a.txt --output hdfs:///xiaosi/result.txt

(9) Print the optimized execution plan of the WordCount example program as JSON:

./bin/flink info ./examples/batch/WordCount.jar --input file:///home/xiaosi/a.txt --output file:///home/xiaosi/result.txt

(10) List scheduled and running jobs (including their job IDs):

./bin/flink list

(11) List scheduled jobs (including their job IDs):

./bin/flink list -s

(12) List running jobs (including their job IDs):

./bin/flink list -r

(13) List jobs running on Flink on YARN:

./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r

(14) Cancel a job:

./bin/flink cancel <jobID>

(15) Cancel a job with a savepoint:

./bin/flink cancel -s [targetDirectory] <jobID>

(16) Stop a job (streaming jobs only):

./bin/flink stop <jobID>

Note:

The difference between cancelling and stopping a job is as follows:
On a cancel call, the operators of the job immediately receive a cancel() call to cancel them as soon as possible. If the operators do not stop after the cancel call, Flink periodically interrupts their threads until the job stops.
A stop call is a more graceful way of stopping a running streaming job. Stop is only available for jobs whose sources implement the `StoppableFunction` interface. When the user requests a job to be stopped, all sources receive a stop() call, but the job keeps running until all sources have shut down properly. This allows the job to finish processing all in-flight data, as illustrated by the sketch below.
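
To illustrate the stop path, here is a minimal sketch of a source implementing `StoppableFunction`, assuming Flink 1.3's Java API (the class name and the running flag are hypothetical; SourceFunction and StoppableFunction are the Flink interfaces):

    import org.apache.flink.api.common.functions.StoppableFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Hypothetical example: a counting source that supports graceful stop.
    public class StoppableCounterSource
            implements SourceFunction<Long>, StoppableFunction {

        private volatile boolean running = true; // flipped by cancel() or stop()
        private long counter = 0L;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
            }
            // Returning from run() cleanly lets the job drain in-flight data.
        }

        @Override
        public void cancel() {
            // Invoked on "flink cancel": stop emitting as fast as possible.
            running = false;
        }

        @Override
        public void stop() {
            // Invoked on "flink stop": the job keeps running until all
            // sources have left run(), so downstream operators can finish.
            running = false;
        }
    }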

2. Savepoints

Savepoints are controlled via the command-line client:

2.1 Trigger a savepoint

./bin/flink savepoint <jobID> [savepointDirectory]

This returns the path of the created savepoint. You need this path to restore from and dispose of the savepoint.

Triggering a savepoint optionally takes a savepointDirectory. If none is specified here, you need to configure a default savepoint directory for the Flink installation (see Savepoints), as sketched below.
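
A minimal sketch of that configuration, assuming an HDFS target (the path is illustrative; the key state.savepoints.dir is the one named in the cancel help text in section 3):

    # flink-conf.yaml: default target directory for savepoints,
    # used when no directory is passed on the command line
    state.savepoints.dir: hdfs:///flink/savepoints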

2.2 Cancel a job with a savepoint

You can atomically trigger a savepoint and cancel a job:

./bin/flink cancel -s [savepointDirectory] <jobID>

If no savepoint directory is specified, you need to configure a default savepoint directory for the Flink installation (see Savepoints). The job is only cancelled if the savepoint is triggered successfully.

2.3 Restore from a savepoint

./bin/flink run -s <savepointPath> ...

The run command has a savepoint flag for submitting a job that restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.

By default, we try to match all savepoint state to the job being submitted. If you want to allow skipping savepoint state that cannot be restored with the new job, you can set the allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint:

./bin/flink run -s <savepointPath> -n ...

This is useful if your program dropped an operator that was part of the savepoint.
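
For example, the following illustrative invocation (the savepoint path reuses the format shown in the help text in section 3) restores the WordCount example from a savepoint while skipping state that cannot be restored:

    ./bin/flink run -s hdfs:///flink/savepoint-1537 -n ./examples/batch/WordCount.jar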

2.4 Dispose a savepoint

./bin/flink savepoint -d <savepointPath>

Disposing a savepoint also requires a path. The savepoint path is returned by the savepoint trigger command.
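
For example, reusing the illustrative path from above:

    ./bin/flink savepoint -d hdfs:///flink/savepoint-1537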

3. Usage

The usage of the Flink command-line interface is as follows:

xiaosi@yoona:~/qunar/company/opt/flink-1.3.2$ ./bin/flink
./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>                         Class with the program entry
                                                    point ("main" method or
                                                    "getPlan()" method. Only
                                                    needed if the JAR file does
                                                    not specify the class in its
                                                    manifest.
     -C,--classpath <url>                           Adds a URL to each user code
                                                    classloader  on all nodes in
                                                    the cluster. The paths must
                                                    specify a protocol (e.g.
                                                    file://) and be accessible
                                                    on all nodes (e.g. by means
                                                    of a NFS share). You can use
                                                    this option multiple times
                                                    for specifying more than one
                                                    URL. The protocol must be
                                                    supported by the {@link
                                                    java.net.URLClassLoader}.
     -d,--detached                                  If present, runs the job in
                                                    detached mode
     -m,--jobmanager <host:port>                    Address of the JobManager
                                                    (master) to which to
                                                    connect. Use this flag to
                                                    connect to a different
                                                    JobManager than the one
                                                    specified in the
                                                    configuration.
     -n,--allowNonRestoredState                     Allow to skip savepoint
                                                    state that cannot be
                                                    restored. You need to allow
                                                    this if you removed an
                                                    operator from your program
                                                    that was part of the program
                                                    when the savepoint was
                                                    triggered.
     -p,--parallelism <parallelism>                 The parallelism with which
                                                    to run the program. Optional
                                                    flag to override the default
                                                    value specified in the
                                                    configuration.
     -q,--sysoutLogging                             If present, suppress logging
                                                    output to standard out.
     -s,--fromSavepoint <savepointPath>             Path to a savepoint to
                                                    restore the job from (for
                                                    example
                                                    hdfs:///flink/savepoint-1537
                                                    ).
     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
                                                    Zookeeper sub-paths for high
                                                    availability mode
  Options for yarn-cluster mode:
     -yD <arg>                            Dynamic properties
     -yd,--yarndetached                   Start detached
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                          MB]
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for yarn mode:
     -ya,--yarnattached                   Start attached
     -yD <arg>                            Dynamic properties
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "list" lists running and scheduled programs.

  Syntax: list [OPTIONS]
  "list" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
     -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "stop" stops a running program (streaming jobs only).

  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "cancel" cancels a running program.

  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
     -m,--jobmanager <host:port>            Address of the JobManager (master)
                                            to which to connect. Use this flag
                                            to connect to a different JobManager
                                            than the one specified in the
                                            configuration.
     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                            The target directory is optional. If
                                            no directory is specified, the
                                            configured default directory
                                            (state.savepoints.dir) is used.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:




Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
     -d,--dispose <arg>            Path of savepoint to dispose.
     -j,--jarfile <jarfile>        Flink program JAR file.
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  Options for yarn-cluster mode:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

  Options for yarn mode:

  Please specify an action.

Original: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cli.html

Note:

Flink version: 1.3
This is a translation and may contain mistakes; corrections are welcome. You may want to read it alongside the original.