python day Twelve

  1. 云栖社区>
  2. 博客>
  3. 正文

python day Twelve

技术小胖子 2017-11-07 16:35:00 浏览874
展开阅读全文

python day Twelve

一、线程池(补充)

1.上下文管理  

2.终止线程池操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
1.线程和队列
 
import queue
import threading
 
class ThreadPool(object):
    def __init__(self, max_num=20):         #max_num:固定队列参数20
        self.queue = queue.Queue(max_num)
        for in range(max_num):            #往队列里边循环加线程
            self.queue.put(threading.Thread)
    def get_thread(self):                   #获取线程
        return self.queue.get()
    def add_thread(self):                   #增加线程,用于不断增加线程
        self.queue.put(threading.Thread)
 
pool = ThreadPool(10)           #定义线程池10个
def func(arg, p):
    print (arg)
    import time
    time.sleep(2)               #等待2s
    p.add_thread()              #上边执行完一个任务后,线程自动关闭,再手动增加一个线程到队列。
 
for in range(30):             #有30个任务需要执行
    thread = pool.get_thread()   #从队列里获取一个线程
    = thread(target=func, args=(i, pool))
    t.start()                   #启动线程
     
     
2.上下文管理和终止线程操作
 
import queue
import threading
import contextlib
import time
 
StopEvent = object()
class ThreadPool(object):
    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []
    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        = (func, args, callback,)
        self.q.put(w)
 
    def generate_thread(self):                      #创建一个线程
        = threading.Thread(target=self.call)
        t.start()
 
    def call(self):                         #循环去获取任务函数并执行任务函数
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)
        event = self.q.get()
        while event != StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None
            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass
            with self.worker_state(self.free_list, current_thread):
                #上下文管理,内部运行一个装饰器,具体内容看例3
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)
 
    def close(self):                        #执行完所有的任务后,所有线程停止
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1
    def terminate(self):                    #无论是否还有任务,终止线程
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.queue.clear()
    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        #用于记录线程中正在等待的线程数
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
# How to use
pool = ThreadPool(5)
def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass
def action(i):
    print(i)
for in range(30):
    ret = pool.run(action, (i,), callback)
 
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()            #等待任务结束之后,终止线程;终止线程池操作方法1.
pool.terminate()          #无论是否还有任务,终止线程;终止线程池操作方法2.
 
python 3.0官方上下文管理详见:https://docs.python.org/3/library/contextlib.html


二、redis 发布订阅


三、rabbitMQ


四、mysql


五、python pymysql模块


六、python ORM框架:SQLAchemy


七、python Paramiko模块


八、基于Paramiko模块来看堡垒机如何实现



     本文转自506554897 51CTO博客,原文链接:http://blog.51cto.com/506554897/1846557,如需转载请自行联系原作者





网友评论

登录后评论
0/500
评论
技术小胖子
+ 关注