From 34d59b0b91f09b35b7c3b4e74411f11b8b3994cc Mon Sep 17 00:00:00 2001 From: Baruch Sterin Date: Sun, 27 Feb 2011 18:33:56 -0800 Subject: fixes to pyabc kill mechanism --- src/python/module.make | 17 ++ src/python/pyabc.i | 719 ++++++++++++++++++++++++++++++++++++++++++++-- src/python/pyabc_split.py | 92 ++---- 3 files changed, 731 insertions(+), 97 deletions(-) (limited to 'src/python') diff --git a/src/python/module.make b/src/python/module.make index ac56208d..3f48bbaf 100644 --- a/src/python/module.make +++ b/src/python/module.make @@ -61,4 +61,21 @@ pyabc.tgz : $(PROG) $(ABC_PYTHON_SRC:_wrap.c=.py) $(ABC_PYTHON_FILES_PREFIX)/abc --out=$@ \ $(ABC_PYTHON_OPTIONS) +PYABC_INSTALL_TARGET ?= $(shell date +%Y-%m-%d_%H-%M.%N_${USER}) +PYABC_INSTALL_TARGET := $(PYABC_INSTALL_TARGET) + +PYABC_INSTALL_DIR ?= /hd/common/pyabc/builds/pyabc_builds/ + +.PHONY: zzz + +pyabc_install_target: pyabc_extension_bdist + mkdir -p "$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)" + tar \ + --directory="$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)" \ + --show-transformed-names \ + --transform='s#^.*/##g' \ + -xvzf "$(ABC_PYTHON_FILES_PREFIX)/dist/pyabc-1.0.linux-x86_64.tar.gz" + find "$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)/"* -type d | xargs rmdir + echo "Installed at $(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)" + endif diff --git a/src/python/pyabc.i b/src/python/pyabc.i index 0bf58288..e258575e 100644 --- a/src/python/pyabc.i +++ b/src/python/pyabc.i @@ -23,9 +23,18 @@ %{ #include +#include + #include #include -#include "utilSignal.h" + +#include +#include +#include +#include +#include +#include +#include int n_ands() { @@ -160,6 +169,56 @@ int n_phases() return pNtk ? Abc_NtkPhaseFrameNum(pNtk) : 1; } +Abc_Cex_t* _cex_get() +{ + Abc_Frame_t* pAbc = Abc_FrameGetGlobalFrame(); + Abc_Cex_t* pCex = Abc_FrameReadCex(pAbc); + + if ( ! pCex ) + { + return NULL; + } + + return Abc_CexDup( pCex, -1 ); +} + +void _cex_put(Abc_Cex_t* pCex) +{ + Abc_Frame_t* pAbc = Abc_FrameGetGlobalFrame(); + + if ( pCex ) + { + pCex = Abc_CexDup(pCex, -1); + } + + Abc_FrameSetCex( Abc_CexDup(pCex, -1) ); +} + +void _cex_free(Abc_Cex_t* pCex) +{ + Abc_CexFree(pCex); +} + +int _cex_n_regs(Abc_Cex_t* pCex) +{ + return pCex->nRegs; +} + +int _cex_n_pis(Abc_Cex_t* pCex) +{ + return pCex->nPis; +} + +int _cex_get_po(Abc_Cex_t* pCex) +{ + return pCex->iPo; +} + +int _cex_get_frame(Abc_Cex_t* pCex) +{ + return pCex->iFrame; +} + static PyObject* pyabc_internal_python_command_callback = 0; void pyabc_internal_set_command_callback( PyObject* callback ) @@ -170,6 +229,8 @@ void pyabc_internal_set_command_callback( PyObject* callback ) pyabc_internal_python_command_callback = callback; } +PyThreadState *_save; + static int pyabc_internal_abc_command_callback(Abc_Frame_t * pAbc, int argc, char ** argv) { int i; @@ -183,6 +244,8 @@ static int pyabc_internal_abc_command_callback(Abc_Frame_t * pAbc, int argc, cha if ( !pyabc_internal_python_command_callback ) return 0; + Py_BLOCK_THREADS + args = PyList_New(argc); for( i=0 ; i0: + return pid, status + + elif pid==0: + return 0,0 + + elif pid == -1 and e == errno.ECHILD: + return 0,0 + + elif pid==-1 and e != errno.EINTR: + raise OSError(e, 'unknown error in wait3()') + +def _sigint_wait_thread_func(fd): + + global _die_flag + + while True: + + _retry_select(fd) + _retry_read(fd) + + with _active_lock: + + if _die_flag: + os._exit(-1) + + _die_flag = True + + for pid in _active_pids: + rc = _posix_kill(pid, signal.SIGINT) + + for fname in _active_temp_files: + os.remove(fname) + + os._exit(-1) + +def _child_wait_thread_func(fd): + + while True: + + _retry_select(fd) + rc = _retry_read(fd) + + with _active_lock: + + while True: + + pid, status = _retry_wait() + + if pid==0: + break + + if pid in _active_pids: + _active_pids.remove(pid) + + _terminated_pids[pid] = status + _terminated_pids_cond.notifyAll() + +_sigint_pipe_read_fd = -1 +_sigint_pipe_write_fd = -1 + +_sigchld_pipe_read_fd = -1 +_sigchld_pipe_write_fd = -1 + +def _start_threads(): + + global _sigint_pipe_read_fd, _sigint_pipe_write_fd + + _sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe() + sigint_read = os.fdopen(_sigint_pipe_read_fd, "r", 0 ) + + sigint_wait_thread = threading.Thread(target=_sigint_wait_thread_func, name="SIGINT wait thread", args=(sigint_read,)) + sigint_wait_thread.setDaemon(True) + sigint_wait_thread.start() + + install_sigint_handler(_sigint_pipe_write_fd) + + global _sigchld_pipe_read_fd, _sigchld_pipe_write_fd + + _sigchld_pipe_read_fd, _sigchld_pipe_write_fd = os.pipe() + sigchld_read = os.fdopen(_sigchld_pipe_read_fd, "r", 0 ) + + child_wait_thread = threading.Thread(target=_child_wait_thread_func, name="child process wait thread", args=(sigchld_read,)) + child_wait_thread.setDaemon(True) + child_wait_thread.start() + + install_sigchld_handler(_sigchld_pipe_write_fd) + +_close_on_fork = [] + +def close_on_fork(fd): + _close_on_fork.append(fd) + +def after_fork(): + + _set_death_signal() + + global _close_on_fork + + for fd in _close_on_fork: + os.close(fd) + + _close_on_fork = [] + + os.close(_sigint_pipe_read_fd) + os.close(_sigint_pipe_write_fd) + + os.close(_sigchld_pipe_read_fd) + os.close(_sigchld_pipe_write_fd) + + global _active_lock + _active_lock = threading.Lock() + + global _terminated_pids_cond + _terminated_pids_cond = threading.Condition(_active_lock) + + global _terminated_pids + _terminated_pids = {} + + global _active_pids + _active_pids = set() + + global _active_temp_files + _active_temp_files = set() + + _start_threads() + +class _sigint_block_section(object): + def __init__(self): + self.blocked = False + + def __enter__(self): + block_sigint() + self.blocked = True + + def __exit__(self, type, value, traceback): + self.release() + + def release(self): + if self.blocked: + self.blocked = False + unblock_sigint() + +_old_os_fork = os.fork + +def _fork(): + + ppid = os.getpid() + + with _sigint_block_section() as cs: + + with _active_lock: + + if _die_flag: + os._exit(-1) + + pid = _old_os_fork() + + if pid == 0: + after_fork() + + if pid > 0: + _active_pids.add(pid) + + return pid + +def _waitpid(pid, options=0): + + while True: + + with _active_lock: + + if pid in _terminated_pids: + status = _terminated_pids[pid] + del _terminated_pids[pid] + return pid, status + + if options==os.WNOHANG: + return 0, 0 + + _terminated_pids_cond.wait() + +def _wait(options=0): + + while True: + + with _active_lock: + + for pid, status in _terminated_pids.iteritems(): + del _terminated_pids[pid] + return pid, status + + if options==os.WNOHANG: + return 0, 0 + + _terminated_pids_cond.wait() + +_old_os_kill = os.kill + +def _kill(pid, sig): + + with _active_lock: + + if pid in _terminated_pids: + return None + + return _old_os_kill(pid,sig) + +os.kill = _kill +os.fork = _fork +os.wait = _wait +os.waitpid = _waitpid + +def _split_command_line(cmd): + + args = [] + + i=0 + + while i= len(cmd): + break + + arg = [] + + in_quotes = None + + while i': + + with open(args[-1],'w') as fout: + p = subprocess.Popen(args[:-2], stdout=fout) + rc = p.wait() + return rc + + else: + p = subprocess.Popen(args) + return p.wait() + +def tmpfile(prefix, suffix): + + with _active_lock: + with tempfile.NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as file: + _active_temp_files.add(file.name) + return file.name + +def tmpfile_remove(fname, leave): + + with _active_lock: + os.remove(fname) + _active_temp_files.remove(fname) + +pyabc_internal_set_util_callbacks( system, tmpfile,tmpfile_remove ) + + +_start_threads() + + _registered_commands = {} def _cmd_callback(args): @@ -309,7 +974,8 @@ def _cmd_callback(args): return res except Exception, e: - print "Python error: ", e + import traceback + traceback.print_exc() except SystemExit, se: pass @@ -322,14 +988,11 @@ def add_abc_command(fcmd, group, cmd, change): _registered_commands[ cmd ] = fcmd pyabc_internal_register_command( group, cmd, change) -import sys import optparse -import os.path -import __main__ +xxx = {} def cmd_python(cmd_args): - global __main__ usage = "usage: %prog [options] " @@ -345,7 +1008,7 @@ def cmd_python(cmd_args): return 0 if options.cmd: - exec options.cmd in __main__.__dict__ + exec options.cmd in xxx return 0 scripts_dir = os.getenv('ABC_PYTHON_SCRIPTS', ".") @@ -353,12 +1016,12 @@ def cmd_python(cmd_args): for fname in args[1:]: if os.path.isabs(fname): - execfile(fname, __main__.__dict__) + execfile(fname, xxx) else: for d in scripts_dirs: fname = os.path.join(scripts_dir, fname) if os.path.exists(fname): - execfile(fname, __main__.__dict__) + execfile(fname, xxx) break return 0 diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py index 431a87a8..b889d857 100644 --- a/src/python/pyabc_split.py +++ b/src/python/pyabc_split.py @@ -15,7 +15,6 @@ Caveats: 1. Global variables in the parent process are not affected by the child processes. 2. The functions can only return simple types, see the pickle module for details -3. Signals are currently not handled correctly Usage: @@ -91,47 +90,6 @@ from contextlib import contextmanager import pyabc -def _waitpid(pid, flags): - while True: - try: - res = os.waitpid(pid, flags) - return res - except OSError as e: - if e.errno != errno.EINTR: - raise - -def _wait(): - while True: - try: - pid,rc = os.wait() - return pid, rc - except OSError as e: - if e.errno != errno.EINTR: - raise - except Exceptions as e: - raise - -class _sigint_critical_section(object): - def __init__(self): - self.blocked = False - - def __enter__(self): - self.acquire() - return self - - def __exit__(self, type, value, traceback): - self.release() - - def acquire(self): - if not self.blocked: - self.blocked = True - pyabc.block_sigint() - - def release(self): - if self.blocked: - self.blocked = False - pyabc.restore_sigint_block() - class _splitter(object): def __init__(self, funcs): @@ -144,18 +102,19 @@ class _splitter(object): return len(self.fds) == 0 def cleanup(self): - # close pipes and kill child processes for pid,(i,fd) in self.fds.iteritems(): os.close(fd) - os.kill( pid, signal.SIGINT ) + try: + os.kill( pid, signal.SIGINT ) + except Exception as e: + print >>sys.stderr, 'exception while trying to kill pid=%d: '%pid, e + raise - with _sigint_critical_section() as cs: # wait for termination and update result - for pid, _ in self.fds.iteritems(): - _waitpid( pid, 0 ) - pyabc.remove_child_pid(pid) - self.results[pid] = None + for pid, _ in self.fds.iteritems(): + os.waitpid( pid, 0 ) + self.results[pid] = None self.fds = {} @@ -179,22 +138,20 @@ class _splitter(object): try: - with _sigint_critical_section() as cs: - # create child process - pid = os.fork() - - if pid == 0: - # child process: - pyabc.reset_sigint_handler() - cs.release() - os.close(pr) - rc = self.child( pw, f) - os._exit(rc) - else: - # parent process: - pyabc.add_child_pid(pid) - os.close(pw) - return (pid, pr) + # create child process + pid = os.fork() + + if pid == 0: + # child process: + os.close(pr) + pyabc.close_on_fork(pw) + + rc = self.child( pw, f) + os._exit(rc) + else: + # parent process: + os.close(pw) + return (pid, pr) finally: if os.getpid() != parentpid: @@ -209,12 +166,9 @@ class _splitter(object): def get_next_result(self): # wait for the next child process to terminate - pid, rc = _wait() + pid, rc = os.wait() assert pid in self.fds - with _sigint_critical_section() as cs: - pyabc.remove_child_pid(pid) - # retrieve the pipe file descriptor1 i, fd = self.fds[pid] del self.fds[pid] -- cgit v1.2.3