MultiProcess-MultiThread

  1. 云栖社区>
  2. 博客>
  3. 正文

MultiProcess-MultiThread

余二五 2017-11-15 17:33:00 浏览409
展开阅读全文

听到一些关于python多进程与多线的例子,感觉比较经典,把一些例子分享一下.

内容如下:

    Process、Thread、GIL、Process fork、Multiprocessing、Queue、ProcessPool、Multiprocess-Multithread comparison


(1) Process : 程序的一次执行(程序编写完毕后代码装载入内存,系统分配资源运行)。每个进程有自己的内存空间、数据栈等,只能使用进 程间通讯,而不能直接共享信息 


(2) Thread线程:所有线程运行在同一个进程中,共享相同的运行环境。 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口; 线程的运行可以被抢占(中断),或暂时被挂起 (睡眠),让其他线程运行(让步);一个进程中的各个线程间共享同一片数据空间 


(3) 全局解释器锁GIL 

    GIL全称全局解释器锁Global Interpreter Lock,GIL并不是Python的特性,它是在实现Python解析器(CPython) 所引入的一个概念。 

    GIL是一把全局排他锁,同一时刻只有一个线程在运行。 毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。 

    multiprocessing库的出现很大程度上是为了弥补thread库因为 GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。 


多线程处理的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from threading import Thread
import time
def my_counter():
    = 0
    for in range(100000000):
        =i+1
    return True
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        = Thread(target=my_counter())
        t.start()
        t.join()
        #以单线程、阻塞的方式顺序运行两次my_counter函数
    end_time = time.time()
    print("Total time:{}").format(end_time - start_time)
     
if __name__=="__main__":
    main()

执行结果如下:

Total time:12.7875118256


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread
import time
def my_counter():
    = 0
    for in range(100000000):
        = i+1
    return True
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        = Thread(target=my_counter())
        t.start()
        thread_array[tid] = t
    for in range(2):
        thread_array[i].join()
        #以多进程、并发的方式运行两次my_counter函数
    end_time = time.time()
    print("Total time:{}").format(end_time - start_time)
     
if __name__=="__main__":
    main()

执行结果如下:

Total time:15.8216409683


上述两个例子的结果发现:单线程运行两次函数的时间   要比   两个线程同时运行函数的时间短(仅限于本例)。


(4) fork操作: 

调用一次,返回两次。因为操作系统自动把当前进程(称为父 进程)复制了一份(称为子进程),然后分别在父进程和子进 程内返回。子进程永远返回0,而父进程返回子进程的ID。子 进程只需要调用getppid()就可以拿到父进程的ID

例:

1
2
3
4
5
6
7
import os
print 'Process (%s) start ...' % os.getpid()
pid=os.fork()
if pid==0:
    print 'I am child process (%s) and my parent is (%s)' % (os.getpid(),os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(),pid)

执行结果如下:

Process (16480) start ...

I (16480) just created a child process (16481).

I am child process (16481) and my parent is (16480)


(5) multiprocessing是跨平台版本的多进程模块,它提供了一个Process类来代表一个进程对象,下面是示例代码: 

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process
import time
def f(n):
    time.sleep(1)
    print n*n
     
if __name__=="__main__":
    for in range(10):
        p=Process(target=f,args=[i,])
        p.start()
        #使用多进程并发的方式执行函数f

这个程序如果用单进程写则需要执行10秒以上的时间而用多进程则启动10个进程并行执行,只需要用1秒多的时间。多进程时,每个进程各自有各自的GIL。而同一进程中的多线程受到GIL的影响,效率返而会下降。


(6) Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process,Queue
import time
def write(q):
    for in ['A','B','C','D','E']:
        print ('Put %s to queue' % i)
        q.put(i)
        time.sleep(0.5)
                
def read(q):
    while True:
        = q.get(True)
        print('get %s from queue' %v)
         
if __name__ == '__main__':
    = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pr.join()
    pr.terminate()

输出结果:

Put A to queue

get A from queue

Put B to queue

get B from queue

Put C to queue

get C from queue

Put D to queue

get D from queue

Put E to queue

get E from queue


(7) 进程池pool 用于批量创建子进程,可以灵活控制子进程的数量 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Pool
import time
def f(x):
    print x*x
    time.sleep(2)
    return x*x
if __name__ == '__main__':
    pool = Pool(processes=5)
    res_list = []
    for in range(10):        
        res = pool.apply_async(f,[i,])
        ''' 以异步并行的方式启动进程处理函数f,如果要同步等待的方式,可以在每次进程启动之后调用res.get()方法,也可以使用Pool.apply'''
        print('-------:',i)
        res_list.append(res)
    pool.close()
    pool.join()
    for in res_list:
        print 'result',(r.get(timeout=5))

输出结果如下:

('-------:', 0)

('-------:', 1)

('-------:', 2)

('-------:', 3)

('-------:', 4)

('0-------:', 5)

('-------:', 6)

('-------:', 7)

1

('-------:', 8)

('-------:', 9)

4

16

9

25

36

49

64

81

result 0

result 1

result 4

result 9

result 16

result 25

result 36

result 49

result 64

result 81

如果使用同步方式,处理函数时 必须等待 前一个处理的结束,所以如果将该程序换为同步方式,输出结果则是顺序的。


(8) 多进程与多线程的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process
import threading
import time
lock = threading.Lock()
def run(info_list,n):
    lock.acquire()
    info_list.append(n)
    lock.release()
    print('%s' % info_list)
if __name__ == '__main__':
    info = []
    for in range(10):
        = Process(target=run,args=[info,i])
        p.start()
        p.join()
    time.sleep(1)
    print('-----------------threading--------------')
    for in range(10):
        = threading.Thread(target=run,args=[info,i])
        p.start()
        p.join()

输出结果为:

[0]

[1]

[2]

[3]

[4]

[5]

[6]

[7]

[8]

[9]

-----------------threading--------------

[0]

[0, 1]

[0, 1, 2]

[0, 1, 2, 3]

[0, 1, 2, 3, 4]

[0, 1, 2, 3, 4, 5]

[0, 1, 2, 3, 4, 5, 6]

[0, 1, 2, 3, 4, 5, 6, 7]

[0, 1, 2, 3, 4, 5, 6, 7, 8]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


多进程间数据不能直接共享,每次处理函数run的结果都不能继承。而多线程间数据可以共享,但受到GIL的影响,run函数中在将数据追加到列表时,使用lock锁,追回完毕再释放lock,这样避免冲突。










本文转自 meteor_hy 51CTO博客,原文链接:http://blog.51cto.com/caiyuanji/1966279,如需转载请自行联系原作者

网友评论

登录后评论
0/500
评论
余二五
+ 关注