Source code for lfd.createjobs.createjobs

"""Job class is the interface to the writer which stores all parameters
required to sucessfully populate the templte. It also wraps additional
functionality such as directory and script IO that will try and prevent
overwriting of previously created scripts.

"""
import os
import warnings

import numpy as np

from lfd.detecttrails.sdss import files
from . import writer
from lfd.results import Event, Frame

__all__ = ["Jobs"]

#from lfd.gui.utils import expandpath

def expandpath(path):
    """Expands "~" in a path to the user's home directory and returns a
    (success, absolute_path) tuple, or (False, None) if the path is empty
    or does not exist.
    """
    if path is not None and path != "":
        if path[0] == "~":
            path = os.path.expanduser(path)
        if os.path.exists(path):
            path = os.path.abspath(path)
            return (True, path)
    return (False, None)
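
# A quick sketch of the return contract (paths are illustrative and assume
# they exist on the local filesystem):
#
#     >>> expandpath("~/Desktop")
#     (True, '/home/user/Desktop')
#     >>> expandpath("/no/such/place")
#     (False, None)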


class Jobs:
    """Class that holds all the important functions for making Qsub jobs.
    The template is located inside this package, in the "createjobs"
    folder, under the name "generic". The location where final results are
    saved on the Fermi cluster is by default::

        /home/fermi/$user/$res_path/$JOB_ID

    It can be changed by editing the template or providing a new one,
    either as a path or as a full template text via a kwarg named
    "template".

    Parameters
    ----------
    n : int
        number of jobs you want to start.
    runs : list
        if runs are not specified, all SDSS runs found in the runlist.par
        file will be used. If runs is a list of run numbers, only those
        runs will be sorted into jobs. If runs is a list of Event or Frame
        instances, only those frames will be sorted into jobs. See docs
        for detailed usage.
    save_path : str
        path to the directory where jobs will be stored. By default a
        "createjobs" directory under the current working directory.
    res_path : str
        path to the subdirectory on the cluster master where results will
        be copied once the job is finished.
    template_path : str
        path to the desired template.
    template : str
        a full template text as a string.
    queue : str
        sets the QSUB queue type: serial, standard or parallel. Defaults
        to standard. Your local QSUB setup will limit wallclock, cputime
        and queue name differently than assumed here.
    wallclock : str
        maximum wallclock time allowed for a job, in hours.
        Default: 24:00:00
    cputime : str
        maximum CPU time allowed for a job, in hours.
        Default: 48:00:00
    ppn : str
        maximum allowed processors per node. Default: 3
    command : str
        command that will be invoked by the job. Default:
        python3 -c "import detecttrails as dt; dt.DetectTrails($).process()"
        where "$" gets expanded depending on kwargs.
    **kwargs : dict
        named parameters that will be forwarded to the command; they allow
        for different targeting of data. See documentation for examples.
    """
    def __init__(self, n, runs=None, template_path=None, save_path=None,
                 queue="standard", wallclock="24:00:00", ppn="3",
                 cputime="48:00:00", pernode=False,
                 command=('python3 -c "import detecttrails as dt; '
                          'dt.DetectTrails($).process()"\n'),
                 res_path="run_results", **kwargs):
        self.n = n
        self.ppn = ppn
        self.runs = runs
        self.pick = None
        self.queue = queue
        self.cputime = cputime
        self.pernode = pernode
        self.command = command
        self.wallclock = wallclock
        self.kwargs = kwargs

        # Paths are harder to set up. There are three paths to track:
        # where to save DQS scripts (save_path), where to find the
        # template (template_path) and into which folder to copy the
        # results on the cluster after processing (res_path).
        self.res_path = res_path
        self.__init__createjobs_folder(save_path)
        self.__init__template(template_path, kwargs.pop("template", None))

        # set the initial value of self.pick if at all possible
        self._findKwargs()

    def __init__createjobs_folder(self, save_path):
        """Creates the createjobs folder at the given location, the
        current directory by default. Inside the createjobs folder a new
        directory is opened for each set of jobs produced, so that jobs
        do not overwrite each other.
        """
        # this will be the location of invocation, not the dir of this file
        curpath = os.path.abspath(os.path.curdir)
        cjpath = os.path.join(curpath, "createjobs")

        tmppath = cjpath
        if save_path is not None:
            tmppath = os.path.expanduser(save_path)

        if os.path.split(tmppath)[-1] == "createjobs":
            tmppath = os.path.join(tmppath, "jobs")

        if os.path.isdir(tmppath) and len(os.listdir(tmppath)) == 0:
            # an existing empty directory is safe to reuse
            pass
        else:
            # makedirs raises if tmppath exists and is not empty, which
            # doubles as the guard against overwriting previous scripts
            os.makedirs(tmppath)

        self.save_path = tmppath

    def __init__template(self, template_path, template):
        """Verifies that the provided template path is correct and loads
        the template. If the provided template path is incorrect, the
        default template shipped with this module is loaded instead. If a
        whole template is provided as a string, that string is used as the
        template.
        """
        # this will be the dir of this file
        default_path = os.path.split(__file__)[0]
        default_path = os.path.join(default_path, "generic")

        if template_path is not None:
            # expandpath returns a (success, path) tuple
            success, tmppath = expandpath(template_path)
        else:
            success, tmppath = True, default_path

        if tmppath is not None and os.path.isfile(tmppath):
            self.template_path = tmppath
        else:
            warnings.warn("Could not open the given template. Check that "
                          "it exists, is not a folder and is not corrupt. "
                          "Using the default template provided with the "
                          "module.")
            self.template_path = default_path

        if template is None:
            self.template = open(self.template_path).read()
        else:
            self.template = template
    def getAllRuns(self):
        """Returns a list of all runs found in the runlist.par file."""
        rl = files.runlist()
        # keep only the runs belonging to rerun 301
        indices = np.where(rl["rerun"] == b"301")
        rl = rl[indices]
        return rl["run"]
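
    # Illustrative only; the returned runs depend on the local runlist.par
    # file:
    #
    #     >>> jobs = Jobs(2)
    #     >>> jobs.getAllRuns()[:3]
    #     array([94, 125, 187])   # hypothetical output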
    def makeRunlst(self, runs=None):
        """Create a runlst from a list of runs or Results instance.
        Receives a list of runs: [N1, N2, N3, N4, N5, ...] and returns a
        runlst::

            [
              (N1, N2, ... N(n_runs/n_jobs)),   # idx = 0
              ...
              (N1, N2, ... N(n_runs/n_jobs))    # idx = n_jobs
            ]

        Runlst is a list of lists. Inner lists contain runs that will be
        executed in a single job. Length of the outer list matches the
        number of jobs that will be started, e.g.::

            runlst = [
                (2888, 2889, 2890),
                (3001, 3002, 3003)
            ]

        will start 2 jobs (job0.dqs, job1.dqs), where job0.dqs will call
        DetectTrails.process on 3 runs: 2888, 2889, 2890.

        If a list of runs is (optionally) supplied, the run list will be
        produced from that list instead of the runs attribute.
        """
        runs = self.runs if runs is None else runs
        whole, remain = divmod(len(runs), self.n)
        nruns = whole if remain == 0 else whole + 1
        return [runs[i:i + nruns] for i in range(0, len(runs), nruns)]
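
    # A worked partitioning example: 5 runs split into n=2 jobs gives
    # nruns = ceil(5/2) = 3 runs per job, so the last job is shorter:
    #
    #     >>> jobs = Jobs(2)
    #     >>> jobs.makeRunlst([1, 2, 3, 4, 5])
    #     [[1, 2, 3], [4, 5]]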
    def _createBatch(self, njobs):
        """Writes a batch.sh script that contains 'qsub job#.dqs' for each
        job found in the runlst. Created automatically if you use the
        create or createAll methods.
        """
        with open(os.path.join(self.save_path, "batch.sh"), "w") as newbatch:
            for i in range(njobs):
                newbatch.write("qsub job" + str(i) + ".dqs\n")

    def _findKwargs(self):
        """Works out which kwargs, if any, were sent. If camcol and/or
        filter were sent, it adds them as instance attributes. It always
        sets a "pick" instance attribute that describes the parameters
        sent. The 'pick' attribute is used by writeJob from the writer
        module to expand the command attribute to the appropriate string.

        THE METHOD DOESN'T GET CALLED UNTIL THE CREATE METHOD! This avoids
        potential problems, e.g. if the user sets the runs attribute after
        initialization, but makes it impossible to determine the 'pick'
        attribute value until execution.
        """
        kwargs = self.kwargs
        if 'camcol' in kwargs:
            if kwargs['camcol'] not in (1, 2, 3, 4, 5, 6):
                raise ValueError("Nonexistent camcol")
            self.camcol = kwargs['camcol']
            self.pick = 'RunCamcol'

        if 'filter' in kwargs:
            if kwargs['filter'] not in ('u', 'g', 'r', 'i', 'z'):
                raise ValueError("Nonexistent filter")
            self.filter = kwargs['filter']
            if 'camcol' in kwargs:
                self.pick = 'RunCamcolFilter'
            else:
                self.pick = 'RunFilter'

        # this will be true even when runs is a list of results, but in
        # that case pick will be re-picked in the following if.
        # Effectively this means that "run" is the default pick.
        if all(["camcol" not in kwargs, "filter" not in kwargs]):
            self.pick = "Run"

        if isinstance(self.runs, list) and \
           isinstance(self.runs[0], (Event, Frame)):
            self.pick = "Results"
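
    # How 'pick' resolves for different inputs (a summary of the logic
    # above, not an exhaustive list):
    #
    #     kwargs / runs                      pick
    #     ---------------------------------  ------------------
    #     (neither camcol nor filter)        'Run'
    #     camcol=3                           'RunCamcol'
    #     filter='r'                         'RunFilter'
    #     camcol=3, filter='r'               'RunCamcolFilter'
    #     runs=[Event, ...] or [Frame, ...]  'Results'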
    def create(self):
        """Creates job#.dqs files from the runlst. runlst is a list of
        lists in which each inner list contains all runs per job. Length
        of the outer list is the number of jobs started. See class help.
        """
        # Continuously updating all the class attributes based on the
        # exact settings chosen in real-time is very ugly, so we force an
        # update of all attributes that could have changed (i.e.
        # self.pick, self.runs, job partitioning stats etc.)
        self._findKwargs()

        if not self.runs:
            print("There are no runs to create jobs from. Creating jobs "
                  "for all runs in the runlist.par file.")
            self.runs = self.getAllRuns()

        whole, remain = divmod(len(self.runs), self.n)
        nruns = whole if remain == 0 else whole + 1
        njobs = int(np.ceil(len(self.runs) / nruns))

        print("Creating:\n"
              "    {} jobs with {} runs per job\n"
              "    Queue:     {}\n"
              "    Wallclock: {}\n"
              "    Cputime:   {}\n"
              "    Ppn:       {}\n"
              "    Path:      {}".format(njobs, nruns, self.queue,
                                         self.wallclock, self.cputime,
                                         self.ppn, self.save_path))

        writer.writeJob(self)
        self._createBatch(njobs)
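
# A minimal end-to-end sketch (illustrative run numbers; assumes a writable
# save path and the default template shipped with the package):
#
#     >>> from lfd.createjobs.createjobs import Jobs
#     >>> jobs = Jobs(2, runs=[2888, 2889, 2890, 3001, 3002, 3003])
#     >>> jobs.create()
#     >>> # produces job0.dqs, job1.dqs and batch.sh under jobs.save_path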