
Geoprocessing Scripts: Using Multiple Cores for Computation

2009-12-01 10:35
Author: Flyingis
This article may be reposted with attribution to the author and a link to the original; commercial use is strictly prohibited.

Getting ArcGIS Desktop to run geoprocessing (GP) scripts in parallel is genuinely not easy: on one hand there are the limitations of the coarse-grained ArcObjects (AO) components, and on the other hand there are issues with Python itself.
Python is an interpreted language whose Global Interpreter Lock (GIL) internally prevents parallel execution, so only one instruction can run at any given moment. Why does the GIL exist? Because of state hidden inside the interpreter, such as the reference counts maintained for garbage collection; without the GIL, a thread switch could cause the same object to be freed twice (see the referenced article). Jython and IronPython have no GIL and are worth a try. ArcGIS 9.3.1 ships with Python 2.5, while the latest release, Python 3.0, is already out. Starting with version 2.6, Python added the multiprocessing module to address this problem, and ArcGIS 9.4 will support that version.
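For comparison, here is a minimal sketch of what the multiprocessing route looks like on Python 2.6 and later (it will not run on the Python 2.5 that ships with ArcGIS 9.3.1; the worker function and the list of parts are placeholders invented purely for illustration):

# Minimal sketch, Python 2.6+ only: multiprocessing runs each worker in a
# separate OS process, so no single interpreter's GIL becomes the bottleneck.
# process_part() and the parts list are hypothetical placeholders.
import multiprocessing

def process_part(part):
    # stand-in for a CPU-bound geoprocessing step applied to one slice of the data
    return sum(part)

if __name__ == "__main__":
    parts = [range(0, 1000), range(1000, 2000), range(2000, 3000), range(3000, 4000)]
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = pool.map(process_part, parts)   # one part per worker process
    pool.close()
    pool.join()
    print results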
So, given that Parallel Python cannot solve every problem and we are stuck with Python 2.5, how can we make effective use of multiple cores (note: multi-core utilization, not true parallel computing)? The usual approach is to break the work into several separate scripts and run them in multiple Python interpreters or ArcCatalog sessions, so that each program occupies one core and all cores are busy at the same time; when each part finishes, the partial outputs are combined into the final result. The ESRI Resource Center offers a similar method, the difference being that everything is driven from a single Python script; the underlying principle is the same as the manual approach, and it is reproduced below for reference.
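The heart of the splitting step is a simple modulo filter on the ObjectID. A tiny illustration of the idea (the field name and process count here are just examples; the ESRI script below builds the workspace-specific SQL syntax automatically):

# Illustration only: partition rows across 4 processes with modulo WHERE clauses.
pCount = 4
oidField = "OBJECTID"   # example name; the real script reads the OID field via gp.describe
whereClauses = [oidField + " % " + str(pCount) + " = " + str(q) for q in range(pCount)]
# -> ['OBJECTID % 4 = 0', 'OBJECTID % 4 = 1', 'OBJECTID % 4 = 2', 'OBJECTID % 4 = 3']
# Each child process selects only the rows matching its own clause, so the parts
# are disjoint and together cover every row of the input.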

Code

# Author: ESRI
# Date: August 2009
# Purpose: This script demonstrates a methodology for parallel geoprocessing in ArcGIS Desktop.
#          The input is split into parts which are processed by separate subprocesses,
#          their outputs are appended into the final result. In this specific case, the
#          geoprocessing undertaken is address geocoding, but any feasible geoprocessing
#          can be done by simple modification of this script.
#          Look for the comment line "Begin code block for GP process of interest"
try:
    import arcgisscripting, os, subprocess, sys, time, traceback
    gp = arcgisscripting.create(9.3)
    gp.overwriteoutput = True
    startTime = time.time()

    #Get the number of concurrent GP processes desired.
    #It is recommended this agree with the number of your available cores.
    pCount = gp.getparameter(0)
    inObject = gp.getparameterastext(1)
    result = gp.getcount_management(inObject)
    count = int(result.getoutput(0))
    gp.addmessage("Processing " + inObject + " in " + str(pCount) + " concurrent processes.")
    #We will use a generic approach to splitting the input into parts, namely slicing it
    #with modulo arithmetic. For example, if we want to run 4 concurrent processes
    #we create 4 parts from the input using "ObjectID modulo 4 = 0 (then 1,2,3)"

    #Build the splitting expressions
    #(Note you will need to edit this code block if using input from SDE on Oracle)
    inDesc = gp.describe(inObject)
    delimitedOIDName = str(gp.addfielddelimiters(inObject,str(inDesc.oidfieldname)))
    wsType = str(gp.describe(str(inDesc.path)).workspacetype)
    queryList = []
    for q in range(pCount):
        if wsType == "RemoteDatabase": #SDE
            #Non-Oracle
            queryList.append(delimitedOIDName + " % " + str(pCount) + " = " + str(q))
            #Oracle - comment out the above line and uncomment the below line when using Oracle inputs
            #queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
        elif wsType == "FileSystem": #DBase
            queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
        elif wsType == "LocalDatabase" and inDesc.path.endswith(".mdb"): #Personal GDB
            queryList.append(delimitedOIDName + " mod " + str(pCount) + " = " + str(q))
        elif wsType == "LocalDatabase" and inDesc.path.endswith(".gdb"): #File GDB
            queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
    #Create a seed name for the parts
    if inDesc.name.endswith((".dbf",".shp")):
        seedName = str(inDesc.name).split(".")[0]
    elif inDesc.name.count(".") > 1: #Fully qualified DBMS object name
        seedName = str(inDesc.name).split(".")[-1]
    else:
        seedName = str(inDesc.name)

    #Make file GDBs at a well known path (to the system); edit this to use fast disk if you have it.
    #The child scripts will write into these separate file GDBs so as to avoid lock conflicts.
    #import tempfile
    #wkPath = tempfile.gettempdir()
    wkPath = r"C:\Temp"
    gdbNameList = []
    j = 0
    for i in range(pCount):
        gdbName = wkPath + "\\parallel" + str(i) + ".gdb"
        while gp.exists(gdbName): #Do not clobber other parallel processes
            j += 1
            gdbName = wkPath + "\\parallel" + str(j) + ".gdb"
        gdbNameList.append(gdbName)
        result = gp.createfilegdb_management(wkPath,os.path.basename(gdbName))
    gp.workspace = "in_memory"

    #Create scripts that geoprocess the parts, write these into the file GDB directories created above
    scriptPathList = []
    for r in range(pCount):
        gdbName = gdbNameList[r]
        sName = gdbName + "/parallel.py"
        scriptPathList.append(sName)
        scriptLines = []
        scriptLines.append("import arcgisscripting\n")
        scriptLines.append("gp = arcgisscripting.create(9.3)\n")
        scriptLines.append("gp.overwriteoutput = True\n")
        if str(inDesc.datatype) == "Table" or str(inDesc.datatype) == "DbaseTable":
            scriptLines.append("result = gp.tabletotable_conversion("+\
                               "\""+inObject+"\","+\
                               "\""+gdbName+"\","+\
                               "\""+seedName+str(r)+"\","+\
                               "\""+queryList[r]+"\")\n")
        else:
            scriptLines.append("result = gp.featureclasstofeatureclass_conversion("+\
                               "\""+inObject+"\","+\
                               "\""+gdbName+"\","+\
                               "\""+seedName+str(r)+"\","+\
                               "\""+queryList[r]+"\")\n")
        scriptLines.append("part = result.getoutput(0)\n")
        #
        #Begin code block for GP process of interest:
        scriptLines.append("aLocator = r\"D:\Dev\Python\GP_multicore\TestData.gdb\NZ Locator\""+"\n")
        #We could use a service. Note the \\a in \\arcgis below would be a BELL literal if not escaped as in \a (rcgis)
        #scriptLines.append("aLocator = r\"GIS Servers\\arcgis on bruceh\CA_Locator.GeocodeServer\""+"\n")
        scriptLines.append("fieldMap = \"Address Address;Suburb Suburb;Mailtown Mailtown;Postcode Postcode\"\n")
        scriptLines.append("outFC = "+"\""+gdbName+"\\"+"Result"+str(r)+"\"\n")
        scriptLines.append("result = gp.geocodeaddresses_geocoding(part,aLocator,fieldMap,outFC)\n")
        scriptLines.append("result = result.getoutput(0)\n")
        #End code block for GP process of interest:
        #
        #Make stdio aware of completion - this message will be returned to the GP stdio message pipe
        scriptLines.append("print "+ "\"Finished Part" + str(r) + "\"\n")
        scriptLines.append("del gp\n")
        #Write script
        f = open(sName,'w')
        f.writelines(scriptLines)
        f.flush()
        f.close()
    #Launch the parallel processes
    gp.addmessage("Launching parallel processes\n")
    processList = []
    for r in range(pCount):
        processList.append(subprocess.Popen([r"C:\Python25\python.exe",scriptPathList[r]],\
                                            shell=True,\
                                            stdout=subprocess.PIPE,\
                                            stderr=subprocess.PIPE))
        gp.addmessage("Launched process " + str(r) + "\n")
        time.sleep(2)
    #
    #Wait for completion
    messageList = []
    Done = False
    while not Done:
        Done = True
        for p in processList:
            if p.poll() is None:
                Done = False
            else:
                messageList.append("Process " + str(processList.index(p)) + " complete at " + str(time.ctime()) + "\n")
                stdOutLines = p.stdout.readlines()
                for s in stdOutLines:
                    messageList.append(s)
                stdErrorLines = p.stderr.readlines()
                for e in stdErrorLines:
                    messageList.append(e)
        if not Done: #Save 2 seconds if complete
            time.sleep(2)
    for m in messageList:
        gp.addmessage(m)
    #Merge the results
    outFC = gp.getparameterastext(2)
    fm = gp.createobject("fieldmappings")
    vt = gp.createobject("valuetable")
    for g in gdbNameList:
        gp.workspace = g
        if len(gp.listfeatureclasses("Result*")) > 0:
            fcDesc = gp.describe(gp.listfeatureclasses("Result*")[0])
            fm.addtable(fcDesc.catalogpath)
            vt.addrow(fcDesc.catalogpath)
    gp.workspace = "in_memory"
    result = gp.merge_management(vt,outFC,fm)
    #Clean up
    for g in gdbNameList:
        result = gp.delete_management(g)
    #Done!
    endTime = time.time()
    gp.addmessage("Geoprocessing duration was " + str(int(endTime - startTime)) + " seconds.")
    gp.addmessage("Net performance was " + \
                  str(int(1.0 * count / (endTime - startTime) * 3600.0)) + " records per hour.")
    del gp

except:
    tb = sys.exc_info()[2]
    tbinfo = traceback.format_tb(tb)[0]
    pymsg = "PYTHON ERRORS:\nTraceback Info:\n" + tbinfo + "\nError Info:\n " + \
            str(sys.exc_type) + ": " + str(sys.exc_value) + "\n"
    gp.AddError(pymsg)

    msgs = "GP ERRORS:\n" + gp.GetMessages(2) + "\n"
    gp.AddError(msgs)
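
To make the mechanics concrete, here is roughly what one generated child script (for example C:\Temp\parallel0.gdb\parallel.py, the part with r = 0) would contain. The input path, seed name and WHERE clause shown in angle brackets are placeholders that the parent script fills in from inObject, seedName and queryList:

# Illustrative reconstruction of the child script written for part 0.
# Values in <angle brackets> are filled in by the parent script above.
import arcgisscripting
gp = arcgisscripting.create(9.3)
gp.overwriteoutput = True
result = gp.featureclasstofeatureclass_conversion("<input feature class>","C:\Temp\parallel0.gdb","<seed name>0","<part-0 modulo WHERE clause>")
part = result.getoutput(0)
aLocator = r"D:\Dev\Python\GP_multicore\TestData.gdb\NZ Locator"
fieldMap = "Address Address;Suburb Suburb;Mailtown Mailtown;Postcode Postcode"
outFC = "C:\Temp\parallel0.gdb\Result0"
result = gp.geocodeaddresses_geocoding(part,aLocator,fieldMap,outFC)
result = result.getoutput(0)
print "Finished Part0"
del gp

Two design choices in the parent script are worth noting: each part writes into its own file geodatabase so the concurrent child processes never contend for schema locks, and the parent simply polls p.poll() on each subprocess, collects its stdout and stderr, and finally merges the Result* feature classes into the output with Merge.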