《Flink官方文档》Python 编程指南测试版(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

为元组定义keys

最简单的情形是对一个数据集中的元组按照一个或多个域进行分组:

reduced = data \
  .group_by(0) \
  .reduce_group(<do something>)

数据集中的元组被按照第一个域分组。对于接下来的group-reduce函数,输入的数据组中,每个元组的第一个域都有相同的值。

grouped = data \
  .group_by(0,1) \
  .reduce(/*do something*/)

在上面的例子中,数据集的分组基于第一个和第二个域形成的复合关键字,因此,reduce函数输入数据组中,每个元组两个域的值均相同。
关于嵌套元组需要注意:如果你有一个使用了嵌套元组的数据集,指定group_by(<index of tuple>)操作,系统将把整个元组作为关键字使用。

向Flink传递函数

一些特定的操作需要采用用户自定义的函数,因此它们都接受lambda表达式和rich functions作为输入参数。

data.filter(lambda x: x > 5)
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data.filter(Filter())

Rich functions可以将函数作为输入参数,允许使用broadcast-variables(广播变量),能够由init()函数参数化,是复杂函数的一个可考虑的实现方式。它们也是在reduce操作中,定义一个可选的combine function的唯一方式。
Lambda表达式可以让函数在一行代码上实现,非常便捷。需要注意的是,如果某个操作会返回多个数值,则其使用的lambda表达式应当返回一个迭代器。(所有函数将接收一个collector输入 参数)。

数据类型

Flink的Python API目前仅支持python中的基本数据类型(int,float,bool,string)以及byte arrays。
运行环境对数据类型的支持,包括序列化器serializer,反序列化器deserializer,以及自定义类型的类。

class MyObj(object):
    def __init__(self, i):
        self.value = i


class MySerializer(object):
    def serialize(self, value):
        return struct.pack(">i", value.value)


class MyDeserializer(object):
    def _deserialize(self, read):
        i = struct.unpack(">i", read(4))[0]
        return MyObj(i)


env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

Tuples/Lists

你可以使用元组(或列表)来表示复杂类型。Python中的元组可以转换为Flink中的Tuple类型,它们包含数量固定的不同类型的域(最多25个)。每个域的元组可以是基本数据类型,也可以是其他的元组类型,从而形成嵌套元组类型。

word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1])

当进行一些要求指定关键字的操作时,例如对数据记录进行分组或配对。通过设定关键字,可以非常便捷地指定元组中各个域的位置。你可以指定多个位置,从而实现复合关键字(更多信息,查阅Section Data Transformations)。

wordCounts \
    .group_by(0) \
    .reduce(MyReduceFunction())

数据源

数据源创建了初始的数据集,包括来自文件,以及来自数据接口/集合两种方式。

基于文件的:

  • read_text(path) – 按行读取文件,并将每一行以String形式返回。
  • read_csv(path,type) – 解析以逗号(或其他字符)划分数据域的文件。
    返回一个包含若干元组的数据集。支持基本的java数据类型作为字段类型。

基于数据集合的:

  • from_elements(*args) – 基于一系列数据创建一个数据集,包含所有元素。
  • generate_sequence(from, to) – 按照指定的间隔,生成一系列数据。

Examples

env  = get_environment

\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)

 

数据池

数据池可以接收数据集,并被用来存储或返回它们:

  • write_text() – 按行以String形式写入数据。可通过对每个数据项调用str()函数获取String。
  • write_csv(…) – 将元组写入逗号分隔数值文件。行数和数据字段均可配置。每个字段的值可通过对数据项调用str()方法得到。
  • output() – 在标准输出上打印每个数据项的str()字符串。

一个数据集可以同时作为多个操作的输入数据。程序可以在写入或打印一个数据集的同时,对其进行其他的变换操作。

Examples

标准数据池相关方法示例如下:

write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")

 write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")

 

广播变量

使用广播变量,能够在使用普通输入参数的基础上,使得一个数据集同时被多个并行的操作所使用。这对于实现辅助数据集,或者是基于数据的参数化法非常有用。这样,数据集就可以以集合的形式被访问。

  • 注册广播变量:广播数据集可通过调用with_broadcast_set(DataSet,String)函数,按照名字注册广播变量。
  • 访问广播变量:通过对调用self.context.get_broadcast_variable(String)可获取广播变量。
class MapperBcv(MapFunction):
    def map(self, value):
        factor = self.context.get_broadcast_variable("bcv")[0][0]
        return value * factor

# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")

# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

确保在进行广播变量的注册和访问时,应当采用相同的名字(示例中的”bcv”)。
注意:由于广播变量的内容被保存在每个节点的内部存储中,不适合包含过多内容。一些简单的参数,例如标量值,可简单地通过参数化rich function来实现。

并行执行

该章节将描述如何在Flink中配置程序的并行执行。一个Flink程序可以包含多个任务(操作,数据源和数据池)。一个任务可以被划分为多个可并行运行的部分,每个部分处理输入数据的一个子集。并行运行的实例数量被称作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以为任务指定不同等级的并行度。

运行环境级

Flink程序可在一个运行环境execution environment的上下文中运行。一个运行环境为其中运行的所有操作,数据源和数据池定义了一个默认的并行度。运行环境的并行度可通过对某个操作的并行度进行配置来修改。
一个运行环境的并行度可通过调用set_parallelism()方法来指定。例如,为了将WordCount示例程序中的所有操作,数据源和数据池的并行度设置为3,可以通过如下方式设置运行环境的默认并行度。

env = get_environment()
env.set_parallelism(3)

text.flat_map(lambda x,c: x.lower().split()) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()

系统级

通过设置位于./conf/flink-conf.yaml.文件的parallelism.default属性,改变系统级的默认并行度,可设置所有运行环境的默认并行度。具体细节可查阅Configuration文档。

执行方法

为了在Flink中运行计划任务,到Flink目录下,运行/bin文件夹下的pyflink.sh脚本。对于python2.7版本,运行pyflink2.sh;对于python3.4版本,运行pyflink3.sh。包含计划任务的脚本应当作为第一个输入参数,其后可添加一些另外的python包,最后,在“-”之后,输入其他附加参数。

./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
12天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
12天前
|
程序员 开发者 Python
Python网络编程基础(Socket编程) 错误处理和异常处理的最佳实践
【4月更文挑战第11天】在网络编程中,错误处理和异常管理不仅是为了程序的健壮性,也是为了提供清晰的用户反馈以及优雅的故障恢复。在前面的章节中,我们讨论了如何使用`try-except`语句来处理网络错误。现在,我们将深入探讨错误处理和异常处理的最佳实践。
|
16天前
|
缓存 监控 Python
解密Python中的装饰器:优雅而强大的编程利器
Python中的装饰器是一种强大而又优雅的编程工具,它能够在不改变原有代码结构的情况下,为函数或类添加新的功能和行为。本文将深入解析Python装饰器的原理、用法和实际应用,帮助读者更好地理解和利用这一技术,提升代码的可维护性和可扩展性。
|
1月前
|
编译器 测试技术 C++
【Python 基础教程 01 全面介绍】 Python编程基础全攻略:一文掌握Python语法精髓,从C/C++ 角度学习Python的差异
【Python 基础教程 01 全面介绍】 Python编程基础全攻略:一文掌握Python语法精髓,从C/C++ 角度学习Python的差异
164 0
|
4天前
|
安全 数据处理 开发者
《Python 简易速速上手小册》第7章:高级 Python 编程(2024 最新版)
《Python 简易速速上手小册》第7章:高级 Python 编程(2024 最新版)
17 1
|
4天前
|
人工智能 数据挖掘 程序员
《Python 简易速速上手小册》第1章:Python 编程入门(2024 最新版)
《Python 简易速速上手小册》第1章:Python 编程入门(2024 最新版)
34 0
|
5天前
|
API Python
Python模块化编程:面试题深度解析
【4月更文挑战第14天】了解Python模块化编程对于构建大型项目至关重要,它涉及代码组织、复用和维护。本文深入探讨了模块、包、导入机制、命名空间和作用域等基础概念,并列举了面试中常见的模块导入混乱、不适当星号导入等问题,强调了避免循环依赖、合理使用`__init__.py`以及理解模块作用域的重要性。掌握这些知识将有助于在面试中自信应对模块化编程的相关挑战。
18 0
|
6天前
|
Python
Python金融应用编程:衍生品定价和套期保值的随机过程
Python金融应用编程:衍生品定价和套期保值的随机过程
|
7天前
|
Python
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
python面型对象编程进阶(继承、多态、私有化、异常捕获、类属性和类方法)(上)
44 0
|
7天前
|
机器学习/深度学习 算法 定位技术
python中使用马尔可夫决策过程(MDP)动态编程来解决最短路径强化学习问题
python中使用马尔可夫决策过程(MDP)动态编程来解决最短路径强化学习问题
22 1