1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
一、线程介绍
处理线程的模块是threading,multiprocessing模块处理方式跟threading相似
 
开启线程的两种方式:
例子:
from threading import Thread
from multiprocessing import Process
 
def work(name):
    print('%s say hello' %name)
 
if __name__ == '__main__':
    = Thread(target=work, args=('hyh',))
    t.start()
    print('主线程')
     
class Work(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
 
    def run(self):
        print('%s say hello' %self.name)
 
if __name__ == '__main__':
    = Work('hyh')
    t.start()
    print('主线程')
     
二、线程方法
queue方法
例子:
import queue
 
= queue.Queue(3)    #先进先出
q.put(1)
q.put('hyh')
q.put([1,2,3,4])
 
print(q.get())
print(q.get())
print(q.get())
 
= queue.LifoQueue()    #后进先出
q.put(1)
q.put('hyh')
q.put([1,2,3,4])
 
print(q.get())
print(q.get())
print(q.get())
 
= queue.PriorityQueue()    #优先级,数字越小优先级越高
q.put((10'a'))
q.put((9'b'))
q.put((11'c'))
 
print(q.get())
print(q.get())
print(q.get())
 
线程其他方法
例子:
import time
from threading import Thread
import threading
 
def work():
    time.sleep(2)
    print('%s say hello' %(threading.current_thread().getName()))
 
if __name__ == '__main__':
    = Thread(target=work)
    t.setDaemon(True)    #设置成守护线程
    t.start()
    t.join()
    print(threading.enumerate())    #当前活跃的线程对象,是一个列表形式
    print(threading.active_count()) #当前活跃的线程数目
    print('主线程', threading.current_thread().getName())  #线程名字
     
三、python全局解释器锁GIL
python同一进程的线程利用不了多核优势,因为一个线程运行时获取GIL锁,等到运行结束释放GIL,
其它线程才能申请GIL
 
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上
的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的
 
例子:
计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
 
def work():
    res = 0
    for in range(1000000):
        res += i
 
if __name__ == '__main__':
    t_l = []
    start_time = time.time()
    for in range(300):
        = Thread(target=work)
        t_l.append(t)
        t.start()
 
    for in t_l:
        i.join()
 
    stop_time = time.time()
    print('run time is %s' %(stop_time - start_time))
    print('主线程')
 
IO密集型    
from threading import Thread
from multiprocessing import Process
import time
import os
 
def work():
    time.sleep(2)
    print(os.getpid())
 
if __name__ == '__main__':
    t_l = []
    start_time = time.time()
    for in range(1000):
        = Thread(target=work)
        t_l.append(t)
        t.start()
 
    for in t_l:
        t.join()
    stop_time = time.time()
    print('run time is %s' %(stop_time - start_time))
 
线程锁Lock
import threading
 
R=threading.Lock()
 
R.acquire()
'''
对公共数据的操作
'''
R.release()
 
 
死锁
例子:
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
 
class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
 
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)
 
        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()
 
        mutexA.release()
 
    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)
 
        mutexA.acquire()
        print('\033[44m%s拿到A锁\033[0m' %self.name)
        mutexA.release()
 
        mutexB.release()
 
if __name__ == '__main__':
    for in range(10):
        = MyThread()
        t.start()
         
输出结果:
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
卡住。。。
 
 
递归锁RLock
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可
以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子
如果使用RLock代替Lock,则不会发生死锁
 
from threading import Thread,RLock
import time
mutex = RLock()
 
 
class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
 
    def func1(self):
        mutex.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)
 
        mutex.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutex.release()
 
        mutex.release()
 
    def func2(self):
        mutex.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)
 
        mutex.acquire()
        print('\033[44m%s拿到A锁\033[0m' %self.name)
        mutex.release()
 
        mutex.release()
 
if __name__ == '__main__':
    for in range(10):
        = MyThread()
        t.start()
         
信号量Semahpore
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1
调用release() 时内置计数器+1
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
例子:
import threading
import time
 
semaphore = threading.Semaphore(5)
 
def func():
    if semaphore.acquire():
        print(threading.current_thread().getName() + ' get spmaphore')
        time.sleep(2)
        semaphore.release()
 
for in range(20):
    t1 = threading.Thread(target=func)
    t1.start()
     
event对象
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断
某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,
我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某
些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 
而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个
Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经
被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值;
 
event.wait():如果 event.isSet()==False将阻塞线程;
 
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
 
event.clear():恢复event的状态值为False
 
例子:
from threading import Thread,Event
import threading
import time,random
 
def conn_mysql():
    print('\033[42m%s 等待链接Mysql...\033[0m' %threading.current_thread().getName())
    event.wait()
    print('\033[42mMysql初始化成功,%s开始连接...\033[0m' %threading.current_thread().getName())
 
def check_mysql():
    print('\033[41m正在检查mysql...\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))
if __name__ == '__main__':
    event = Event()
    t1 = Thread(target=conn_mysql)
    t2 = Thread(target=conn_mysql)
    t3 = Thread(target=check_mysql)
 
    t1.start()
    t2.start()
    t3.start()
     
wait(time)设置超时时间
from threading import Thread,Event
import threading
import time,random
 
def conn_mysql():
    while not event.is_set():
        print('\033[42m%s 等待连接mysql...\033[0m' %threading.current_thread().getName())
        event.wait(0.1)
    print('\033[42mMysql初始化成功,%s开始连接...\033[0m' %threading.current_thread().getName())
 
def check_mysql():
    print('\033[41m正在检查mysql...\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))
 
if __name__ == '__main__':
    event=Event()
    t1 = Thread(target=conn_mysql)
    t2 = Thread(target=conn_mysql)
    t3 = Thread(target=check_mysql)
    t1.start()
    t2.start()
    t3.start()
     
Timer定时器,指定n秒后执行操作
例子:
from threading import Timer
 
def hello():
    print("hello, world")
 
= Timer(3, hello)
t.start()
 
四、协程
协程: 单线程下的并发,又称微线程,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制
调度的
要实现协程,关键在于用户程序自己控制程序切换,切换之前必须由用户程序自己保存协程上一次调用
时的状态,如此,每次重新调用时,能够从上次的位置继续执行
 
我们之前已经学习过一种在单线程下可以保存程序运行状态的方法,即yield
 
不使用yield
import time
 
def consumer(item):
    = 1111111111111
    = 222222222222222
    = 3333333333333333
    x1 = 122324234534534
    x2 = 21324354654654
    x3 = 3243565432435
 
def producer(target,seq):
    for item in seq:
        target(item)每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大
start_time = time.time()
producer(consumer,range(100000000))
stop_time = time.time()
print('run time is:%s' %(stop_time - start_time))
打印结果:run time is:14.8908851146698
 
使用yield
import time
 
def init(func):
    def wrapper(*args, **kwargs):
        = func(*args, **kwargs)
        next(g)
        return g
    return wrapper
 
@init
def consumer():
 
    = 1111111111111
    = 222222222222222
    = 3333333333333333
    x1 = 122324234534534
    x2 = 21324354654654
    x3 = 3243565432435
    while True:
        item = yield
 
def producer(target, seq):
    for item in seq:
        target.send(item)
 
start_time = time.time()
producer(consumer(), range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time))
 
greenlet实现线程的切换
例子:
from greenlet import greenlet
 
def test1():
    print('test1,first')
    gr2.switch()
    print('test1,second')
    gr2.switch()
 
def test2():
    print('test2,first')
    gr1.switch()
    print('test2,second')
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
 
switch传参数
import time
from greenlet import greenlet
def eat(name):
    print('%s eat food 1' %name)
    gr2.switch('alex fly fly fly')
    print('%s eat food 2' %name)
    gr2.switch()
 
def play_phone(name):
    print('%s play 1' %name)
    gr1.switch()
    print('%s play 2' %name)
 
gr1 = greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='egon啦啦啦')
 
gevent第三方库
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
  
g1=gevent.spawn()创建一个协程对象g1
io阻塞切换
例子:
import gevent
import time
 
def eat():
    print('eat food 1')
    gevent.sleep(2)
    print('eat food 2')
 
def play_phone():
    print('play phone 1')
    gevent.sleep(1)
    print('play phone 2')
 
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play_phone)
gevent.joinall([g1, g2])
print('主')
 
gevent.sleep(2)模拟的是gevent可以识别的io阻塞
time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码
例子:
from gevent import monkey;monkey.patch_all()
import gevent
import time
 
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')
 
def play_phone():
    print('play phone 1')
    time.sleep(1)
    print('play phone 2')
 
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play_phone)
gevent.joinall([g1, g2])
print('主')
 
 
gevent实现单线程下的socket并发
例子:
服务端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
 
def server(server_ip, port):
    = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR, 1)
    s.bind((server_ip,port))
    s.listen(5)
 
    while True:
        conn, addr = s.accept()
        gevent.spawn(talk, conn, addr)
 
def talk(conn,addr):
    try:
        while True:
            res = conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0], addr[1], res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()
 
if __name__ == '__main__':
    server('127.0.0.1'8080)
     
客户端
#!/usr/bin/python
# --*-- coding: utf-8 --*--
 
from socket import *
client=socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1'8080))
 
while True:
    msg = input('>>: ').strip()
    if not msg:continue
 
    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))