summaryrefslogtreecommitdiffstats
path: root/src/python/pyabc_split.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/pyabc_split.py')
-rw-r--r--src/python/pyabc_split.py72
1 files changed, 61 insertions, 11 deletions
diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py
index b889d857..bbdcb249 100644
--- a/src/python/pyabc_split.py
+++ b/src/python/pyabc_split.py
@@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu>
"""
import os
+import select
+import fcntl
import errno
import sys
-import pickle
+import cPickle as pickle
import signal
+import cStringIO
+
from contextlib import contextmanager
import pyabc
+def _retry_select(rlist):
+ while True:
+ try:
+ rrdy,_,_ = select.select(rlist,[],[])
+ if rrdy:
+ return rrdy
+ except select.error as e:
+ if e[0] == errno.EINTR:
+ continue
+ raise
+
class _splitter(object):
def __init__(self, funcs):
self.funcs = funcs
self.pids = []
self.fds = {}
+ self.buffers = {}
self.results = {}
def is_done(self):
@@ -117,6 +133,7 @@ class _splitter(object):
self.results[pid] = None
self.fds = {}
+ self.buffers = {}
def child( self, fdw, f):
# call function
@@ -133,6 +150,9 @@ class _splitter(object):
# create a pipe to communicate with the child process
pr,pw = os.pipe()
+ # set pr to be non-blocking
+ fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)
+
parentpid = os.getpid()
rc = 1
@@ -162,25 +182,55 @@ class _splitter(object):
pid, fd = self.fork_one(f)
self.pids.append(pid)
self.fds[pid] = (i,fd)
+ self.buffers[fd] = cStringIO.StringIO()
+
+ def communicate(self):
+
+ rlist = [ fd for _, (_,fd) in self.fds.iteritems() ]
+ rlist.append(pyabc.wait_fd)
+
+ stop = False
+
+ while not stop:
+
+ rrdy = _retry_select( rlist )
+
+ for fd in rrdy:
+
+ if fd == pyabc.wait_fd:
+ stop = True
+ continue
+
+ self.buffers[fd].write( os.read(fd, 16384) )
def get_next_result(self):
+
+ # read from the pipes as needed, while waiting for the next child process to terminate
+ self.communicate()
# wait for the next child process to terminate
pid, rc = os.wait()
assert pid in self.fds
- # retrieve the pipe file descriptor1
+ # retrieve the pipe file descriptor
i, fd = self.fds[pid]
del self.fds[pid]
-
- assert pid not in self.fds
-
- # read result from file
- with os.fdopen( fd, "r" ) as fin:
- try:
- return (i,pickle.load(fin))
- except EOFError, pickle.UnpicklingError:
- return (i, None)
+
+ # retrieve the buffer
+ buffer = self.buffers[fd]
+ del self.buffers[fd]
+
+ # fill the buffer
+ while True:
+ s = os.read(fd, 16384)
+ if not s:
+ break
+ buffer.write(s)
+
+ try:
+ return (i, pickle.loads(buffer.getvalue()))
+ except EOFError, pickle.UnpicklingError:
+ return (i, None)
@contextmanager
def _splitter_wrapper(funcs):