浅尝辄止 Parallel Python

本文涉及的产品
简介:

  最近在关注如何提升Python执行效率的问题,自己没有时间去深入研究,就直接选择了开源的Parallel Python,希望能够充分发挥多核CPU及集群环境的优势。
    Parallel Python是Python进行分布式计算的开源模块,能够将计算压力分布到多核CPU或集群的多台计算机上,能够非常方便的在内网中搭建一个自组织的分布式计算平台。先从多核计算开始,普通的Python应用程序只能够使用一个CPU进程,而通过Parallel Python能够很方便的将计算扩展到多个CPU进程中,使用官方网站上的一个例子。

复制代码
import  math, time

def  isprime(n):
    
""" Returns True if n is prime and False otherwise """
    
if   not  isinstance(n, int):
        
raise  TypeError( " argument passed to is_prime is not of 'int' type " )
    
if  n  <   2 :
        
return  False
    
if  n  ==   2 :
        
return  True
    max 
=  int(math.ceil(math.sqrt(n)))
    i 
=   2
    
while  i  <=  max:
        
if  n  %  i  ==  0:
            
return  False
        i 
+=   1
    
return  True

def  sum_primes(n):
    
""" Calculates sum of all primes below given integer n """
    
return  sum([x  for  x  in  xrange( 2 ,n)  if  isprime(x)])

start_time 
=  time.time()

inputs 
=  ( 100000 100100 100200 100300 100400 100500 100600 100700 )

jobs 
=  [(input, sum_primes(input))  for  input  in  inputs]

for  input, job  in  jobs:
    
print   " Sum of primes below " , input,  " is " , job

print   " Time elapsed:  " , time.time()  -  start_time,  " s "
复制代码
 
    计算指定数值范围内所有素数的和,运行程序消耗时间为4.46900010109 s,程序运行结果和CPU使用率如下所示:
Sum of primes below 100000 is 454396537
Sum of primes below 100100 is 454996777
Sum of primes below 100200 is 455898156
Sum of primes below 100300 is 456700218
Sum of primes below 100400 is 457603451
Sum of primes below 100500 is 458407033
Sum of primes below 100600 is 459412387
Sum of primes below 100700 is 460217613
Time elapsed:  4.46900010109 s
 
    将程序稍作调整,引入pp模块。
复制代码
# !
#
 File: sum_primes.py
#
 Author: VItalii Vanovschi
#
 Desc: This program demonstrates parallel computations with pp module
#
 It calculates the sum of prime numbers below a given integer in parallel
#
 Parallel Python Software: http://www.parallelpython.com/
import  math, sys, time
import  pp
def  isprime(n):
    
""" Returns True if n is prime and False otherwise """
    
if   not  isinstance(n, int):
        
raise  TypeError( " argument passed to is_prime is not of 'int' type " )
    
if  n  <   2 :
        
return  False
    
if  n  ==   2 :
        
return  True
    max 
=  int(math.ceil(math.sqrt(n)))
    i 
=   2
    
while  i  <=  max:
        
if  n  %  i  ==  0:
            
return  False
        i 
+=   1
    
return  True
def  sum_primes(n):
    
""" Calculates sum of all primes below given integer n """
    
return  sum([x  for  x  in  xrange( 2 ,n)  if  isprime(x)])
print   """ Usage: python sum_primes.py [ncpus]
    [ncpus] - the number of workers to run in parallel, 
    if omitted it will be set to the number of processors in the system
"""
#  tuple of all parallel python servers to connect with
ppservers  =  ()
# ppservers = ("10.0.0.1",)
if  len(sys.argv)  >   1 :
    ncpus 
=  int(sys.argv[ 1 ])
    
#  Creates jobserver with ncpus workers
    job_server  =  pp.Server(ncpus, ppservers = ppservers)
else :
    
#  Creates jobserver with automatically detected number of workers
    job_server  =  pp.Server(ppservers = ppservers)
print   " Starting pp with " , job_server.get_ncpus(),  " workers "
#  Submit a job of calulating sum_primes(100) for execution. 
#
 sum_primes - the function
#
 (100,) - tuple with arguments for sum_primes
#
 (isprime,) - tuple with functions on which function sum_primes depends
#
 ("math",) - tuple with module names which must be imported before sum_primes execution
#
 Execution starts as soon as one of the workers will become available
job1  =  job_server.submit(sum_primes, ( 100 ,), (isprime,), ( " math " ,))
#  Retrieves the result calculated by job1
#
 The value of job1() is the same as sum_primes(100)
#
 If the job has not been finished yet, execution will wait here until result is available
result  =  job1()
print   " Sum of primes below 100 is " , result
start_time 
=  time.time()
#  The following submits 8 jobs and then retrieves the results
inputs  =  ( 100000 100100 100200 100300 100400 100500 100600 100700 )
jobs 
=  [(input, job_server.submit(sum_primes,(input,), (isprime,), ( " math " ,)))  for  input  in  inputs]
for  input, job  in  jobs:
    
print   " Sum of primes below " , input,  " is " , job()
print   " Time elapsed:  " , time.time()  -  start_time,  " s "
job_server.print_stats()
复制代码
 
    再次执行,任务管理器中两个CPU进程齐头并进,咱们不仅仅需要所有CPU努力干活,而且还需得到非常好的效果,通过简单的"time.time() - start_time"发现时间比之前缩短了近100%为2.26600003242,job_server.print_stats()能够得到更加详细的分析结果:
Starting pp with 2 workers
Sum of primes below 100 is 1060
Sum of primes below 100000 is 454396537
Sum of primes below 100100 is 454996777
Sum of primes below 100200 is 455898156
Sum of primes below 100300 is 456700218
Sum of primes below 100400 is 457603451
Sum of primes below 100500 is 458407033
Sum of primes below 100600 is 459412387
Sum of primes below 100700 is 460217613
Time elapsed:  2.26600003242 s
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         9 |        100.00 |       3.9700 |     0.441111 | local

    看到这样的测试结果,还来不及尝试多计算机的集群计算,就开始好奇该模块在ArcGIS Python空间计算中的应用效果。动手之前先想想问题吧,ArcGIS Python实际上是对粗粒度AO对象的调用,真正的计算压力实际上是在COM里面,而Parallel Python针对的是原生Python脚本中产生的计算量,所以应该不会有明显的性能提升,实践出真理,怎么都得写个Sample测试一番:将指定文件夹内所有personal geodatabase的空间数据拷贝到相应的file geodatabase中,以全国400万数据为例。
复制代码
#  Import native arcgisscripting module
#
import  arcgisscripting
import  os
import  time
#  import sys
#
 Create the geoprocessor object
#
gp  =  arcgisscripting.create( 9.3 )
#  Allow for the overwriting of file geodatabases, if they previously exist
#
gp.OverWriteOutput  =   1
#  Set workspace to folder containing personal geodatabases
#
#
 gp.Workspace = sys.argv[1]
gp.workspace  =   " D:\\Dev\\Python\\pp\\arcgis_test "
#  Identify personal geodatabases
#
pgdbs  =  gp.ListWorkspaces( "" " Access " )
start_time 
=  time.time()
for  pgdb  in  pgdbs:
    
#  Set workspace to current personal geodatabase
     #
    gp.workspace  =  pgdb
    
#  Create file geodatabase based on personal geodatabase
     #
    fgdb  =  pgdb[: - 4 +   " .gdb "
    gp.CreateFileGDB(os.path.dirname(fgdb), os.path.basename(fgdb))
    
#  Identify feature classes and copy to file gdb
     #
    fcs  =  gp.ListFeatureClasses()
    
for  fc  in  fcs:
        
print   " Copying feature class  "   +  fc  +   "  to  "   +  fgdb
        gp.Copy(fc, fgdb 
+  os.sep  +  fc)
    
#  Identify tables and copy to file gdb
     #
    tables  =  gp.ListTables()
    
for  table  in  tables:
        
print   " Copying table  "   +  table  +   "  to  "   +  fgdb
        gp.Copy(table, fgdb 
+  os.sep  +  table)
    
#  Identify datasets and copy to file gdb
     #    Copy will include contents of datasets
     #
    datasets  =  gp.ListDatasets()
    
for  dataset  in  datasets:
        
print   " Copying dataset  "   +  dataset  +   "  to  "   +  fgdb
        gp.Copy(dataset, fgdb 
+  os.sep  +  dataset)
    
print   " Time elapsed:  " , time.time()  -  start_time,  " s "
复制代码

计算完成后消耗时间为73.453999962s,引入pp模块。
复制代码
import  arcgisscripting
import  pp
import  os
import  time
import  sys

def  func(ws  =   " D:\\Dev\\Python\\pp\\arcgis_test " ):
    
#  Create the geoprocessor object
     #
    gp  =  arcgisscripting.create( 9.3 )
    
    
#  Allow for the overwriting of file geodatabases, if they previously exist
     #
    gp.OverWriteOutput  =   1
    
    
#  Set workspace to folder containing personal geodatabases
     #
     #  gp.Workspace = sys.argv[1]
    gp.workspace  =  ws
    
    
#  Identify personal geodatabases
     #
    pgdbs  =  gp.ListWorkspaces( "" " Access " )
    
    
for  pgdb  in  pgdbs:
        
#  Set workspace to current personal geodatabase
         #
        gp.workspace  =  pgdb
    
        
#  Create file geodatabase based on personal geodatabase
         #
        fgdb  =  pgdb[: - 4 +   " .gdb "
        gp.CreateFileGDB(os.path.dirname(fgdb), os.path.basename(fgdb))
    
        
#  Identify feature classes and copy to file gdb
         #
        fcs  =  gp.ListFeatureClasses()
    
        
for  fc  in  fcs:
            
print   " Copying feature class  "   +  fc  +   "  to  "   +  fgdb
            gp.Copy(fc, fgdb 
+  os.sep  +  fc)
    
        
#  Identify tables and copy to file gdb
         #
        tables  =  gp.ListTables()
    
        
for  table  in  tables:
            
print   " Copying table  "   +  table  +   "  to  "   +  fgdb
            gp.Copy(table, fgdb 
+  os.sep  +  table)
    
        
#  Identify datasets and copy to file gdb
         #    Copy will include contents of datasets
         #
        datasets  =  gp.ListDatasets()
    
        
for  dataset  in  datasets:
            
print   " Copying dataset  "   +  dataset  +   "  to  "   +  fgdb
            gp.Copy(dataset, fgdb 
+  os.sep  +  dataset)
            
    
#  tuple of all parallel python servers to connect with
ppservers  =  ()
# ppservers = ("10.0.0.1",)

if  len(sys.argv)  >   1 :
    ncpus 
=  int(sys.argv[ 1 ])
    
#  Creates jobserver with ncpus workers
    job_server  =  pp.Server(ncpus, ppservers = ppservers)
else :
    
#  Creates jobserver with automatically detected number of workers
    job_server  =  pp.Server(ppservers = ppservers)

print   " Starting pp with " , job_server.get_ncpus(),  " workers "

start_time 
=  time.time()

#  job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",))
job_server.submit(func(), ( " D:\\Dev\\Python\\pp\\arcgis_test " , ), (), ( " arcgisscripting " " os " ))

print   " Time elapsed:  " , time.time()  -  start_time,  " s "
job_server.print_stats()

复制代码
 
    时间几乎相同,没有太大变化,这也证明了最初的分析,看来提升ArcGIS Python应用效率还得从根本入手,当然也有可能是对Parallel Python和Python多线程处理了解不够深入,关于Parallel Python的问题,网上有一些说法:
    1.如果数据交换存在瓶颈,大数据量的应用效果不明显,或反而效率更低。
    2.高计算复杂度的应用效果不明显,ArcGIS Python中的分析计算恰好属于此类。
 
    当然测试脚本针对的主要是空间数据拷贝的操作,如果不死心还可以试试空间分析。接着深入,待续吧。

本文转自Flyingis博客园博客,原文链接:http://www.cnblogs.com/flyingis/archive/2009/11/12/1601574.html,如需转载请自行联系原作者
相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
11天前
|
Serverless Python
「Python系列」Python statistics模块
Python 的 `statistics` 模块提供了一个强大的数学统计功能集合,用于处理数值数据。这个模块提供了一些常用的统计函数,如平均值、中位数、众数、方差、标准差、偏度、峰度等。
28 0
|
9月前
|
Python
13-Python-pathlib库使用
13-Python-pathlib库使用
|
Python
Python的OptionParser模块教程
Python的OptionParser模块教程
115 0
|
SQL JSON 分布式计算
【Python】PySpark 入门
【Python】PySpark 入门
210 0
|
数据挖掘 BI Python
第40天:Python statistics 模块
第40天:Python statistics 模块
105 0
|
分布式计算 Spark Python
[Spark][Python]PageRank 程序
PageRank 程序: file contents: page1 page3 page2 page1 page4 page1 page3 page1 page4 page2 page3 page4 def computeContribs(neighbors,rank):     for nei.
2400 0
|
分布式计算 算法 大数据
|
Oracle 关系型数据库 Python