# -*- coding: utf-8 -*-
import timeit
from collections import defaultdict
from multiprocessing import Process, Manager
import time
# global define
# TASKQUEUE and PROFILE start as placeholders; both are rebound to real
# objects further down, once the classes they need have been defined.
TASKQUEUE = None
PROFILE = None
# Stimulus schedule: maps a simulation time to the list of trigger names
# appended for that time (filled by preTest, read by runTest).
STIMULATORS = defaultdict(list)
#class
class Task(object):
    """Atomic unit of work handed to the task queue.

    A Task bundles a callable with the arguments it was invoked with and the
    list of sensitivities that select it.

    ex:
        @decodrator_block(clk=1)
        def add(a):
            ....

        Task(add, [("clk", 1)], 3)
    """

    def __init__(self, func, sensitives, *args, **kwargs):
        """Store the callable, its sensitivities and its call arguments.

        params:
            func       : callable this task refers to
            sensitives : iterable of (sensitive, condition) pairs
            *args      : positional arguments kept alongside func
            **kwargs   : keyword arguments kept alongside func
        """
        self.func = func
        self.sensitives = sensitives
        # Time stamp of the task; always starts at 0 and is changed only
        # through update() or purge().
        self.time = 0
        self.args = args
        self.kwargs = kwargs

    def purge(self):
        """Drop every reference held by the task (all fields become None)."""
        self.func = None
        self.sensitives = None
        self.time = None
        self.args = None
        self.kwargs = None

    def __repr__(self):
        return "%s(Time:%s, Func:%s, Sensitive:%s, args=%s, kwargs=%s)" % (
            self.__class__,
            self.time,
            self.func,
            self.sensitives,
            self.args,
            self.kwargs,
        )

    def isUpdateable(self, other):
        """Return True when `other` is strictly later than the task's time.

        `self.time` is None after purge() or after update() without an
        argument. None is ordered below every real time stamp here, which
        matches what Python 2 comparison did implicitly and avoids the
        TypeError Python 3 raises when ordering None against a number.
        """
        if other is None:
            # "No time" can never be later than anything.
            return False
        if self.time is None:
            return True
        return other > self.time

    def update(self, time=None):
        """Set the task's time stamp to `time` (None when omitted)."""
        self.time = time
class TaskQueue(object):
    """Ordered collection of the Task objects that have been registered."""

    def __init__(self):
        """Start with an empty task list owned by this instance."""
        # Created per instance on purpose: a class-level list would be shared
        # by every queue that forgot to rebind it.
        self.tasks = []

    def register(self, task):
        """Append `task` to the queue.

        Raises AssertionError when `task` is not a Task instance.
        """
        assert isinstance(task, Task), "expected a Task, got %r" % (task,)
        self.tasks.append(task)

    def dump(self):
        """Print the list of registered tasks."""
        # Parenthesised single-argument form prints the same text whether
        # `print` is a statement (Python 2) or a function (Python 3).
        print(self.tasks)

    def purge(self):
        """Call purge() on every queued entry that is not None.

        The entries themselves remain in the list.
        """
        for task in self.tasks:
            if task is not None:
                task.purge()

    def get(self):
        """Return the underlying task list itself (not a copy)."""
        return self.tasks
#-------------------------------------------------------
# Module-wide queue that decorated blocks register their tasks into.
TASKQUEUE = TaskQueue()
# Profiling results, keyed by the decorated function's name.
PROFILE = {}
def decodrator_block(**kwargs):  # decorator **kwargs
    """Build a decorator that profiles a block and registers it as a Task.

    Each keyword argument is one sensitivity: the key is the trigger name and
    the value its condition, e.g. ``@decodrator_block(clk1=True)``.

    Every call of the decorated function
      1. runs the function once and stores the elapsed time in seconds, as a
         string, in PROFILE under the function's ``__name__``;
      2. registers ``Task(func, sensitives, *args, **kwargs)`` in TASKQUEUE,
         i.e. the undecorated function together with this call's arguments;
      3. returns whatever the function returned.

    If the function raises, nothing is stored or registered for that call.
    """
    import functools  # local import: only this decorator needs it

    sensitives = list(kwargs.items())

    def inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):  # func *args, **kwargs
            # Time exactly one execution of func. A bare timeit.Timer() would
            # benchmark a million `pass` statements and say nothing about
            # func itself.
            started = timeit.default_timer()
            retval = func(*args, **kwargs)
            PROFILE[func.__name__] = str(timeit.default_timer() - started)
            TASKQUEUE.register(Task(func, sensitives, *args, **kwargs))
            return retval
        return wrapper
    return inner
def getValuableTasks(trigger=None):
    """Return the registered tasks that are sensitive to `trigger`.

    A task matches when the first element of any of its sensitivity pairs
    equals `trigger`; the second element (the condition) is not consulted.
    Each matching task appears once in the result, in registration order,
    even if several of its sensitivities match. Tasks whose `sensitives` is
    None (as left behind by Task.purge) never match.
    """
    return [
        task
        for task in TASKQUEUE.get()
        if any(sensitive[0] == trigger for sensitive in (task.sensitives or ()))
    ]
def runParallelTasks(time=None, trigger=None):
    """Start one process per task sensitive to `trigger`, then wait for all.

    Each process calls the task's function with the task's stored positional
    and keyword arguments. `time` is accepted but not used in this function.
    """
    workers = [
        Process(target=job.func, args=job.args, kwargs=job.kwargs)
        for job in getValuableTasks(trigger=trigger)
    ]
    for worker in workers:
        worker.start()
    # Join only after every worker has been started so they can overlap.
    for worker in workers:
        worker.join()
#--------------------------------------------------------
def DesignUnderTest():
    """Declare the DUT blocks and call each of them once with the value 3.

    Both blocks are wrapped by decodrator_block, which is what connects them
    to the task queue when they are called.
    """
    # Modelled on a Verilog always block, sensitive to clk1 and clk2:
    # >>> always@(clk1.pos or clk2.pos) begin
    # >>>     a <= a+1
    # >>> end
    @decodrator_block(clk1=True, clk2=True)
    def ADD0(a):
        # Parenthesised single-argument print: same output on Python 2 and 3.
        print("@time %0.2f decodrator_block_ADD0 %d = %d + 1" % (time.time(), a+1, a))
        return a+1

    # Modelled on a Verilog always block, sensitive to clk2:
    # >>> always@(clk2.pos) begin
    # >>>     b <= b*3
    # >>> end
    @decodrator_block(clk2=True)
    def MUX1(b):
        print("@time %0.2f decodrator_block_MUX1 %d = %d * 3" % (time.time(), b*3, b))
        return b*3

    ADD0(3)
    MUX1(3)
def preTest():
    """Fill STIMULATORS with the clock schedule used by the test.

    For i in 0..3, "clk1" is appended at time 2*i and "clk2" at time 4*i.
    """
    schedule = (("clk1", 2), ("clk2", 4))
    for step in range(4):
        for clock, period in schedule:
            STIMULATORS[step * period].append(clock)
def runTest():
    """Walk the stimulus schedule in ascending time order.

    For every scheduled time, each trigger stored for that time is passed to
    runParallelTasks in the order it was appended.
    """
    for now in sorted(STIMULATORS):
        for trigger in STIMULATORS[now]:
            runParallelTasks(time=now, trigger=trigger)
def rptTest():
    """ report simulation test env

    Placeholder: the body consists of this docstring only, so calling it
    does nothing and returns None.
    """
if __name__ == "__main__":
    # Simulation phases, executed strictly in this order.
    for phase in (DesignUnderTest, preTest, runTest, rptTest):
        phase()
# code download : https://docs.google.com/open?id=0B35jh2lwIpeKTlpEeENPNkxlNlU
# Refs:
# http://docs.python.org/2/library/multiprocessing.html
# (blog page footer) No comments:
# (blog page footer) Post a comment