"""Benchmark: serial vs. multiprocess XPath queries over IP-XACT XML files.

Ported from Python 2 (StringIO module, print statements) to Python 3.
Unused imports (sys, pprint, subprocess) removed.
"""
import io
import math
import multiprocessing
import os
import time

# Third-party XML parser (project dependency).
from lxml import etree


class Parser:
    """Thin wrapper around lxml for querying IP-XACT (SPIRIT 1.4) files."""

    # XML namespace prefixes used by IP-XACT 1.4 documents.
    _prefixmap = {
        'spirit': 'http://www.spiritconsortium.org/XMLSchema/SPIRIT/1.4',
        'vendorExtensions': '$UVM_REG_GEN/XMLSchema/SPIRIT',
        'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    }

    def __init__(self):
        self._tree = None

    def __del__(self):
        self._tree = None

    def open(self, ifile):
        """Read an IP-XACT file and build the etree.

        Best-effort: on IOError a message is printed and no tree is set
        (behavior preserved from the original).
        """
        try:
            # fix: original did open(...).read() and leaked the file handle
            with open(ifile, 'r') as fh:
                self._tree = etree.parse(io.StringIO(fh.read()))
        except IOError:
            print("%s not found or build up lxml etree fail" % (ifile,))

    def close(self):
        """Drop the parsed tree."""
        self._tree = None

    def findAll(self, stmt, dtype='text'):
        """Run XPath `stmt` and project each matched element.

        dtype selects the projection: 'text', 'tag', 'tail', 'attrib',
        or 'obj' for the raw element objects.  Returns a list.
        Raises ValueError for an unsupported dtype.
        """
        found = [e for e in self._tree.xpath(stmt, namespaces=self._prefixmap)
                 if e is not None]
        if dtype == 'text':
            return [e.text for e in found]
        elif dtype == 'tag':
            return [e.tag for e in found]
        elif dtype == 'tail':
            return [e.tail for e in found]
        elif dtype == 'attrib':
            return [e.attrib for e in found]
        elif dtype == 'obj':
            return found
        else:
            # fix: original raised undefined name `valueError` (NameError)
            raise ValueError("dtype not support %s" % (dtype,))


def test_run_lxml(ifile=None, query=None):
    """Parse one file and return the text of every node matching `query`."""
    pp = Parser()
    pp.open(ifile)
    rst = pp.findAll(stmt=query, dtype='text')
    del pp
    return rst


def serial_test_lxml(nums):
    """Parse every file in `nums` serially and report the wall time."""
    start_time = time.time()
    rsts = []
    for name in nums:
        rsts.append(test_run_lxml(ifile=name, query='//spirit:vendor'))
    # fix: original log message had typo "sriial"
    print("serial nums(%d) runtime(%s)" % (len(nums), time.time() - start_time))


def multiprocess_test_lxml(nums, nprocs=4):
    """Parse every file in `nums` across `nprocs` processes; report wall time."""
    start_time = time.time()

    def worker(chunk, out_q):
        """Worker process: parse its chunk, put a {file: result} dict on out_q."""
        outdict = {}
        for name in chunk:
            outdict[name] = test_run_lxml(ifile=name, query='//spirit:vendor')
        out_q.put(outdict)

    # Each process gets a contiguous slice of ~len(nums)/nprocs files
    # plus the shared queue to put its result dict into.
    out_q = multiprocessing.Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
            target=worker,
            args=(nums[chunksize * i:chunksize * (i + 1)], out_q))
        procs.append(p)
        p.start()

    # Collect one result dict per process.  Draining the queue BEFORE
    # joining avoids deadlock if a worker blocks on a full queue.
    resultdict = {}
    for _ in range(nprocs):
        resultdict.update(out_q.get())

    # Wait for all worker processes to finish.
    for p in procs:
        p.join()

    print("multis nums(%d) procs(%d) runtime(%s)"
          % (len(nums), nprocs, time.time() - start_time))


def regression_test(num, npros=4):
    """Duplicate 0.xml into num-1 copies, time both runners, then clean up.

    NOTE(review): assumes a seed file ``0.xml`` exists in the CWD, and uses
    shell `cp`/`rm`, so this is POSIX-only (preserved from the original).
    """
    for i in range(1, num):
        os.system("cp 0.xml %d.xml" % (i,))
    nums = ["%s.xml" % (i,) for i in range(num)]
    multiprocess_test_lxml(nums, npros)
    serial_test_lxml(nums)
    for i in range(1, num):
        os.system("rm %d.xml" % (i,))


def main():
    """Sweep process counts over small and large file sets."""
    regression_test(10, 1)
    regression_test(10, 4)
    regression_test(10, 8)
    regression_test(1000, 1)
    regression_test(1000, 4)
    regression_test(1000, 8)


if __name__ == '__main__':
    main()
multis nums(10) procs(1) runtime(0.0143690109253)
sriial nums(10) runtime(0.00273704528809)
multis nums(10) procs(4) runtime(0.0109529495239)
sriial nums(10) runtime(0.00267791748047)
multis nums(10) procs(8) runtime(0.0175120830536)
sriial nums(10) runtime(0.00293707847595)
multis nums(1000) procs(1) runtime(0.229081869125)
sriial nums(1000) runtime(0.233665943146)
multis nums(1000) procs(4) runtime(0.116140127182)
sriial nums(1000) runtime(0.237390041351)
multis nums(1000) procs(8) runtime(0.122074127197)
sriial nums(1000) runtime(0.275545120239)
沒有留言:
張貼留言