2012年11月2日 星期五

multiprocess + queue in python

寫了個簡單的 verilog always block simulator. 用到 multiprocessing, task queue, decorator 的技巧.
# -*- coding: utf-8 -*-

import functools
import time
import timeit
from collections import defaultdict
from multiprocessing import Process, Manager

# Module-level simulator state.
# TASKQUEUE and PROFILE start out as empty placeholders; both names are
# rebound to real containers once the TaskQueue class has been defined.
TASKQUEUE = PROFILE = None
# Maps a simulation time to the list of trigger names firing at that time.
STIMULATORS = defaultdict(list)

#class

class Task(object):
    """One schedulable unit: a callable, its sensitivity list and its args."""

    def __init__(self, func, sensitives, *args, **kwargs):
        """Store the callable together with everything needed to call it.

        params:
        func       : callable the task refers to
        sensitives : sensitivity entries, described as (sensitive, condition);
                     stored exactly as given
        args       : positional arguments kept for the call
        kwargs     : keyword arguments kept for the call

        The task's time always starts at 0.

        ex:
            @decodrator_block(clk=True)
            def add(a):
                ....
            Task(add, [("clk", True)], 3)
        """
        self.func, self.sensitives = func, sensitives
        self.args, self.kwargs = args, kwargs
        self.time = 0

    def purge(self):
        """Reset every stored field of the task to None."""
        for field in ("func", "sensitives", "time", "args", "kwargs"):
            setattr(self, field, None)

    def __repr__(self):
        """Return a one-line dump of the class and all stored fields."""
        template = "%s(Time:%s, Func:%s, Sensitive:%s, args=%s, kwargs=%s)"
        values = (self.__class__, self.time, self.func,
                  self.sensitives, self.args, self.kwargs)
        return template % values

    def isUpdateable(self, other):
        """Return True only when other is strictly greater than the task time."""
        return bool(other > self.time)

    def update(self, time=None):
        """Overwrite the task time (None when called without an argument)."""
        self.time = time


class TaskQueue(object):
    """Ordered collection of the Task objects registered for simulation."""

    def __init__(self):
        """Start with an empty task list owned by this instance."""
        # Kept on the instance only: a class-level list would be shared by
        # every queue that did not rebind it.
        self.tasks = []

    def register(self, task):
        """Append task to the end of the queue.

        Raises TypeError when task is not a Task instance.  An explicit
        raise is used because assert statements are removed under -O.
        """
        if not isinstance(task, Task):
            raise TypeError("expected a Task instance, got %r" % (task,))
        self.tasks.append(task)

    def dump(self):
        """Print the list of registered tasks to stdout."""
        # The parenthesised single-argument form prints the same text whether
        # print is a statement (Python 2) or a function (Python 3).
        print(self.tasks)

    def purge(self):
        """Call purge() on every queued entry that is not None.

        The entries themselves stay in the queue.
        """
        for task in self.tasks:
            if task is not None:
                task.purge()

    def get(self):
        """Return the underlying task list itself (not a copy)."""
        return self.tasks


#-------------------------------------------------------

# Live module state used by the block decorator: PROFILE maps a function name
# to its recorded timing string, TASKQUEUE collects the registered tasks.
PROFILE = {}
TASKQUEUE = TaskQueue()

def decodrator_block(**kwargs):
    """Decorator factory that runs, times and registers a simulation block.

    Each keyword argument becomes one (name, value) entry of the sensitivity
    list attached to every Task created through the returned decorator.

    Calling the decorated function:
      1. runs the wrapped function once and stores the elapsed wall-clock
         seconds, as a string, in PROFILE under the function's name;
      2. registers a Task (wrapped function, sensitivity list, call
         arguments) in TASKQUEUE;
      3. returns the wrapped function's result.

    If the wrapped function raises, nothing is recorded or registered and
    the exception propagates.
    """
    sensitives = list(kwargs.items())

    def inner(func):

        @functools.wraps(func)  # keep func's name and docstring on the wrapper
        def wrapper(*args, **kwds):
            # Time the actual call.  A bare timeit.Timer().timeit() would
            # only measure one million executions of "pass", not func.
            started = timeit.default_timer()
            retval = func(*args, **kwds)
            PROFILE[func.__name__] = str(timeit.default_timer() - started)

            # Register the undecorated function, so that running the task
            # later does not time or register it a second time.
            TASKQUEUE.register(Task(func, sensitives, *args, **kwds))
            return retval

        return wrapper
    return inner


def getValuableTasks(trigger=None):
    """Collect the queued tasks whose sensitivity list names trigger.

    Tasks are returned in queue order; a task is added once for every
    sensitivity entry whose first element equals trigger.
    """
    return [candidate
            for candidate in TASKQUEUE.get()
            for entry in candidate.sensitives
            if entry[0] == trigger]



def runParallelTasks(time=None,trigger=None):
    """Run every task matching trigger in its own process and wait for all.

    time is accepted but not used by this function.
    """
    # One Process per matching task, each calling the task's function with
    # the arguments stored on the task.
    workers = [Process(target=task.func, args=task.args, kwargs=task.kwargs)
               for task in getValuableTasks(trigger=trigger)]

    for worker in workers:
        worker.start()

    # Block until every started process has finished.
    for worker in workers:
        worker.join()



#--------------------------------------------------------
def DesignUnderTest():
    """Define the two example blocks and invoke each once with the value 3.

    ADD0 is declared with clk1 and clk2 in its sensitivity list, MUX1 with
    clk2 only.  Both are wrapped by decodrator_block, so the two calls at the
    end go through that decorator's wrapper.
    """
    # Same idea as a Verilog always block:
    #>>> always@(clk1.pos) begin
    #>>>    a <= a+1
    #>>> end

    @decodrator_block(clk1=True, clk2=True)
    def ADD0(a):
        """Print the increment of a with a timestamp and return a + 1."""
        # Single-argument print(...) emits the same text on Python 2 and 3.
        print("@time %0.2f decodrator_block_ADD0 %d = %d + 1" %(time.time(), a+1, a))
        return a+1

    # Same idea as a Verilog always block:
    #>>> always@(clk2.pos) begin
    #>>>    b <= b*3
    #>>> end

    @decodrator_block(clk2=True)
    def MUX1(b):
        """Print the tripling of b with a timestamp and return b * 3."""
        print("@time %0.2f decodrator_block_MUX1 %d = %d * 3" %(time.time(), b*3, b))
        return b*3

    ADD0(3)
    MUX1(3)


def preTest():
    """Fill STIMULATORS with the clock events used by the simulation.

    "clk1" is appended at times 0, 2, 4, 6 and "clk2" at times 0, 4, 8, 12,
    interleaved one pair at a time.
    """
    for clk1_time in range(0, 8, 2):
        STIMULATORS[clk1_time].append("clk1")
        # clk2 fires at twice the clk1 time of the same step.
        STIMULATORS[clk1_time * 2].append("clk2")

def runTest():
    """Walk the stimulus times in ascending order and fire their triggers.

    For each time, every trigger recorded in STIMULATORS is passed to
    runParallelTasks in the order it was appended.
    """
    for now in sorted(STIMULATORS):
        for fired in STIMULATORS[now]:
            runParallelTasks(time=now, trigger=fired)


def rptTest():
    """Report step of the simulation flow; currently does nothing."""
    return None


if __name__ == "__main__":
    # Build the design, set up the stimuli, simulate, then report.
    for stage in (DesignUnderTest, preTest, runTest, rptTest):
        stage()







code download : https://docs.google.com/open?id=0B35jh2lwIpeKTlpEeENPNkxlNlU

Refs:
http://docs.python.org/2/library/multiprocessing.html

沒有留言:

張貼留言