delegate
index
/Users/ping/dev/python/delegate.py

delegate.py - delegation of work to multiple processes
Ka-Ping Yee, 15 December 1999
 
This module provides generic mechanisms for delegating work items.
At the moment, the main delegation mechanism is running work items in
parallel in child processes.  Call 'parallelize' with a function and a
list of items to start the work.  Here is an example:
 
    def work(arg): return arg * 3
    
    from delegate import parallelize
    print parallelize(work, [2, 5, "a", 7])
 
The function must return a value that can be pickled so that it can be
communicated back to the parent.  Most kinds of Python objects can be
pickled, including instances of custom classes as long as the class
allows the __init__ constructor to be called with no arguments.  See
the 'pickle' module for details.  The results are returned in a
dictionary that maps each item to its result.
 
The 'parallelize' function accepts an optional 'reporter' parameter
with which you can provide callbacks when jobs start and finish.  See
the documentation on 'parallelize' and the Reporter class for details.
 
The 'timeout' function will run a job in a child process in order to
limit its running time.  Here is an example:
 
    import time
    def work():
        time.sleep(10)
        return 5
 
    from delegate import timeout
    print timeout(work, 30)             # this will print 5 after 10 seconds
    print timeout(work, 5)              # this will print None after 5 seconds
 
See the documentation string on 'timeout' for details.

 
Modules
       
cPickle
os
cPickle
select
signal
sys
traceback

 
Classes
       
Exception
IdPrinter
Parallelizer
PipeReader
Reporter
LogReporter
TerminalReporter

 
class Exception
    Class for passing on exceptions raised in child processes.
 
  Methods defined here:
__init__(self, type, value, tbdump='')
__repr__(self)

 
class IdPrinter
     Methods defined here:
__init__(self, maxrows)
delid(self, id)
delrow(self, row)
printid(self, id, text)
printrow(self, row, text)

 
class LogReporter(Reporter)
    A reporter that prints out status line-by-line.
 
  Methods defined here:
abort(self, pid, item)
begin(self, pid, item)
cleanup(self)
exit(self, pid)
fail(self, pid, item, exception)
init(self, children)
spawn(self, pid)
success(self, pid, item, result)

 
class Parallelizer
    Class for maintaining the state of work being done in parallel.
Usually, the parallelize() function in this module is sufficient; if
you want to use this class directly, then:
 
    1. Create a Parallelizer object by passing in the work
       function and the optional Reporter object to the
       Parallelizer() constructor.
 
    2. Call the 'spawn(children)' method to spawn the specified
       number of child processes.
 
    3. Call the 'process(items)' method with a list of items to
       process.  You can do this as many times as you want.
       
    4. Be sure to call the 'cleanup()' method to clean away the
       child processes when you are done.
 
    5. Repeat steps 2 through 4 if you want to spawn more
       children and collect more results.  The results accrue
       in a dictionary in the 'results' attribute.
 
  Methods defined here:
__init__(self, function, reporter=<delegate.Reporter instance at 0xdac88>)
child(self, reader, writer)
cleanup(self)
process(self, items)
spawn(self, children)

 
class PipeReader
    A file-like read-only interface to a file descriptor.
 
  Methods defined here:
__init__(self, fd)
read(self, bytes)
readline(self)

 
class Reporter
    Abstract base class for Reporter objects.  To handle callbacks
during processing, declare your own Reporter object that implements
these methods.
 
  Methods defined here:
abort(self, pid, item)
Called by the Parallelizer when a child terminates unexpectedly.
begin(self, pid, item)
Called by the Parallelizer when a child starts work on an item.
cleanup(self)
Called by the Parallelizer when child processes are cleaned up.
exit(self, pid)
Called by the Parallelizer when a child terminates.
fail(self, pid, item, exception)
Called by the Parallelizer when a child raises an exception.
init(self, children)
Called by the Parallelizer just before processes are spawned.
spawn(self, pid)
Called by the Parallelizer for each individual child spawned.
success(self, pid, item, result)
Called by the Parallelizer when a child produces a result.

 
class TerminalReporter(Reporter)
    A reporter that displays animated status on an ANSI terminal.
 
  Methods defined here:
abort(self, pid, item)
begin(self, pid, item)
cleanup(self)
exit(self, pid)
fail(self, pid, item, exception)
init(self, children)
spawn(self, pid)
success(self, pid, item, result)

 
Functions
       
failsafe(function, *args, **kw)
Run the function with the given arguments and keyword arguments,
returning the function's return value.  In the event of any exception,
catch it and return None.
parallelize(function, items, children=6, reporter=<delegate.Reporter instance at 0xdacb0>)
Run the given function on each member in the 'items' list in parallel,
and return a dictionary that maps each item to the result returned by the
function.  The return value of the function must be picklable so that it
can be communicated from the child process to the parent.  If an exception
is raised by the function, the result dictionary will map the item to an
Exception object containing the exception's type and value and a string
with a printout of the traceback.
 
The number of child processes to be spawned is determined by 'children'.
The optional 'reporter' argument provides a Reporter object to receive
callbacks as the work proceeds.  See the definition of the Reporter class
for the interface it should provide.
pipedump(object, fd)
Pickle an object to the stream given by a numeric descriptor.
pipeload(fd)
Unpickle an object from the stream given by a numeric descriptor.
reap()
Reap any defunct child processes.
suicide()
Terminate this process.  Unlike sys.exit(), which raises a SystemExit
exception that might be caught, this routine kills the current process.
Spawned children must exit this way because exceptions must not be allowed
to escape to code beyond the fork(), which would cause great confusion.
timeout(function, time=2)
Run the given function in a child process and return its return value.
If it raises an exception, return an Exception object containing the
exception's type and value and a string with a printout of the traceback.
If the function doesn't return within the given number of seconds, kill
it and return None.

 
Data
        BEGIN = 2
EXIT = 3
FAIL = 1
SUCCESS = 0