diff options
Diffstat (limited to 'src/python/pyabc_split.py')
-rw-r--r-- | src/python/pyabc_split.py | 72 |
1 files changed, 61 insertions, 11 deletions
diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py index b889d857..bbdcb249 100644 --- a/src/python/pyabc_split.py +++ b/src/python/pyabc_split.py @@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu> """ import os +import select +import fcntl import errno import sys -import pickle +import cPickle as pickle import signal +import cStringIO + from contextlib import contextmanager import pyabc +def _retry_select(rlist): + while True: + try: + rrdy,_,_ = select.select(rlist,[],[]) + if rrdy: + return rrdy + except select.error as e: + if e[0] == errno.EINTR: + continue + raise + class _splitter(object): def __init__(self, funcs): self.funcs = funcs self.pids = [] self.fds = {} + self.buffers = {} self.results = {} def is_done(self): @@ -117,6 +133,7 @@ class _splitter(object): self.results[pid] = None self.fds = {} + self.buffers = {} def child( self, fdw, f): # call function @@ -133,6 +150,9 @@ class _splitter(object): # create a pipe to communicate with the child process pr,pw = os.pipe() + # set pr to be non-blocking + fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK) + parentpid = os.getpid() rc = 1 @@ -162,25 +182,55 @@ class _splitter(object): pid, fd = self.fork_one(f) self.pids.append(pid) self.fds[pid] = (i,fd) + self.buffers[fd] = cStringIO.StringIO() + + def communicate(self): + + rlist = [ fd for _, (_,fd) in self.fds.iteritems() ] + rlist.append(pyabc.wait_fd) + + stop = False + + while not stop: + + rrdy = _retry_select( rlist ) + + for fd in rrdy: + + if fd == pyabc.wait_fd: + stop = True + continue + + self.buffers[fd].write( os.read(fd, 16384) ) def get_next_result(self): + + # read from the pipes as needed, while waiting for the next child process to terminate + self.communicate() # wait for the next child process to terminate pid, rc = os.wait() assert pid in self.fds - # retrieve the pipe file descriptor1 + # retrieve the pipe file descriptor i, fd = self.fds[pid] del self.fds[pid] - - assert pid not in self.fds - - # read result from file - with os.fdopen( fd, "r" ) as fin: - try: - return (i,pickle.load(fin)) - except EOFError, pickle.UnpicklingError: - return (i, None) + + # retrieve the buffer + buffer = self.buffers[fd] + del self.buffers[fd] + + # fill the buffer + while True: + s = os.read(fd, 16384) + if not s: + break + buffer.write(s) + + try: + return (i, pickle.loads(buffer.getvalue())) + except EOFError, pickle.UnpicklingError: + return (i, None) @contextmanager def _splitter_wrapper(funcs): |