Source code for syconn.multi_proc.multi_proc_main

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute for Medical Research, Heidelberg, Germany
# Authors: Sven Dorkenwald, Philipp Schubert, Jörgen Kornfeld

import cPickle as pkl
import getpass
from multiprocessing import cpu_count, Process
import multiprocessing.pool
import numpy as np
import os
import re
import shutil
import string
import subprocess
import sys
import time

# Probe for SGE: if `qstat` is not callable, fall back to single-node
# multiprocessing.
__QSUB__ = True
try:
    with open(os.devnull, 'w') as devnull:
        subprocess.check_call('qstat', shell=True,
                              stdout=devnull, stderr=devnull)
except subprocess.CalledProcessError:
    print "QSUB not found, switching to single node multiprocessing."
    __QSUB__ = False

home_dir = os.environ['HOME'] + "/"
path_to_scripts_default = os.path.dirname(__file__)
qsub_work_folder = "%s/QSUB/" % home_dir
subp_work_folder = "%s/SUBP/" % home_dir
username = getpass.getuser()
python_path = sys.executable


def negative_to_zero(a):
    """ Returns a if a is positive, otherwise 0 """
    if a > 0:
        return a
    else:
        return 0


def QSUB_script(params, name, queue=None, pe=None, n_cores=1,
                sge_additional_flags='', suffix="", job_name="default",
                script_folder=None):
    """ QSUB handler - takes a parameter list like a normal multiprocessing
    job and runs it on the specified cluster

    IMPORTANT NOTE: the user has to make sure that the queues exist and work;
    we suggest generating multiple queues handling different workloads

    Parameters
    ----------
    params: list
        list of all parameter sets to be processed
    name: str
        name of job - specifies script with QSUB_%s % name
    queue: str or None
        queue name
    pe: str
        parallel environment name
    n_cores: int
        number of cores requested per job (used together with pe)
    sge_additional_flags: str
        additional command line flags to be passed to qsub
    suffix: str
        suffix for folder names - enables the execution of multiple qsub jobs
        for the same function
    job_name: str
        unique name for job - or just 'default' which gets changed into a
        random name automatically
    script_folder: str or None
        directory in which the QSUB_* file is located

    Returns
    -------
    path_to_out: str
        path to the output directory
    """
    if job_name == "default":
        letters = string.ascii_lowercase
        job_name = "".join([letters[l] for l in
                            np.random.randint(0, len(letters), 10)])
        print "Random job_name created: %s" % job_name
    else:
        print "WARNING: running multiple jobs via qsub is only supported " \
              "with non-default job_names"

    if len(job_name) > 10:
        print "WARNING: Your job_name is longer than 10 characters. " \
              "job_names have to be distinguishable by their first 10 " \
              "characters alone."

    if script_folder is not None:
        path_to_scripts = script_folder
    else:
        path_to_scripts = path_to_scripts_default

    if os.path.exists(qsub_work_folder + "/%s_folder%s/" % (name, suffix)):
        shutil.rmtree(qsub_work_folder + "/%s_folder%s/" % (name, suffix))

    path_to_script = path_to_scripts + "/QSUB_%s.py" % name
    path_to_storage = qsub_work_folder + "/%s_folder%s/storage/" % (name,
                                                                    suffix)
    path_to_sh = qsub_work_folder + "/%s_folder%s/sh/" % (name, suffix)
    path_to_log = qsub_work_folder + "/%s_folder%s/log/" % (name, suffix)
    path_to_err = qsub_work_folder + "/%s_folder%s/err/" % (name, suffix)
    path_to_out = qsub_work_folder + "/%s_folder%s/out/" % (name, suffix)

    if pe is not None:
        sge_queue_option = "-pe %s %d" % (pe, n_cores)
    elif queue is not None:
        sge_queue_option = "-q %s" % queue
    else:
        raise Exception("No queue or parallel environment defined")

    if not os.path.exists(path_to_storage):
        os.makedirs(path_to_storage)
    if not os.path.exists(path_to_sh):
        os.makedirs(path_to_sh)
    if not os.path.exists(path_to_log):
        os.makedirs(path_to_log)
    if not os.path.exists(path_to_err):
        os.makedirs(path_to_err)
    if not os.path.exists(path_to_out):
        os.makedirs(path_to_out)

    print "Number of jobs:", len(params)

    time_start = time.time()
    for ii in range(len(params)):
        this_storage_path = path_to_storage + "job_%d.pkl" % ii
        this_sh_path = path_to_sh + "job_%d.sh" % ii
        this_out_path = path_to_out + "job_%d.pkl" % ii
        job_log_path = path_to_log + "job_%d.log" % ii
        job_err_path = path_to_err + "job_%d.log" % ii

        with open(this_sh_path, "w") as f:
            f.write("#!/bin/bash\n")
            f.write("{0} {1} {2} {3}".format(python_path, path_to_script,
                                             this_storage_path,
                                             this_out_path))

        with open(this_storage_path, "wb") as f:
            for param in params[ii]:
                pkl.dump(param, f)

        os.chmod(this_sh_path, 0744)

        subprocess.call("qsub {0} -o {1} -e {2} -N {3} {4} {5}".format(
            sge_queue_option, job_log_path, job_err_path, job_name,
            sge_additional_flags, this_sh_path), shell=True)

    print "All jobs are submitted: %s" % name
    while True:
        process = subprocess.Popen("qstat -u %s" % username, shell=True,
                                   stdout=subprocess.PIPE)
        nb_lines = 0
        for line in iter(process.stdout.readline, ''):
            if job_name[:10] in line:
                nb_lines += 1
        if nb_lines == 0:
            sys.stdout.write('\rAll jobs were finished in %.2fs\n' %
                             (time.time() - time_start))
            break
        else:
            progress = 100 * (len(params) - negative_to_zero(nb_lines)) / \
                       float(len(params))
            sys.stdout.write('\rProgress: %.2f%% in %.2fs' %
                             (progress, time.time() - time_start))
            sys.stdout.flush()
            time.sleep(5.)
    return path_to_out
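
# Usage sketch (hypothetical names): submit one cluster job per parameter
# set. This assumes a worker script QSUB_my_func.py located next to this
# module and an existing SGE queue called 'somequeue'.
#
#   params = [[i, "some_arg"] for i in range(100)]   # one list per job
#   out_dir = QSUB_script(params, "my_func", queue="somequeue",
#                         job_name="myfunc0001")
#   # every job is expected to write its result to out_dir/job_<i>.pkl
#
# The worker script is called with the storage pickle as its first argument
# and the output path as its second argument, so it could look roughly like
# this (sketch only, do_work is user-defined and not part of SyConn):
#
#   # QSUB_my_func.py
#   import cPickle as pkl
#   import sys
#   args = []
#   with open(sys.argv[1], "rb") as f:
#       while True:
#           try:
#               args.append(pkl.load(f))
#           except EOFError:
#               break
#   result = do_work(*args)
#   with open(sys.argv[2], "wb") as f:
#       pkl.dump(result, f)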


def SUBP_script(params, name, suffix="", delay=0):
    """ Runs multiple subprocesses on one node at the same time - no load
    balancing, all jobs get executed right away (or with a specified delay)

    Parameters
    ----------
    params: list
        list of all parameter sets to be processed
    name: str
        name of job - specifies script with QSUB_%s % name
    suffix: str
        suffix for folder names - enables the execution of multiple subp jobs
        for the same function
    delay: int
        delay between executions in seconds

    Returns
    -------
    path_to_out: str
        path to the output directory
    """
    if os.path.exists(subp_work_folder + "/%s_folder%s/" % (name, suffix)):
        shutil.rmtree(subp_work_folder + "/%s_folder%s/" % (name, suffix))

    path_to_script = path_to_scripts_default + "/QSUB_%s.py" % name
    path_to_storage = subp_work_folder + "/%s_folder%s/storage/" % (name,
                                                                    suffix)
    path_to_out = subp_work_folder + "/%s_folder%s/out/" % (name, suffix)

    if not os.path.exists(path_to_storage):
        os.makedirs(path_to_storage)
    if not os.path.exists(path_to_out):
        os.makedirs(path_to_out)

    processes = []
    for ii in range(len(params)):
        this_storage_path = path_to_storage + "job_%d.pkl" % ii
        this_out_path = path_to_out + "job_%d.pkl" % ii

        with open(this_storage_path, "wb") as f:
            for param in params[ii]:
                pkl.dump(param, f)

        p = subprocess.Popen("%s %s %s %s" % (python_path, path_to_script,
                                              this_storage_path,
                                              this_out_path), shell=True)
        processes.append(p)
        time.sleep(delay)

    for p in processes:
        p.wait()

    return path_to_out
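
# Usage sketch (hypothetical names): run the same QSUB_my_func.py worker
# script locally as plain subprocesses, e.g. on a workstation without SGE,
# staggering the starts by one second.
#
#   params = [[i, "some_arg"] for i in range(8)]
#   out_dir = SUBP_script(params, "my_func", delay=1)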


def delete_jobs_by_name(job_name):
    """ Deletes a group of jobs that have the same name

    Parameters
    ----------
    job_name: str
        job_name as shown in qstat
    """
    process = subprocess.Popen("qstat -u %s" % username, shell=True,
                               stdout=subprocess.PIPE)
    job_ids = []
    for line in iter(process.stdout.readline, ''):
        if job_name[:10] in line:
            job_ids.append(re.findall("[\d]+", line)[0])

    command = "qdel "
    for job_id in job_ids:
        command += job_id + ", "
    command = command[:-2]

    process = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE)
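
# Usage sketch (hypothetical name): remove all jobs submitted under a given
# job_name, e.g. after a misconfigured QSUB_script() call.
#
#   delete_jobs_by_name("myfunc0001")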


def start_multiprocess(func, params, debug=False, verbose=False,
                       nb_cpus=None):
    """
    Parameters
    ----------
    func : function
        function to be mapped over params
    params : list
        list of parameters; each entry is passed to one call of func
    debug : bool
        if True, all calls run sequentially in the main process
    verbose : bool
    nb_cpus : int
        number of worker processes; defaults to the number of available CPUs

    Returns
    -------
    result: list
        list of function returns
    """
    # found NoDaemonProcess on stackexchange by Chris Arndt - enables
    # multiprocessed grid search with GPUs
    class NoDaemonProcess(Process):
        # make 'daemon' attribute always return False
        def _get_daemon(self):
            return False

        def _set_daemon(self, value):
            pass

        daemon = property(_get_daemon, _set_daemon)

    # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
    # because the latter is only a wrapper function, not a proper class.
    class MyPool(multiprocessing.pool.Pool):
        Process = NoDaemonProcess

    if nb_cpus is None:
        nb_cpus = max(cpu_count(), 1)

    if debug:
        nb_cpus = 1

    if verbose:
        print "Computing %d parameters with %d cpus." % (len(params), nb_cpus)

    start = time.time()
    if not debug:
        pool = MyPool(nb_cpus)
        result = pool.map(func, params)
        pool.close()
        pool.join()
    else:
        result = map(func, params)

    if verbose:
        print "\nTime to compute grid:", time.time() - start

    return result
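
# Usage sketch: map a top-level (picklable) function over a list of
# parameters with one worker process per available CPU core. The function
# and values below are made up for illustration.
#
#   def square(x):
#       return x ** 2
#
#   results = start_multiprocess(square, range(16), verbose=True)
#   # results == [0, 1, 4, ..., 225]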