Source code for hpc_scheduler.scheduler

#! /usr/bin/python
# coding: utf-8
"""Scheduler

Classes and methods in :mod:`Scheduler` create job scripts for
different batch schedulers and help with submitting and checking them.

"""

import logging
import os
import subprocess
from configobj import ConfigObj
from string import Template
import datetime
import re
import sys

#logging.basicConfig(level=logging.INFO)

# slurm job template
#
# Header of a SLURM batch script. Job._init_tpl wraps it in a
# string.Template subclass whose delimiter is the Job's `delimiter`
# (default '@'), so placeholders look like @job_name or @{log_dir}.
# The placeholders are the keys of SLURM_DEFAULTS plus 'job_shell', which
# Job.__init__ sets to '#!/bin/sh' unless header_dict overrides it.
# NOTE(review): --output and --error both point to
# '<log_dir>/<job_name>.o%j', so stdout and stderr presumably end up in the
# same file, which is then what Job.grep_log_err scans via 'StdErr'.
# Confirm that merging the streams is intended.
SLURM_TEMPLATE = \
"""@job_shell
#SBATCH --job-name=@job_name                       # Specify job name
#SBATCH --partition=@partition                     # Specify partition name
#SBATCH --ntasks=@ntasks                           # Specify max. number of tasks to be invoked
#SBATCH --mem-per-cpu=@mem_per_cpu                 # Specify real memory required per CPU in MegaBytes
#SBATCH --time=@time                               # Set a limit on the total run time
#SBATCH --mail-type=@mail_type                     # Notify user by email in case of job failure
#SBATCH --account=@account                         # Charge resources on this project account
#SBATCH --output=@{log_dir}/@{job_name}.o%j        # File name for standard output
#SBATCH --error=@{log_dir}/@{job_name}.o%j         # File name for standard error output
"""

# Trailer that Job.write_jobscript inserts between the filled template
# header and the job commands.
# NOTE: the timestamp is evaluated once, when this module is imported,
# not each time a job script is written.
SLURM_COMMENT = ("\n"
                 "\n"
                 "# job script created by PyRemo Job Scheduler\n"
                 "\n"
                 "# " + str(datetime.datetime.now()) + "\n")


# Generic job states shared by all scheduler implementations. Negative
# values mark failed or unrecognised jobs, zero a completed job, and
# positive values a job that is still running or pending.
UNKNOWN, FAILED, COMPLETED, RUNNING, PENDING = -2, -1, 0, 1, 2

# Logging level used when reporting a job (or a job count) in each state.
LOGLEV = dict([
    (COMPLETED, logging.INFO),
    (FAILED, logging.ERROR),
    (UNKNOWN, logging.WARNING),
    (PENDING, logging.INFO),
    (RUNNING, logging.INFO),
])

### SLURM definitions ####
# Map the SLURM state names found in the accounting 'State' column to the
# generic job states. Scheduler.log_jobs_acct treats any other state name
# as UNKNOWN.
SLURM_STATES = dict(
    FAILED=FAILED,
    COMPLETED=COMPLETED,
    RUNNING=RUNNING,
    PENDING=PENDING,
)

# Values for the SLURM_TEMPLATE placeholders, used unless a job's
# header_dict overrides them.
SLURM_DEFAULTS = dict(
    job_name='unknown',
    partition='',
    ntasks='1',
    mem_per_cpu='1280',
    mail_type='FAIL',
    account='',
    time='',
    log_dir='',
)

# Fields of the job control output that Job.get_log copies into the
# scheduler's job log.
SLURM_CONTROL = ['StdErr', 'StdOut', 'WorkDir', 'JobName', 'Command']

### known Schedulers ####
# Per implementation:
# - shell commands for submission ('batch'; the jobscript path is appended),
#   accounting and job control (the job id is appended to both);
# - the header template, the state mapping, the script trailer, the logged
#   control fields and the header defaults used by Job and Scheduler.
SCHEDULER = dict(
    SLURM=dict(
        batch='sbatch --parsable',
        accounting='sacct --parsable2 --format=jobid,elapsed,ncpus,ntasks,state,end,jobname -j',
        control='scontrol show jobid -dd',
        tpl=SLURM_TEMPLATE,
        states=SLURM_STATES,
        comment=SLURM_COMMENT,
        ctr_list=SLURM_CONTROL,
        defaults=SLURM_DEFAULTS,
    ),
)

# Regular expressions matched case-insensitively against log lines by
# Job.grep_log_err; a line matching any of them is reported.
ERROR_PATTERN = ['error', 'failed', 'exception', 'not found']


class Job():
    """Class to hold job information.

    Written by Lars Buntemeyer
    Last modified: 06.02.2019
    """

    def __init__(self, sys, jobname='', jobscript=None, jobid=-1, tpl=None,
                 commands='', header_dict=None, delimiter='@', control=None):
        """Determines the appropriate job commands and templates.

        A job can be created from a template, a header dictionary and
        commands (to write and submit a job script), or from a jobid (to
        represent an existing scheduler job). The job definition is
        therefore kept general, and only the *sys* argument is required.

        **Arguments:**
            *sys:*
                The batch scheduler implementation (a key of ``SCHEDULER``).
            *jobname:*
                Name of the job. It is also used for the default jobscript
                filename and the ``job_name`` header entry. (default: '')
            *jobscript:*
                A filename for the jobscript.
                (default: ``<cwd>/<jobname>.sh``)
            *jobid:*
                JobID in the scheduler. (default: -1)
            *tpl:*
                Template text, a template filename or a
                :class:`string.Template` for creating a jobscript.
                (default: the scheduler's template)
            *commands:*
                Text containing commands for the jobscript. (default: '')
            *header_dict:*
                Dictionary to fill the template header. It overrides the
                scheduler defaults. (default: *None*)
            *delimiter:*
                Placeholder delimiter used when *tpl* is text or a
                filename. (default: '@')
            *control:*
                Mapping of job control information, e.g. as returned by
                :meth:`parse_control`. (default: a new empty dict)

        **Raises:**
            *Exception:*
                If the scheduler implementation is unknown.
        """
        if sys not in SCHEDULER:
            print('Unknown scheduler implementation, must be one of: ' +
                  str(SCHEDULER.keys()))
            raise Exception('Unknown sys: ' + sys)
        scheduler = SCHEDULER[sys]
        self.batch_command = scheduler['batch']
        self.acct_command = scheduler['accounting']
        self.contr_command = scheduler['control']
        # Copy the defaults. Updating the shared dict in place would leak
        # this job's name and header into the module-level defaults, and
        # from there into every other job.
        self.header_dict = dict(scheduler['defaults'])
        self.comment = scheduler['comment']
        self.ctr_list = scheduler['ctr_list']
        self.jobname = jobname
        self.jobscript = jobscript
        self.tpl = tpl
        self.jobid = jobid
        self.commands = commands
        self.delimiter = delimiter
        # Use a fresh dict per job instead of a shared mutable default.
        self.control = control if control is not None else {}
        if not self.tpl:
            self.tpl = scheduler['tpl']
        if not self.jobscript:
            self.jobscript = os.path.join(os.getcwd(), jobname + '.sh')
        self._init_tpl()
        self.header_dict['job_name'] = self.jobname
        self.header_dict['job_shell'] = '#!/bin/sh'
        if header_dict:
            self.header_dict.update(header_dict)

    def __eq__(self, other):
        """Two jobs are equal if they have the same job name."""
        if not isinstance(other, Job):
            return NotImplemented
        return self.jobname == other.jobname

    def _init_tpl(self):
        """Convert ``self.tpl`` into a :class:`string.Template` instance.

        Text and template filenames are wrapped in a Template subclass that
        uses ``self.delimiter``. Template instances (including subclasses)
        are used unchanged.

        **Raises:**
            *Exception:*
                If ``self.tpl`` is neither text, a filename nor a Template.
        """
        class TmpTemplate(Template):
            delimiter = self.delimiter

        if isinstance(self.tpl, Template):
            return
        if not isinstance(self.tpl, str):
            raise Exception('unknown type of job template, must be either ' +
                            'Template or text or valid filename')
        if os.path.isfile(self.tpl):
            with open(self.tpl) as tpl_file:
                self.tpl = TmpTemplate(tpl_file.read())
        else:
            self.tpl = TmpTemplate(self.tpl)

    @staticmethod
    def _parse_jobid(output):
        """Extract the numeric job id from the batch command's stdout.

        Accepts lines of the form ``<jobid>`` or ``<jobid>;<cluster>`` and
        uses the last such line.

        **Raises:**
            *ValueError:*
                If no line starts with a numeric job id.
        """
        for line in reversed(output.strip().splitlines()):
            field = line.strip().split(';', 1)[0]
            if field.isdigit():
                return int(field)
        raise ValueError('could not parse job id from batch output: ' +
                         repr(output))

    @staticmethod
    def _parse_control_output(text):
        """Parse whitespace-separated ``key=value`` tokens into a str dict.

        The value is everything after the first '=', so values that contain
        '=' themselves are kept. Tokens without '=' are ignored.
        """
        control = {}
        for token in text.split():
            key, sep, value = token.partition('=')
            if sep and key:
                control[key] = value
        return control

    def submit(self, write=False):
        """Submit the jobscript with the scheduler's batch command.

        After a successful submission the job id is stored in
        ``self.jobid``, and the job control information is refreshed via
        :meth:`parse_control`.

        **Arguments:**
            *write:*
                Write the jobscript before submitting. (default: False)

        **Returns:**
            *jobid:*
                The job id parsed from the batch command's output.

        **Raises:**
            *ValueError:*
                If the batch command exits with a non-zero status or its
                output contains no job id.
        """
        if write:
            self.write_jobscript()
        proc = subprocess.Popen(self.batch_command.split() + [self.jobscript],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = proc.communicate()
        logging.debug('stdout ' + str(stdout))
        out_text = stdout.decode(errors='replace')
        err_text = stderr.decode(errors='replace').strip()
        if proc.returncode != 0:
            logging.error('submitted jobscript: ' + self.jobscript)
            logging.error('stderr ' + err_text)
            raise ValueError('batch command failed for ' + self.jobscript +
                             ': ' + err_text)
        if err_text:
            # The submission succeeded but the command also wrote to stderr.
            logging.warning('stderr ' + err_text)
        self.jobid = self._parse_jobid(out_text)
        logging.info('submitted jobscript: ' + self.jobscript)
        logging.debug('jobid: ' + str(self.jobid))
        self.parse_control()
        for key, value in self.control.items():
            logging.debug('control: ' + str(key) + ': ' + str(value))
        return self.jobid

    def write_jobscript(self, header_dict=None):
        """Fill the template header and write the jobscript file.

        The content is the substituted header, followed by ``self.comment``
        and then ``self.commands``.

        **Arguments:**
            *header_dict:*
                Additional header entries. They are merged permanently into
                the job's header. (default: *None*)

        **Returns:**
            *jobscript:*
                The filename of the written jobscript.

        **Raises:**
            *KeyError:*
                If the template contains a placeholder without a value.
        """
        if header_dict:
            self.header_dict.update(header_dict)
        logging.info('writing: ' + self.jobscript)
        content = self.tpl.substitute(self.header_dict)
        content += self.comment
        content += self.commands
        with open(self.jobscript, "w") as script:
            script.write(content)
        return self.jobscript

    def get_acct(self):
        """Query the scheduler's accounting command for this job.

        The first output line provides the column titles and the second
        line the values. Any further lines (presumably job steps) are
        ignored.

        **Returns:**
            *acct:*
                A dict mapping column title to value.

        **Raises:**
            *IndexError:*
                If the output has fewer than two lines.
        """
        command = self.acct_command.split(' ') + [str(self.jobid)]
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        stdout, _ = proc.communicate()
        lines = stdout.splitlines()
        header = lines[0].decode().split('|')
        data = lines[1].decode().split('|')
        acct = dict(zip(header, data))
        logging.debug(str(self.jobid) + ': ' + str(acct))
        return acct

    def parse_control(self):
        """Read job control information from the scheduler's control command.

        The output is decoded to text so that the keys are ``str``. They can
        then be matched against ``self.ctr_list`` and looked up as, for
        example, ``'StdErr'``.

        **Returns:**
            *control:*
                A dict of the ``key=value`` pairs found. It is also stored
                in ``self.control``.
        """
        command = self.contr_command.split(' ') + [str(self.jobid)]
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        stdout, _ = proc.communicate()
        text = stdout.decode(errors='replace')
        logging.debug('control output: %s', text)
        self.control = self._parse_control_output(text)
        return self.control

    def get_log(self):
        """Return the job information to store in the scheduler log.

        The result contains ``jobid`` and ``jobscript``, plus every control
        entry whose key is listed in ``self.ctr_list``.
        """
        job_log = {'jobid': self.jobid, 'jobscript': self.jobscript}
        for key, value in self.control.items():
            if key in self.ctr_list:
                job_log[key] = value
        return job_log

    def grep_log_err(self, patterns=None):
        """Collect lines of the job's stderr log that look like errors.

        **Arguments:**
            *patterns:*
                Regular expressions, matched case-insensitively.
                (default: ``ERROR_PATTERN``)

        **Returns:**
            *error:*
                The matching lines with trailing whitespace stripped. The
                list is empty if there is no 'StdErr' entry or the file does
                not exist.
        """
        if patterns is None:
            patterns = ERROR_PATTERN
        regexes = [re.compile(pattern, re.IGNORECASE) for pattern in patterns]
        error = []
        errlog = self.control['StdErr'] if 'StdErr' in self.control else None
        if errlog and os.path.isfile(errlog):
            # Log files may contain bytes that are not valid in the locale
            # encoding; replace them instead of aborting.
            with open(errlog, errors='replace') as origin_file:
                for line in origin_file:
                    if any(regex.search(line) for regex in regexes):
                        error.append(line.rstrip())
        return error
class Scheduler():
    """Class to interact with an HPC job scheduler.

    Written by Lars Buntemeyer
    Last modified: 06.02.2019
    """

    def __init__(self, sys, name='', tpl=None, logfile='',
                 job_list=None, header_dict=None):
        """Determines the appropriate scheduler commands and templates.

        **Arguments:**
            *sys:*
                The batch scheduler implementation.
            *name:*
                A name for this scheduler instance. (default: '')
            *tpl:*
                Template for new jobs. (default: the scheduler's template)
            *logfile:*
                A file in ini style holding job information. If given, the
                job list is read from it. (default: '')
            *job_list:*
                A list of type Job. (default: a new empty list)
            *header_dict:*
                A default header dictionary for new jobs.
                (default: a new empty dict)

        **Raises:**
            *Exception:*
                If the scheduler implementation is unknown.
        """
        self.STAT_STR = '{:<48} | {:>8} | {:<16} | {:<24} | {:<24}'
        if sys not in SCHEDULER:
            print('Unknown scheduler implementation, must be one of: ' +
                  str(SCHEDULER.keys()))
            raise Exception('Unknown sys: ' + sys)
        self.STAT = SCHEDULER[sys]['states']
        self.sys = sys
        self.name = name
        self.logfile = logfile
        # Use fresh containers per instance instead of shared mutable
        # defaults (add_job appends to this list).
        self.job_list = job_list if job_list is not None else []
        self.job_log = ConfigObj(self.logfile)
        self.batch = SCHEDULER[sys]['batch']
        self.acct = SCHEDULER[sys]['accounting']
        self.tpl = tpl if tpl else SCHEDULER[sys]['tpl']
        self.header_dict = header_dict if header_dict is not None else {}
        if self.logfile:
            self._read_job_log()

    def _init_job_list(self):
        """Initiate the job list from the scheduler log file.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        self.job_list = []
        for jobname in self.job_log:
            section = self.job_log[jobname]
            self.job_list.append(Job(self.sys, jobname=jobname,
                                     jobid=section['jobid'],
                                     jobscript=section['jobscript'],
                                     control=section))

    def _read_job_log(self):
        """Reads jobnames and ids from an ini-file.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        logging.debug('reading jobids: ' + str(self.logfile))
        self.job_log = ConfigObj(self.logfile)
        self._init_job_list()

    def _write_job_log(self):
        """Writes jobnames and ids to an ini-file.

        The in-memory log is always updated. Writing is skipped when no log
        filename is configured, because ConfigObj would otherwise try to
        open an empty path.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        for job in self.job_list:
            self.job_log[job.jobname] = job.get_log()
        if not self.job_log.filename:
            logging.debug('no job log file configured, not writing job log')
            return
        logging.debug('writing jobids to ' + str(self.job_log.filename))
        self.job_log.write()

    def create_job(self, jobname, jobscript, commands='', header_dict=None,
                   write=True):
        """Create a job, add it to the job list and optionally write it.

        **Arguments:**
            *jobname:*
                Name of the new job.
            *jobscript:*
                Filename of the jobscript.
            *commands:*
                Commands for the jobscript. (default: '')
            *header_dict:*
                Header entries for this job only. They override the
                scheduler's default header. (default: *None*)
            *write:*
                Write the jobscript immediately. (default: True)

        **Returns:**
            *job:*
                The new :class:`Job`.
        """
        # Work on a copy so that per-job settings do not become defaults
        # for jobs created later.
        header = dict(self.header_dict)
        if header_dict:
            header.update(header_dict)
        job = Job(self.sys, jobname=jobname, jobscript=jobscript,
                  commands=commands, header_dict=header, tpl=self.tpl)
        self.update_job_list(job)
        if write:
            job.write_jobscript()
        return job

    def update_job_list(self, job):
        """Replace jobs equal to *job* (same name), or append *job*."""
        if job in self.job_list:
            self.job_list = [job if job == x else x for x in self.job_list]
        else:
            self.add_job(job)

    def add_job(self, job):
        """Append *job* to the job list."""
        self.job_list.append(job)

    def get_job(self, jobname):
        """Return the first job named *jobname*, or None."""
        return next((j for j in self.job_list if j.jobname == jobname), None)

    def submit(self, jobname=None):
        """Submit the job *jobname*, or all jobs, and then write the job log."""
        if jobname:
            self.get_job(jobname).submit()
        else:
            for job in self.job_list:
                job.submit()
        self._write_job_log()

    def write_jobscripts(self):
        """Write the jobscript of every job in the job list."""
        for job in self.job_list:
            logging.debug('jobname ' + job.jobname)
            job.write_jobscript()

    def get_jobids(self):
        """Returns a dict containing {jobname:jobid}.

        **Returns:**
            *jobids:*
                A dict containing {jobname:jobid} for all jobs in the job
                list.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        return {job.jobname: job.jobid for job in self.job_list}

    def read_jobids(self):
        """Currently a no-op that returns None; use :meth:`get_jobids`."""
        pass

    def get_job_list(self, filters=None):
        """Return the jobs whose names contain any of *filters*.

        **Arguments:**
            *filters:*
                A string or a list of strings. Without filters, the job
                list itself is returned. (default: *None*)
        """
        if not filters:
            return self.job_list
        if isinstance(filters, str):
            # Treat a single string as one filter, not as its characters.
            filters = [filters]
        return [job for job in self.job_list
                if any(pattern in job.jobname for pattern in filters)]

    def get_jobs_acct(self, filters=None):
        """Returns a dict containing job accounting.

        **Arguments:**
            *filters:*
                A string or list of strings to filter jobnames before
                accessing the scheduler database.

        **Returns:**
            *jobs_acct:*
                A dict containing job accounting information, in the form,
                e.g.: {jobname:{'State':state,'JobID':jobid,...}}.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        return {job.jobname: job.get_acct()
                for job in self.get_job_list(filters=filters)}

    def resubmit(self, states=None):
        """Resubmit all jobs whose accounting 'State' is in *states*.

        *states* may be a single state string or a collection of states.
        The job log is written afterwards.
        """
        if not states:
            states = ()
        elif isinstance(states, str):
            states = [states]
        for jobname, acct in self.get_jobs_acct().items():
            logging.debug('jobname: ' + jobname)
            if acct['State'] in states:
                self.get_job(jobname).submit()
        self._write_job_log()

    def log_jobs_acct(self, filters=None):
        """Logs job accounting information.

        Logs one line per job, with its stderr file, job id, state, end time
        and a preview of error lines, followed by a count of jobs per state.

        **Arguments:**
            *filters:*
                A string or list of strings to filter jobnames.

        Written by Lars Buntemeyer
        Last changes 06.02.2019
        """
        counter = {i: 0 for i in range(-2, 3)}
        jobs_acct = self.get_jobs_acct(filters=filters)
        logging.info(self.STAT_STR.format('StdErr', 'JobID', 'State', 'End',
                                          'Preview'))
        for jobname, acct in jobs_acct.items():
            job = self.get_job(jobname)
            state = acct['State']
            stateid = self.STAT[state] if state in self.STAT else UNKNOWN
            counter[stateid] += 1
            logfile = ''
            if 'StdErr' in job.control:
                logfile = os.path.basename(job.control['StdErr'])
            # Error lines are previewed for every job, whatever its state.
            error = job.grep_log_err()
            message = self.STAT_STR.format(logfile, acct['JobID'], state,
                                           acct['End'], '|'.join(error))
            logging.log(LOGLEV[stateid], message)
        invert = {v: k for k, v in self.STAT.items()}
        for i in invert:
            # Always report non-negative states; report the others only if
            # they occurred.
            if counter[i] > 0 or i >= 0:
                logging.log(LOGLEV[i], str(counter[i]) + ' jobs ' + invert[i])