# NOTE: StringIO is Python 2 only; kept for backward compatibility.
from StringIO import *
import math
import multiprocessing
import os
import pprint
import shutil
import subprocess
import sys
import time
# add third party module to PyPy path
from lxml import etree
class Parser:
    """Thin wrapper around an lxml etree parsed from an IP-XACT XML file."""

    # Namespace prefixes used in xpath queries against IP-XACT documents.
    _prefixmap = {'spirit': 'http://www.spiritconsortium.org/XMLSchema/SPIRIT/1.4',
                  'vendorExtensions': '$UVM_REG_GEN/XMLSchema/SPIRIT',
                  'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
                  }

    def __init__(self):
        self._tree = None

    def __del__(self):
        self._tree = None

    def open(self, ifile):
        """Open an IP-XACT file and build up an etree by lxml.

        On failure the error is reported and self._tree stays None.
        """
        try:
            # Parse the file directly; the original buffered it through
            # StringIO first, which only added an extra copy.
            self._tree = etree.parse(ifile)
        except IOError:
            print("%s not found or build up lxml etree fail" % (ifile))

    def close(self):
        """Drop the parsed tree."""
        self._tree = None

    def findAll(self, stmt, dtype='text'):
        """Run xpath *stmt* and project each hit onto the requested field.

        dtype in ('text', 'tag', 'tail', 'attrib') returns a list of that
        element attribute; dtype == 'obj' returns the raw elements.
        Raises ValueError for an unsupported dtype.
        """
        found = [i for i in self._tree.xpath(stmt, namespaces=self._prefixmap)
                 if i is not None]
        if dtype == 'obj':
            return found
        if dtype not in ('text', 'tag', 'tail', 'attrib'):
            # Fixed: original raised undefined name `valueError` (NameError).
            raise ValueError("dtype not support %s" % (dtype))
        return [getattr(i, dtype) for i in found]
def test_run_lxml(ifile=None, query=None):
    """Parse *ifile* and return the text of every node matching *query*."""
    parser = Parser()
    try:
        parser.open(ifile)
        return parser.findAll(stmt=query, dtype='text')
    finally:
        # Drop the parser (and its tree) as soon as the result is extracted.
        del parser
def serial_test_lxml(nums):
    """Serially parse every file in *nums* and report the elapsed time."""
    start_time = time.time()
    # Collect results so the parse work actually happens (value unused).
    rsts = [test_run_lxml(ifile=i, query='//spirit:vendor') for i in nums]
    # Fixed the "sriial" typo in the report line; print() works on Py2 and 3.
    print("serial nums(%d) runtime(%s)" % (len(nums), time.time() - start_time))
def _mp_worker(nums, out_q):
    """Worker invoked in a child process: parse each file, queue a dict.

    Defined at module level (the original nested it inside the test
    function) so it stays picklable under the 'spawn' start method.
    """
    outdict = {}
    for name in nums:
        outdict[name] = test_run_lxml(ifile=name, query='//spirit:vendor')
    out_q.put(outdict)


def multiprocess_test_lxml(nums, nprocs=4):
    """Parse the files in *nums* across *nprocs* processes, report time."""
    start_time = time.time()
    out_q = multiprocessing.Queue()
    # Each process gets a contiguous 'chunksize' slice of nums.
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
            target=_mp_worker,
            args=(nums[chunksize * i:chunksize * (i + 1)], out_q))
        procs.append(p)
        p.start()
    # Drain one result dict per worker BEFORE joining: join() can deadlock
    # if a child is still blocked writing to a full, unread queue.
    resultdict = {}
    for _ in range(nprocs):
        resultdict.update(out_q.get())
    for p in procs:
        p.join()
    print("multis nums(%d) procs(%d) runtime(%s)"
          % (len(nums), nprocs, time.time() - start_time))
def regression_test(num, npros=4):
    """Fan out *num* copies of 0.xml, time both harnesses, then clean up.

    Requires a seed file '0.xml' in the current working directory.
    """
    # Portable stdlib copy/remove instead of shelling out to cp/rm via
    # os.system (which built shell strings and silently ignored failures).
    for i in range(1, num):
        shutil.copyfile("0.xml", "%d.xml" % (i))
    nums = ["%s.xml" % (i) for i in range(num)]
    try:
        multiprocess_test_lxml(nums, npros)
        serial_test_lxml(nums)
    finally:
        # Always remove the generated copies, even if a harness raises.
        for i in range(1, num):
            os.remove("%d.xml" % (i))
def main():
    """Sweep the regression over both corpus sizes and all process counts."""
    for num in (10, 1000):
        for nprocs in (1, 4, 8):
            regression_test(num, nprocs)
# Entry point: run the benchmark sweep only when executed as a script.
if __name__ == '__main__':
    main()
Download

Results:
multis nums(10) procs(1) runtime(0.0143690109253)
sriial nums(10) runtime(0.00273704528809)
multis nums(10) procs(4) runtime(0.0109529495239)
sriial nums(10) runtime(0.00267791748047)
multis nums(10) procs(8) runtime(0.0175120830536)
sriial nums(10) runtime(0.00293707847595)
multis nums(1000) procs(1) runtime(0.229081869125)
sriial nums(1000) runtime(0.233665943146)
multis nums(1000) procs(4) runtime(0.116140127182)
sriial nums(1000) runtime(0.237390041351)
multis nums(1000) procs(8) runtime(0.122074127197)
sriial nums(1000) runtime(0.275545120239)
沒有留言:
張貼留言