《Python分布式计算》第2章 异步编程 (Distributed Computing with Python)

简介: 序言第1章 并行和分布式计算介绍第2章 异步编程第3章 Python的并行计算第4章 Celery分布式应用第5章 云平台部署Python第6章 超级计算机群使用Python第7章 测试和调试分布式应用第8章 继续学习从本章开始,终于开始写代码了!本书中所有的代码都适用于Python 3.5及以上版本。

序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


从本章开始,终于开始写代码了!本书中所有的代码都适用于Python 3.5及以上版本。当模块、语句或语法结构不适用于以前的版本时(比如Python 2.7),会在本章中指出。进行一些修改,本书代码也可以运行在Python 2.x版本上。

先回顾下上一章的知识。我们已经学到,改变算法的结构可以让其运行在本地计算机,或运行在集群上。即使是在一台计算机上运行,我们也可以使用多线程或多进程,让子程序运行在多个CPU上。

现在暂时不考虑多CPU,先看一下单线程/进程。与传统的同步编程相比,异步编程或非阻塞编程,可以使性能获得极大提高。

任何包含多任务的程序,它的每个每个任务都在执行一个操作。我们可以把这些任务当做功能或方法,也可以把几个任务合并看做一个功能。例如,将总任务细分、在屏幕打印内容、或从网络抓取信息,等等。

看一下传统程序中的这些任务是如何使用一个CPU的。考虑一个原生的实例,它有四个任务:A、B、C、D。这些任务具体是做什么在这里不重要。我们可以假设这四个任务是关于计算和I/O操作的。安排这四个任务的最直观的方式是序列化。下图展示了这四个任务对CPU的使用:

img_892cfa3e5d09b43282ff0fdcf92b7dd6.png

我们看到,当每个任务都执行I/O操作时,CPU处于空闲状态,等待任务进行计算。这使得CPU大部分时间处于闲置状态。

重点是,从不同组件,例如硬盘、内存和网络,向CPU传递数据的速度相差极大(几个数量级)。

这就会造成任何进行大量I/O操作(访问硬盘、网络通讯等等)的代码都极有可能造成CPU大部分时间闲置,如上图所示。

理想的状态应该是安排一下任务,当一个任务等待I/O时,它处于悬停状态,就让另一个任务接管CPU。这就是异步(也称为事件驱动)编程。

下图生动地展示了用异步编程的方式安排四个任务:


img_9e2fa1738e31bdb07cf3991cfd0b8e73.png

任务仍然是序列的,但是不再各自占用CPU直到任务结束,任务不需要计算时,它们会自发地放弃CPU。尽管CPU仍有闲置,程序的总运行时间明显缩短了。

使用多线程在不同的线程并行运行,也可以达到同样的效果。但是,有一个显著的不同:使用多线程时,是由操作系统决定哪个线程处于运行或悬停。然而,在异步编程中,每个任务可以自己决定是否放弃CPU。

另外,单单使用异步编程,我们不能做出真正的并发:同一时间仅仅有一个任务在运行,消除了竞争条件。当然,我们可以混合使用多线程/多进程和异步编程。

另一点要注意的是,异步编程更善于处理I/O密集型任务,而不是CPU密集型任务(暂停任务不会使性能提高)。

协程

在Python中,让一个功能中途暂停的关键是使用协程。为了理解协程,先要理解生成器generator,要理解生成器,先要理解迭代器iterator。

大部分Python开发者都熟悉对类进行迭代(例如,字符串、列表、元组、文件对象等等):

>>> for i in range(3):
...     print(i)
...
1
2
>>> for line in open('exchange_rates_v1.py'):
...     print(line, end='')
... 
#!/usr/bin/env python3
import itertools
import time
import urllib.request
…

我们可以将各种对象(不仅仅是列表和字符串)进行迭代的原因是迭代协议。迭代协议定义了迭代的标准格式:一个执行__iter____next__(或Python 2.x中的 __iter__next)的对象就是一个迭代器,可以进行迭代操作,如下所示:

class MyIterator(object):
    def __init__(self, xs):
        self.xs = xs

    def __iter__(self):
        return self

    def __next__(self):
        if self.xs:
            return self.xs.pop(0)
        else:
            raise StopIteration

for i in MyIterator([0, 1, 2]):
    print(i)

结果如下所示:

1
2
3

我们能对MyIterator中的实例进行循环的原因是,它用__iter____next__方法,运行了迭代协议:前者返回了迭代的对象,后者逐个返回了序列中的元素。

为了进一步理解协议是如何工作的,我们手动分解这个循环,如下所示:

itrtr = MyIterator([3, 4, 5, 6])

print(next(itrtr))
print(next(itrtr))
print(next(itrtr))
print(next(itrtr))

print(next(itrtr))

运行结果如下:

3
4
5
6
Traceback (most recent call last):
  File "iteration.py", line 32, in <module>
    print(next(itrtr))
  File "iteration.py", line 19, in __next__
  raise StopIteration
StopIteration

我们实例化了MyIterator,然后为了获取它的值,我们多次调用了next()。当序列到头时,next()会抛出异常StopIteration。Python中的for循环使用了同样的机制,它调用迭代器的next(),通过获取异常StopIteration得知何时停止。

生成器就是一个callable,它生成一个结果序列,而不是返回结果。这是通过产生(通过yield关键字)值而不是返回值,见下面的例子(generators.py):

def mygenerator(n):
    while n:
        n -= 1
        yield n

if __name__ == '__main__':
    for i in mygenerator(3):
        print(i)

结果如下:

2
1
0

这是一个使用yield使mygenerator成为生成器的简单例子,它的功能并不简单。调用generator函数并不开始生成序列,只是产生一个generator对象,见如下shell语句:

>>> from generators import mygenerator
>>> mygenerator(5)
<generator object mygenerator at 0x101267b48>

为了激活generator对象,需要调用next(),见如下代码:

>>> g = mygenerator(2)
>>> next(g)
1
>>> next(g)
0
>>> next(g)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

每个next()从生成的序列产生一个值,直到序列为空,也就是获得异常StopIteration时。迭代器的行为也是类似的。本质上,生成器是简化的迭代器,免去了定义类中__iter____next__的方法。

另外,生成器是一次性操作,不能重复生成的序列。若要重复序列,必须再次调用generator函数。

用来在generator函数中产生序列值的yield表达式,还可以在等号右边使用,以消除值。这样就可以得到协程。协程就是一类函数,它可以通过yield,在指定位置暂停或继续任务。

需要注意,尽管协程是强化的生成器,在概念意义上并不等于生成器。原因是,协程与迭代无关。另一不同点,生成器产生值,而协程消除值。

让我们做一些协程,看看如何使用。协程有三种主要的结构,如下所示:

  • yield(): 用来暂停协程的执行
  • send(): 用来向协程传递数据(以让协程继续执行)
  • close():用来关闭协程

下面代码展示了协程的使用(coroutines.py):

def complain_about(substring):
    print('Please talk to me!')
    try:
        while True:
            text = (yield)
            if substring in text:
                print('Oh no: I found a %s again!'
                      % (substring))
    except GeneratorExit:
        print('Ok, ok: I am quitting.')

我们先定义个一个协程,它就是一个函数,名字是complain_about,它有一个参数:一个字符串。打印一句话之后,进入一个无限循环,由try except控制退出,即只有通过异常才能退出。利用异常GeneratorExit,当获得这个异常时就会退出。

循环的主体十分简单,使用yield来获取数据,存储在变量text中。然后,我们检测substring是否在text中。如果在的话,弹出一条新语句。

下面代码展示了在shell中如何使用这个协程:

>>> from coroutines import complain_about
>>> c = complain_about('Ruby')
>>> next(c)
Please talk to me!
>>> c.send('Test data')
>>> c.send('Some more random text')
>>> c.send('Test data with Ruby somewhere in it')
Oh no: I found a Ruby again!
>>> c.send('Stop complaining about Ruby or else!')
Oh no: I found a Ruby again!
>>> c.close()
Ok, ok: I am quitting.

执行complain_about('Ruby')产生了协程。为了使用新建的协程,我们用next()调用它,与在生成器中所做的相同。只有调用next()之后,才在屏幕上看到Please talk to me!

这时,协程到达了text = (yield),意味着它暂停了执行。控制点返回了shell,我们就可以向协程发送数据了。我们使用send()方法发送数据,如下所示:

>>> c.send('Test data')
>>> c.send('Some more random text')
>>> c.send('Test data with Ruby somewhere in it')
Oh no: I found a Ruby again!

每次调用send()方法都使代码到达下一个yield。在我们的例子中,到达while循环的下一次迭代,返回text = (yield)。这里,控制点返回shell。

我们可以调用close()方法停止协程,它可以在协程内部抛出异常GeneratorExit。此时,协程唯一能做的就是清理数据并退出。下面的代码展示了如何结束协程:

>>> c.close()
Ok, ok: I am quitting.

如果将try except部分注释掉,就不会获得GeneratorExit异常。但是协程还是会停止,如下所示:

>>> def complain_about2(substring):
...     print('Please talk to me!')
...     while True:
...         text = (yield)
...         if substring in text:
...             print('Oh no: I found a %s again!'
...                   % (substring))
... 
>>> c = complain_about2('Ruby')
>>> next(c)
Please talk to me!
>>> c.close()
>>> c.send('This will crash')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>> next(c)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

我们看到,一旦关闭协程,对象仍会保持,但是用途为零:不能向它发送数据,也不能调用next()使用它。

当使用协程时,许多人觉得必须要用next()很繁琐,转而使用装饰器,避免多余的调用,如下所示:

>>> def coroutine(fn):
...     def wrapper(*args, **kwargs):
...         c = fn(*args, **kwargs)
...         next(c)
...         return c
...     return wrapper
... 
>>> @coroutine
... def complain_about2(substring):
...     print('Please talk to me!')
...     while True:
...         text = (yield)
...         if substring in text:
...             print('Oh no: I found a %s again!'
...                   % (substring))
... 
>>> c = complain_about2('JavaScript')
Please talk to me!
>>> c.send('Test data with JavaScript somewhere in it')
Oh no: I found a JavaScript again!
>>> c.close()

协程的层级结构可以很复杂,可以让一个协程向其它多个协程发送数据,或从多个源接收数据。这在网络集群编程和系统编程中很有用(为了提高性能),可以用纯Python高效代替大多数Unix工具。

一个异步实例

为了简单又有趣,让我们写一个工具,可以对指定的文件,统计某个词的出现次数。使用之前的协程做基础,再添加一些功能。

在Linux和Mac OS X上,可以使用grep命令获得同样的结果。我们先下载一个大的文本文件,用作输入的数据。我们选择的是Project Gutenberg上列夫托尔斯泰所写的《战争与和平》,它的地址是http://www.gutenberg.org/cache/epub/2600/pg2600.txt

下面代码展示了如何下载(译者注:win上使用Git Bash):

$ curl -sO http://www.gutenberg.org/cache/epub/2600/pg2600.txt
$ wc pg2600.txt
   65007  566320 3291648 pg2600.txt

接下来,我们统计love一词出现的次数,忽略大小写,如下所示(译者注:会有编码问题):

$ time (grep -io love pg2600.txt | wc -l)
677
(grep -io love pg2600.txt) 0.11s user 0.00s system 98% cpu 0.116 total

现在使用Python的协程来做(grep.py):

def coroutine(fn):
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper


def cat(f, case_insensitive, child):
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))


@coroutine
def grep(substring, case_insensitive, child):
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))


@coroutine
def count(substring):
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true',
                        dest='case_insensitive')
    parser.add_argument('pattern', type=str)
    parser.add_argument('infile', type=argparse.FileType('r'))

    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        grep(args.pattern, args.case_insensitive,
             count(args.pattern)))

分析代码之前,我们先运行一下,和grep进行比较:

$ time python3.5 grep.py -i love pg2600.txt
love 677
python3.5 grep.py -i love pg2600.txt  0.09s user 0.01s system 97% cpu 0.097 total

可以看到,使用协程的纯Python版本与使用grepwc命令的Unix相比,十分具有竞争力。当然,Unix的grep命令远比Python版本强大。不能简单宣称Python比C语言快!但是,Python的结果也是让人满意的。

来分析下代码。首先,再次执行coroutine的装饰器。之后,将总任务分解成三块:

  • 逐行读取文件(通过cat函数)
  • 统计每行中substring的出现次数(grep协程)
  • 求和并打印数据(count协程)

在脚本文件的主体部分,我们解析命令行选项,将cat结果传给grep,将grep结果传给count,就像操作普通的Unix工具。

实现这个链条极其简单。我们将接收数据的协程当做参数(前面例子的child),传递给产生数据的函数或协程。然后,在数据源中,调用协程的send方法。

第一个函数cat,作为整个函数的数据源,它逐行读取文件,将每行发送给grep
child.send(line))。如果匹配是大小写不敏感的,不需要进行转换;如果大小写敏感,则都转化为小写。

grep命令是我们的第一个协程。这里,进入一个无限循环,持续获取数据(text = (yield)),统计substringtext中的出现次数,,将次数发送给写一个协程(即count):child.send(text.count(substring)))

count协程用总次数n,从grep获取数据,对总次数进行求和,n += (yield)。它捕获发送给各个协程关闭时的GeneratorExit异常(在我们的例子中,到达文件最后就会出现异常),以判断何时打印这个substringn

当把协程组织为更复杂的结构时,会更有趣。比如,我们可以统计多个单词出现的次数。

下面的代码展示了一种这样做的方法,通过一个额外的协程负责广播,将输入数据发送给任意数目的子协程(mgrep.py):

def coroutine(fn):
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper

def cat(f, case_insensitive, child):
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))

@coroutine
def grep(substring, case_insensitive, child):
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))

@coroutine
def count(substring):
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)

@coroutine
def fanout(children):
    while True:
        data = (yield)
        for child in children:
            child.send(data)

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true', dest='case_insensitive')
    parser.add_argument('patterns', type=str, nargs='+',)
    parser.add_argument('infile', type=argparse.FileType('r'))

    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        fanout([grep(p, args.case_insensitive,
                     count(p)) for p in args.patterns]))

代码看上去和之前的例子差不多。让我们分析一下差别。我们定义了一个广播器:fanoutfanout()协程使用一列协程作为输入,自身位于一个无限循环中。当收到数据后(data = (yield)),便将数据分发给注册的协程(for child in children: child.send(data))。

不用修改catgrepcount的代码,我们就可以利用原有的代码来搜索任意个数的字符串了!

它的性能依旧很好,如下所示:

$ time python3.5 mgrep.py -i love hate hope pg2600.txt
hate 103
love 677
hope 158
python3.5 mgrep.py -i love hate hope pg2600.txt  0.16s user 0.01s system 98% cpu 0.166 total

总结

Python从1.5.2版本之后引入了asyncoreasynchat模块,开始支持异步编程。2.5版本引入了yield,可以向协程传递数据,简化了代码、加强了性能。Python 3.4 引入了一个新的库进行异步I/O,称作asyncio

Python 3.5通过async defawait,引入了真正的协程类型。感兴趣的读者可以继续研究Python的新扩展。一句警告:异步编程是一个强大的工具,可以极大地提高I/O密集型代码的性能。但是异步编程也是存在问题的,而且还相当复杂。

任何异步代码都要精心选择非阻塞的库,以防使用阻塞代码。并且要运行一个协程规划期(因为OS不能像规划线程一样规划协程),包括写一个事件循环和其它事务。读异步代码会有一定困难,即使我们的最简单的例子也很难一眼看懂。所以,一定要小心!


序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


目录
相关文章
|
1月前
|
安全 Java 关系型数据库
深入探究Python的多线程与异步编程:实战与最佳实践
【2月更文挑战第1天】 深入探究Python的多线程与异步编程:实战与最佳实践
133 0
|
1月前
|
Java 调度 Python
深入解析 Python asyncio 库:如何使用线程池实现高效异步编程
深入解析 Python asyncio 库:如何使用线程池实现高效异步编程
55 0
|
11天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
1月前
|
并行计算 调度 开发者
深入浅出Python协程:提升你的异步编程效率
在当今快速发展的软件开发领域,异步编程已成为提高程序性能和用户体验的关键技术。Python,作为一门广泛使用的高级编程语言,其协程(Coroutine)功能为开发者提供了强大的异步编程工具。本文将从协程的基本概念入手,通过实例深入浅出地讲解如何在Python中有效利用协程来提升异步编程的效率和可读性。我们将探讨协程的工作原理、与传统多线程/多进程相比的优势,以及如何在实际项目中应用协程来解决复杂的并发问题。通过本文的学习,读者将能够掌握Python协程的核心知识,为构建高效、可维护的异步应用奠定坚实基础。
|
3天前
|
开发者 Python
探索Python中的异步编程:从回调到async/await
随着计算机系统的不断发展和多核处理器的普及,异步编程在Python中变得越来越重要。本文将深入探讨Python中异步编程的发展历程,从最初的回调函数到如今的async/await关键字,帮助读者更好地理解和应用异步编程技术。
|
7天前
|
API 调度 开发者
深入理解Python异步编程:从Asyncio到实战应用
在现代软件开发中,异步编程技术已成为提升应用性能和响应速度的关键策略。本文将通过实例讲解Python中的异步编程核心库Asyncio的基本概念、关键功能以及其在Web开发中的应用。我们不仅将理论与实践结合,还将展示如何通过实际代码示例解决常见的并发问题,帮助开发者更有效地利用Python进行异步编程。
|
9天前
|
Python
Python 的异步编程: 解释什么是事件循环(Event Loop)?
事件循环是Python异步编程的关键,它无限循环等待并处理异步任务,协调I/O操作执行顺序,实现并发,提高性能。
10 0
|
27天前
|
API 数据处理 调度
Python中的异步编程与协程应用
传统的Python编程在处理IO密集型任务时常常面临效率低下的问题,而异步编程和协程技术的引入为解决这一问题提供了有效的途径。本文将介绍Python中异步编程的基本概念,深入探讨asyncio库的使用以及协程在实际项目中的应用,旨在帮助开发者更好地理解和运用异步编程技术。
|
1月前
|
物联网 调度 开发者
构建高效Python Web应用:异步编程与Tornado框架解析
【2月更文挑战第27天】 在处理高并发的Web应用场景时,传统的同步阻塞模型往往难以满足性能需求。本文将深入探讨Python世界中的异步编程概念,并结合Tornado这一轻量级、非阻塞式Web服务器及框架,展示如何构建高性能的Web应用。通过实例驱动的方法论,我们将剖析Tornado的核心组件,包括其IOLoop、异步HTTP客户端和服务器端处理机制,以及与协程集成的细节。文章旨在为开发者提供一套实践指南,帮助他们利用Python实现快速响应和资源高效的Web服务。
29 2
|
1月前
|
调度 开发者 Python
Python的asyncio库:掌握异步编程的利器
Python的asyncio库:掌握异步编程的利器
24 0