Python 学习笔记 - 线程池

  1. 云栖社区>
  2. 博客>
  3. 正文

Python 学习笔记 - 线程池

余二五 2017-11-07 12:13:00 浏览964
展开阅读全文

前面我们学校里如何创建多线程,当我们接到一个新的请求时,会创建一个线程,执行完毕之后又销毁掉这个线程。对于一些数目巨大,但是单个快速执行的任务,每个任务真正执行消耗的时间和线程创建销毁的时间可能都差不多。这样一来,线程的效率浪费的比较严重。因此可以考虑使用线程池的技术,预先创建一些空闲的线程,当他们接收具体任务的时候,就去直接执行了,执行完了也不销毁,接着执行下一个任务。


Python里面,因为暂时还没有功能完备的线程池,因此这部分功能需要自己实现。


实现的基本原理是创建一个队列,把填充的任务数据一个个地塞进去;然后创建一个线程池,线程池里面的线程不断地读取队列里面的任务数据,执行任务,直到所有的任务都完成。


下面是我写的模拟Fabric批量远程操作的部分代码,paramiko模块可以让我远程的进行ssh连接;对于每个连接,我尝试使用了线程池。



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author Yuan Li
"""
 本程序模拟Fabric,远程的批量进行SSH连接,可以执行下载,上传和shell命令执行。
 远程命令的执行,使用了线程池的技术,因为执行的时间比较少,而线程本身执行的时间占的比重比较大;
 对于下载和上传,因为本身就是比较消耗时间的操作,因此每个连接单独使用了线程创建和销毁,因为时间比较久,线程的时间可以忽略了
"""
import threading
import queue
import time
import paramiko
import os
#找到相对路径
parent_path = os.path.abspath(os.pardir)
db_path=os.path.join(parent_path,'db')
#一个管理类,基本思路是把任务和相关的参数填充到队列(任务池)中,然后创建一个进程池,里面的进程循环地读取任务池里面的内容,任何执行其中的内容,直到所有任务全部实现。
class workmanager(object):
    #构造函数
    def __init__(self,cmd,username,password,work_num=1000,thread_num=2,):
        """
        :param cmd:远程命令
        :param username: 用户名
        :param password: 密码
        :param work_num: 任务池(队列大小)
        :param thread_num: 线程池大小
        """
        self.cmd=cmd
        self.work_num=work_num
        self.thread_num=thread_num
        self.queue=queue.Queue()
        self.threads=[]
        self.init_task(work_num,cmd,username,password)
        self.init_threadpool(thread_num)
    #初始化任务池
    def init_task(self,num,inp,username,password):
        for in range(num):
            self.add_job(do_job,i,inp,username,password)
    #添加任务到任务池
    def add_job(self,job,*args):
        #填充任务到任务池,每一个任务是一个元祖(任务,参数列表)
        self.queue.put((job,list(args)))
    #初始化线程池
    def init_threadpool(self,num):
        for in range(num):
            self.threads.append(work(self.queue))
    #等待挂起主线程
    def wait_allcomplete(self):
        for item in self.threads:
            if item.isAlive():
                item.join()
#线程类,每个线程循环地去任务池取任务
class work(threading.Thread):
    def __init__(self,que):
        super(work, self).__init__()
        self.queue=que
        self.start()
    def run(self):
        while True:
            try:
                #当任务池为空的时候,强制报错,退出
                do,args=self.queue.get(block=False)
                # print(do,args)
                do(args[0],args[1],args[2],args[3])
                #确保队列里面的任务都完成了
                self.queue.task_done()
            except:
                break
#初始化的一个主机组,测试用的
hosts=['anoble-ise','bberry-ise','blackbr-ise','jlau-ise','kwood-ise','marwa-ise','smaroo-ise','psekarwin-ise','spare2-ise']
#远程连接SSH并且执行命令
def do_job(args,inp,username,password):
    """
    :param args: hosts列表的索引
    :param inp: 远程命令
    :param username: 用户名
    :param password: 密码
    :return:
    """
    # time.sleep(0.1)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hosts[args], 22, username, password)
    # 执行命令测试
    stdin, stdout, stderr = ssh.exec_command(inp)
    for line in stdout.readlines():
        print(line.strip())
    print(("\x1b[5;19;32m  %s \x1b[0m" % hosts[args]).center(40,'*'))
    print("\n")
#选择主机组
username=""
password=""
#入口文件
def display():
    global hosts,username,password
    msg="""
    欢迎使用Fabric模拟程序,您可以执行以下操作
    1.显示主机组
    2.批量执行远程命令
    3.批量上传
    4.批量下载
    5.输入管理员账号
    6.退出
    """
    while True:
        print(msg)
        inpt=input("请输入选项")
        #输出主机组的相关信息
        if inpt=='1':pass
        #远程批量操作
        elif inpt=='2':
            # username=input("用户名")
            # password=input("密码")
            if not username:
                print("请先配置登录账号信息")
            else:
                while True:
                    inp = input("输入指令(q返回上级目录)\n>>>")
                    if inp =='q':break
                    if not inp:
                        print("不能输入空命令")
                    else:
                        start = time.time()
                        #指定命令,用户名,密码,任务池(队列)的大小,和线程的个数)
                        work_manager = workmanager(inp,username,password, len(hosts), 2)
                        work_manager.wait_allcomplete()
                        end = time.time()
                        print("Cost time is %s" % (end - start))
        #创建批量上传的多线程
        elif inpt=='3':
            pass
        #创建批量下载的多线程
        elif inpt=='4':
            pass
        elif inpt=='5':
            username = input("用户名")
            password = input("密码")
        elif inpt=='6':
            exit("退出程序")
        else:
            print("无效输入,请重试")
if __name__ == '__main__':
    display()





本文转自 beanxyz 51CTO博客,原文链接:http://blog.51cto.com/beanxyz/1868230,如需转载请自行联系原作者

网友评论

登录后评论
0/500
评论
余二五
+ 关注