底下 example 利用 multiprocessor 來實現 switch swap. 當然也可以考慮用 stackless 的方式, 把 task 加載到 scheduler manager 來減少 acquire 的次數, 達到 atomic thread 更加的 independent http://www.stackless.com/
from multiprocessing import Process
import time
class Switcher(object):
""" Switch(swap) the data stored sequence
-0 for store in seq0, run in seq1
-1 for store in seq1, run in seq0
"""
def __init__(self):
self._seq0 = []
self._seq1 = []
self._mode = 0
def switch(self):
""" switch mode """
self._mode = 1 if self._mode == 0 else 0
def getStoreSeq(self):
seq = self._seq0[:] if self._mode == 0 else self._seq1[:]
return seq
def getRunSeq(self):
seq = self._seq1[:] if self._mode == 0 else self._seq0[:]
print "run switcher.seq0 %s" %(self._seq0)
print "run switcher.seq1 %s" %(self._seq1)
return seq
def setStoreSeq(self,pat=[]):
if self._mode == 0:
self._seq0 = pat[:]
else:
self._seq1 = pat[:]
print "store switcher.seq0 %s" %(self._seq0)
print "store switcher.seq1 %s" %(self._seq1)
def setRunSeq(self,pat=[]):
if self._mode == 1:
self._seq0 = pat[:]
else:
self._seq1 = pat[:]
def clear(self):
del self._seq0[:]
del self._seq1[:]
def test(switcher):
""" test """
switcher.setStoreSeq([1,2,3])
switcher.switch()
assert(switcher.getRunSeq()==[1,2,3])
switcher.setStoreSeq([4,5,6])
switcher.switch()
switcher.clear()
def sample_run(switcher):
""" avg switcher.getRunSeq() """
try:
seq = switcher.getRunSeq()
print "avg %.2f, %s" %(sum(seq)/len(seq), seq)
except ZeroDivisionError:
print "Error %s" %(seq)
def patten_gen(switcher,sel=0):
""" random patten gen """
if sel ==0:
switcher.setStoreSeq([i for i in range(10)])
else:
switcher.setStoreSeq([i for i in range(10,20,1)])
def pre_load_status(switcher):
""" pre load status """
print "pre_load_status"
p0 = Process(target=patten_gen, args=(switcher,0))
p0.start()
p0.join()
print switcher._seq0
print switcher._seq1
def switch_status(switcher):
switcher.switch()
def pipe_load_status(switcher,sel=0):
""" pipe load status """
print "pipe_load_status"
p0 = Process(target=patten_gen, args=(switcher,sel))
p1 = Process(target=sample_run, args=(switcher,))
p0.start()
p1.start()
p0.join()
p1.join()
def end_load_status(switcher):
""" end load status """
print "end_load_status"
p1 = Process(target=sample_run, args=(switcher,))
p1.start()
p1.join()
def main():
switcher = Switcher()
test(switcher)
for i in range(5):
if i == 0:
pre_load_status(switcher)
switch_status(switcher)
elif i in [1,2,3]:
pipe_load_status(switcher, i%2)
switch_status(switcher)
else:
end_load_status(switcher)
switch_status(switcher)
if __name__ == "__main__":
main()
沒有留言:
張貼留言