Python多进程教程

简介:

Python多进程教程


Python2.6版本中新添了multiprocessing模块。它最初由Jesse Noller和Richard Oudkerk定义在PEP 371中。就像你能通过threading模块衍生线程一样,multiprocessing 模块允许你衍生进程。这里用到的思想:因为你现在能衍生进程,所以你能够避免使用全局解释器锁(GIL),并且充分利用机器的多个处理器。

多进程包也包含一些根本不在threading 模块中的API。比如:有一个灵活的Pool类能让你在多个输入下并行化地执行函数。我们将在后面的小节讲解Pool类。我们将以multiprocessing模块的Process类开始讲解。

开始学习multiprocessing模块

Process这个类和threading模块中的Thread类很像。让我们创建一系列调用相同函数的进程,并且看看它是如何工作的。

 
  1. import os 
  2.  
  3. from multiprocessing import Process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc = os.getpid() 
  16.  
  17.     print('{0} doubled to {1} by process id: {2}'.format( 
  18.  
  19.         number, result, proc)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     for index, number in enumerate(numbers): 
  28.  
  29.         proc = Process(target=doubler, args=(number,)) 
  30.  
  31.         procs.append(proc) 
  32.  
  33.         proc.start() 
  34.  
  35.     for proc in procs: 
  36.  
  37.         proc.join() 

对于上面的例子,我们导入Process类、创建一个叫doubler的函数。在函数中,我们将传入的数字乘上2。我们也用Python的os模块来获取当前进程的ID(pid)。这个ID将告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列的Process类并且启动它们。最后一个循环只是调用每个进程的join()方法,该方法告诉Python等待进程直到它结束。如果你需要结束一个进程,你可以调用它的terminate()方法。

当你运行上面的代码,你应该看到和下面类似的输出结果:

 
  1. 5 doubled to 10 by process id: 10468 
  2.  
  3. 10 doubled to 20 by process id: 10469 
  4.  
  5. 15 doubled to 30 by process id: 10470 
  6.  
  7. 20 doubled to 40 by process id: 10471 
  8.  
  9. 25 doubled to 50 by process id: 10472 

有时候,你最好给你的进程取一个易于理解的名字 。幸运的是,Process类确实允许你访问同样的进程。让我们来看看如下例子:

 
  1. import os 
  2.  
  3. from multiprocessing import Process, current_process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc_name = current_process().name 
  16.  
  17.     print('{0} doubled to {1} by: {2}'.format( 
  18.  
  19.         number, result, proc_name)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     proc = Process(target=doubler, args=(5,)) 
  28.  
  29.     for index, number in enumerate(numbers): 
  30.  
  31.         proc = Process(target=doubler, args=(number,)) 
  32.  
  33.         procs.append(proc) 
  34.  
  35.         proc.start() 
  36.  
  37.     proc = Process(target=doubler, name='Test', args=(2,)) 
  38.  
  39.     proc.start() 
  40.  
  41.     procs.append(proc) 
  42.  
  43.     for proc in procs: 
  44.  
  45.         proc.join() 

这一次,我们多导入了current_process。current_process基本上和threading模块的current_thread是类似的东西。我们用它来获取正在调用我们的函数的线程的名字。你将注意到我们没有给前面的5个进程设置名字。然后我们将第6个进程的名字设置为“Test”。

让我们看看我们将得到什么样的输出结果:

 
  1. 5 doubled to 10 by: Process-2 
  2.  
  3. 10 doubled to 20 by: Process-3 
  4.  
  5. 15 doubled to 30 by: Process-4 
  6.  
  7. 20 doubled to 40 by: Process-5 
  8.  
  9. 25 doubled to 50 by: Process-6 
  10.  
  11. 2 doubled to 4 by: Test 

输出结果说明:默认情况下,multiprocessing模块给每个进程分配了一个编号,而该编号被用来组成进程的名字的一部分。当然,如果我们给定了名字的话,并不会有编号被添加到名字中。

multiprocessing模块支持锁,它和threading模块做的方式一样。你需要做的只是导入Lock,获取它,做一些事,释放它。

 
  1. from multiprocessing import Process, Lock 
  2.  
  3. def printer(item, lock): 
  4.  
  5.     ""
  6.  
  7.     Prints out the item that was passed in 
  8.  
  9.     ""
  10.  
  11.     lock.acquire() 
  12.  
  13.     try: 
  14.  
  15.         print(item) 
  16.  
  17.     finally: 
  18.  
  19.         lock.release() 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     lock = Lock() 
  24.  
  25.     items = ['tango''foxtrot', 10] 
  26.  
  27.     for item in items: 
  28.  
  29.         p = Process(target=printer, args=(item, lock)) 
  30.  
  31.         p.start() 

我们在这里创建了一个简单的用于打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环列表中的三个项并为它们各自都创建一个进程。每一个进程都将调用我们的函数,并且每次遍历到的那一项作为参数传入函数。因为我们现在使用了锁,所以队列中下一个进程将一直阻塞,直到之前的进程释放锁。

日志

为进程创建日志与为线程创建日志有一些不同。它们存在不同是因为Python的logging包不使用共享锁的进程,因此有可能以来自不同进程的信息作为结束的标志。让我们试着给前面的例子添加基本的日志。代码如下:

 
  1. import logging 
  2.  
  3. import multiprocessing 
  4.  
  5. from multiprocessing import Process, Lock 
  6.  
  7. def printer(item, lock): 
  8.  
  9.     ""
  10.  
  11.     Prints out the item that was passed in 
  12.  
  13.     ""
  14.  
  15.     lock.acquire() 
  16.  
  17.     try: 
  18.  
  19.         print(item) 
  20.  
  21.     finally: 
  22.  
  23.         lock.release() 
  24.  
  25. if __name__ == '__main__'
  26.  
  27.     lock = Lock() 
  28.  
  29.     items = ['tango''foxtrot', 10] 
  30.  
  31.     multiprocessing.log_to_stderr() 
  32.  
  33.     logger = multiprocessing.get_logger() 
  34.  
  35.     logger.setLevel(logging.INFO) 
  36.  
  37.     for item in items: 
  38.  
  39.         p = Process(target=printer, args=(item, lock)) 
  40.  
  41.         p.start() 

最简单的添加日志的方法通过推送它到stderr实现。我们能通过调用thelog_to_stderr() 函数来实现该方法。然后我们调用get_logger 函数获得一个logger实例,并将它的日志等级设为INFO。之后的代码是相同的。需要提示下这里我并没有调用join()方法。取而代之的:当它退出,父线程将自动调用join()方法。

当你这么做了,你应该得到类似下面的输出:

 
  1. [INFO/Process-1] child process calling self.run() 
  2.  
  3. tango 
  4.  
  5. [INFO/Process-1] process shutting down 
  6.  
  7. [INFO/Process-1] process exiting with exitcode 0 
  8.  
  9. [INFO/Process-2] child process calling self.run() 
  10.  
  11. [INFO/MainProcess] process shutting down 
  12.  
  13. foxtrot 
  14.  
  15. [INFO/Process-2] process shutting down 
  16.  
  17. [INFO/Process-3] child process calling self.run() 
  18.  
  19. [INFO/Process-2] process exiting with exitcode 0 
  20.  
  21. 10 
  22.  
  23. [INFO/MainProcess] calling join() for process Process-3 
  24.  
  25. [INFO/Process-3] process shutting down 
  26.  
  27. [INFO/Process-3] process exiting with exitcode 0 
  28.  
  29. [INFO/MainProcess] calling join() for process Process-2 

现在如果你想要保存日志到硬盘中,那么这件事就显得有些棘手。你能在Python的logging Cookbook阅读一些有关那类话题。

Pool类

Pool类被用来代表一个工作进程池。它有让你将任务转移到工作进程的方法。让我们看下面一个非常简单的例子。

 
  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     numbers = [5, 10, 20] 
  10.  
  11.     pool = Pool(processes=3) 
  12.  
  13.     print(pool.map(doubler, numbers)) 

基本上执行上述代码之后,一个Pool的实例被创建,并且该实例创建了3个工作进程。然后我们使用map 方法将一个函数和一个可迭代对象映射到每个进程。最后我们打印出这个例子的结果:[10, 20, 40]。

你也能通过apply_async方法获得池中进程的运行结果:

 
  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     pool = Pool(processes=3) 
  10.  
  11.     result = pool.apply_async(doubler, (25,)) 
  12.  
  13.     print(result.get(timeout=1)) 

我们上面做的事实际上就是请求进程的运行结果。那就是get函数的用途。它尝试去获取我们的结果。你能够注意到我们设置了timeout,这是为了预防我们调用的函数发生异常的情况。毕竟我们不想要它被无限期地阻塞。

进程通信

当遇到进程间通信的情况,multiprocessing 模块提供了两个主要的方法:Queues 和 Pipes。Queue 实现上既是线程安全的也是进程安全的。让我们看一个相当简单的并且基于 Queue的例子。代码来自于我的文章(threading articles)。

 
  1. from multiprocessing import Process, Queue 
  2.  
  3. sentinel = -1 
  4.  
  5. def creator(data, q): 
  6.  
  7.     ""
  8.  
  9.     Creates data to be consumed and waits for the consumer 
  10.  
  11.     to finish processing 
  12.  
  13.     ""
  14.  
  15.     print('Creating data and putting it on the queue'
  16.  
  17.     for item in data: 
  18.  
  19.         q.put(item) 
  20.  
  21. def my_consumer(q): 
  22.  
  23.     ""
  24.  
  25.     Consumes some data and works on it 
  26.  
  27.     In this caseall it does is double the input 
  28.  
  29.     ""
  30.  
  31.     while True
  32.  
  33.         data = q.get() 
  34.  
  35.         print('data found to be processed: {}'.format(data)) 
  36.  
  37.         processed = data * 2 
  38.  
  39.         print(processed) 
  40.  
  41.         if data is sentinel: 
  42.  
  43.             break 
  44.  
  45. if __name__ == '__main__'
  46.  
  47.     q = Queue() 
  48.  
  49.     data = [5, 10, 13, -1] 
  50.  
  51.     process_one = Process(target=creator, args=(data, q)) 
  52.  
  53.     process_two = Process(target=my_consumer, args=(q,)) 
  54.  
  55.     process_one.start() 
  56.  
  57.     process_two.start() 
  58.  
  59.     q.close() 
  60.  
  61.     q.join_thread() 
  62.  
  63.     process_one.join() 
  64.  
  65.     process_two.join() 

在这里我们只需要导入Queue和Process。Queue用来创建数据和添加数据到队列中,Process用来消耗数据并执行它。通过使用Queue的put()和get()方法,我们就能添加数据到Queue、从Queue获取数据。代码的最后一块只是创建了Queue 对象以及两个Process对象,并且运行它们。你能注意到我们在进程对象上调用join()方法,而不是在Queue本身上调用。

总结

我们这里有大量的资料。你已经学习如何使用multiprocessing模块指定不变的函数、使用Queues在进程间通信、给进程命名等很多事。在Python文档中也有很多本文没有接触到的知识点,因此也务必深入了解下文档。与此同时,你现在知道如何用Python利用你电脑所有的处理能力了!


作者:佚名

来源:51CTO

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
7天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
25天前
|
安全 Python
Python中的并发编程:多线程与多进程技术探究
本文将深入探讨Python中的并发编程技术,重点介绍多线程和多进程两种并发处理方式的原理、应用场景及优缺点,并结合实例分析如何在Python中实现并发编程,以提高程序的性能和效率。
|
27天前
|
JSON C语言 C++
【Python 基础教程 26】Python3标准库全面入门教程:一步步带你深入理解与应用
【Python 基础教程 26】Python3标准库全面入门教程:一步步带你深入理解与应用
60 1
|
27天前
|
存储 安全 API
【Python 基础教程 21】Python3 文件操作全面指南:从入门到精通的综合教程
【Python 基础教程 21】Python3 文件操作全面指南:从入门到精通的综合教程
73 0
|
1天前
|
调度 Python
Python多线程、多进程与协程面试题解析
【4月更文挑战第14天】Python并发编程涉及多线程、多进程和协程。面试中,对这些概念的理解和应用是评估候选人的重要标准。本文介绍了它们的基础知识、常见问题和应对策略。多线程在同一进程中并发执行,多进程通过进程间通信实现并发,协程则使用`asyncio`进行轻量级线程控制。面试常遇到的问题包括并发并行混淆、GIL影响多线程性能、进程间通信不当和协程异步IO理解不清。要掌握并发模型,需明确其适用场景,理解GIL、进程间通信和协程调度机制。
6 0
|
9天前
|
存储 Shell Perl
Perl 教程 之 Perl 进程管理 4
Perl教程介绍了进程管理,包括使用$$或$PROCESS_ID获取PID,%ENV存储环境变量,exit()退出子进程,fork()创建新进程。在父进程返回子进程PID,在子进程返回0。fork与exec配合执行命令。示例展示了父进程如何等待子进程结束。当子进程变为僵死状态时,父进程需使用wait或waitpid终止,或设置$SIG{CHLD}为"IGNORE"。
14 1
|
24天前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。
|
27天前
|
存储 算法 数据挖掘
【Python 基础教程 25】全面入门指南:深度解析Python3的命名空间,作用域及变量使用教程
【Python 基础教程 25】全面入门指南:深度解析Python3的命名空间,作用域及变量使用教程
50 0
|
27天前
|
存储 机器学习/深度学习 数据安全/隐私保护
【Python 基础教程 24】全面入门Python面向对象编程:深度探索与实战教程
【Python 基础教程 24】全面入门Python面向对象编程:深度探索与实战教程
76 0
|
27天前
|
Linux 数据库连接 C++
【Python 基础教程 23】Python3 错误与异常处理全面指南:从入门到精通的实用教程
【Python 基础教程 23】Python3 错误与异常处理全面指南:从入门到精通的实用教程
104 0