summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorBaruch Sterin <baruchs@gmail.com>2013-01-30 23:50:53 -0800
committerBaruch Sterin <baruchs@gmail.com>2013-01-30 23:50:53 -0800
commite6be7ddfe69f74c658d04f822724bfdcf39ee412 (patch)
tree325a8cce60e6b90cb118651d098e66ad96f19fb5 /src
parente9286513fd615fac7f67bc1e48e5fe26b9684086 (diff)
downloadabc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.tar.gz
abc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.tar.bz2
abc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.zip
pyabc: allow returning large result from sub processes
Diffstat (limited to 'src')
-rw-r--r--src/python/pyabc.i22
-rw-r--r--src/python/pyabc_split.py72
2 files changed, 83 insertions, 11 deletions
diff --git a/src/python/pyabc.i b/src/python/pyabc.i
index 0e1f2c3d..74c21cc6 100644
--- a/src/python/pyabc.i
+++ b/src/python/pyabc.i
@@ -755,6 +755,16 @@ def _retry_read(fd):
continue
raise
+def _retry_os_read(fd):
+
+ while True:
+ try:
+ return os.read(fd, 1)
+ except OSError as e:
+ if e.errno == errno.EINTR:
+ continue
+ raise
+
def _retry_wait():
while True:
@@ -817,6 +827,7 @@ def _child_wait_thread_func(fd):
_active_pids.remove(pid)
_terminated_pids[pid] = status
+ os.write(_wait_fd_write, "1")
_terminated_pids_cond.notifyAll()
_sigint_pipe_read_fd = -1
@@ -825,8 +836,14 @@ _sigint_pipe_write_fd = -1
_sigchld_pipe_read_fd = -1
_sigchld_pipe_write_fd = -1
+wait_fd = -1
+_wait_fd_write = -1
+
def _start_threads():
+ global wait_fd, _wait_fd_write
+ wait_fd, _wait_fd_write = os.pipe()
+
global _sigint_pipe_read_fd, _sigint_pipe_write_fd
_sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe()
@@ -865,6 +882,9 @@ def after_fork():
_close_on_fork = []
+ os.close(wait_fd)
+ os.close(_wait_fd_write)
+
os.close(_sigint_pipe_read_fd)
os.close(_sigint_pipe_write_fd)
@@ -934,6 +954,7 @@ def _waitpid(pid, options=0):
with _active_lock:
if pid in _terminated_pids:
+ _retry_os_read(wait_fd)
status = _terminated_pids[pid]
del _terminated_pids[pid]
return pid, status
@@ -950,6 +971,7 @@ def _wait(options=0):
with _active_lock:
for pid, status in _terminated_pids.iteritems():
+ _retry_os_read(wait_fd)
del _terminated_pids[pid]
return pid, status
diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py
index b889d857..bbdcb249 100644
--- a/src/python/pyabc_split.py
+++ b/src/python/pyabc_split.py
@@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu>
"""
import os
+import select
+import fcntl
import errno
import sys
-import pickle
+import cPickle as pickle
import signal
+import cStringIO
+
from contextlib import contextmanager
import pyabc
+def _retry_select(rlist):
+ while True:
+ try:
+ rrdy,_,_ = select.select(rlist,[],[])
+ if rrdy:
+ return rrdy
+ except select.error as e:
+ if e[0] == errno.EINTR:
+ continue
+ raise
+
class _splitter(object):
def __init__(self, funcs):
self.funcs = funcs
self.pids = []
self.fds = {}
+ self.buffers = {}
self.results = {}
def is_done(self):
@@ -117,6 +133,7 @@ class _splitter(object):
self.results[pid] = None
self.fds = {}
+ self.buffers = {}
def child( self, fdw, f):
# call function
@@ -133,6 +150,9 @@ class _splitter(object):
# create a pipe to communicate with the child process
pr,pw = os.pipe()
+ # set pr to be non-blocking
+ fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)
+
parentpid = os.getpid()
rc = 1
@@ -162,25 +182,55 @@ class _splitter(object):
pid, fd = self.fork_one(f)
self.pids.append(pid)
self.fds[pid] = (i,fd)
+ self.buffers[fd] = cStringIO.StringIO()
+
+ def communicate(self):
+
+ rlist = [ fd for _, (_,fd) in self.fds.iteritems() ]
+ rlist.append(pyabc.wait_fd)
+
+ stop = False
+
+ while not stop:
+
+ rrdy = _retry_select( rlist )
+
+ for fd in rrdy:
+
+ if fd == pyabc.wait_fd:
+ stop = True
+ continue
+
+ self.buffers[fd].write( os.read(fd, 16384) )
def get_next_result(self):
+
+ # read from the pipes as needed, while waiting for the next child process to terminate
+ self.communicate()
# wait for the next child process to terminate
pid, rc = os.wait()
assert pid in self.fds
- # retrieve the pipe file descriptor1
+ # retrieve the pipe file descriptor
i, fd = self.fds[pid]
del self.fds[pid]
-
- assert pid not in self.fds
-
- # read result from file
- with os.fdopen( fd, "r" ) as fin:
- try:
- return (i,pickle.load(fin))
- except EOFError, pickle.UnpicklingError:
- return (i, None)
+
+ # retrieve the buffer
+ buffer = self.buffers[fd]
+ del self.buffers[fd]
+
+ # fill the buffer
+ while True:
+ s = os.read(fd, 16384)
+ if not s:
+ break
+ buffer.write(s)
+
+ try:
+ return (i, pickle.loads(buffer.getvalue()))
+ except EOFError, pickle.UnpicklingError:
+ return (i, None)
@contextmanager
def _splitter_wrapper(funcs):