Source code for ecsjobs.config

The latest version of this package is available at:

Copyright 2017 Jason Antman <> <>

    This file is part of ecsjobs, also known as ecsjobs.

    ecsjobs is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    ecsjobs is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with ecsjobs.  If not, see <>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.

Jason Antman <> <>

import os
import logging
import glob
from copy import copy, deepcopy
from datetime import datetime

import yaml
import boto3

from import get_job_classes
from ecsjobs.schema import Schema

logger = logging.getLogger(__name__)

[docs]class Config(object): #: File extensions to consider as YAML config files. YAML_EXTNS = ['.yml', '.yaml'] #: Default values for global configuration settings. _global_defaults = { 'inter_poll_sleep_sec': 10, 'max_total_runtime_sec': 3600, 'email_subject': 'ECSJobs Report', 'failure_html_path': None, 'failure_command': None } def __init__(self): self.s3 = boto3.resource('s3') self._raw_conf = {} self._global_conf = {} self._jobs = [] self._load_config() self._validate_config() self._make_jobs() @property def schedule_names(self): """ Return a list of all String schedule names defined in the config. :return: all defined schedule names :rtype: list """ return sorted(list(set([j.schedule_name for j in self._jobs])))
[docs] def jobs_for_schedules(self, schedule_names): """ Given one or more schedule names, return the list of jobs for those schedules (in order). :param schedule_names: schedule names to get jobs for :type schedule_names: list :return: list of Jobs for the specified schedules :rtype: list """ return [j for j in if j.schedule_name in schedule_names]
@property def jobs(self): """ Return the list of :py:class:`` instances. :return: list of jobs :rtype: list """ return copy(self._jobs)
[docs] def get_global(self, k): """ Return the value of the specified global configuration setting, from the global configuration (if present) or else from the global defaults. :param k: configuration key to get :return: value of global config setting """ if k in self._global_conf: return self._global_conf[k] return self._global_defaults[k]
[docs] def _load_config(self): """ Check environment variables; call either :py:meth:`~._load_config_s3` or :py:meth:`~._load_config_local`. :raises: RuntimeError """ if 'ECSJOBS_BUCKET' in os.environ and 'ECSJOBS_KEY' in os.environ: self._load_config_s3( os.environ['ECSJOBS_BUCKET'], os.environ['ECSJOBS_KEY'] ) elif 'ECSJOBS_LOCAL_CONF_PATH' in os.environ: self._load_config_local(os.environ['ECSJOBS_LOCAL_CONF_PATH']) else: raise RuntimeError( 'ERROR: You must export either ECSJOBS_BUCKET and ECSJOBS_KEY, ' 'or ECSJOBS_LOCAL_CONF_PATH' )
[docs] def _load_config_s3(self, bucket_name, key_name): """ Retrieve and load configuration from S3. Sets ``self._raw_conf``. :param bucket_name: Name of the S3 bucket to retrieve config from :type bucket_name: str :param key_name: config key or prefix in bucket :type key_name: str """ logger.debug('Loading configuration from bucket %s key/prefix %s', bucket_name, key_name) bkt = self.s3.Bucket(bucket_name) if self._key_is_yaml(key_name):'Loading configuration from single file %s in %s', key_name, bucket_name) self._raw_conf = self._get_yaml_from_s3(bkt, key_name) else:'Loading multi-file configuration from prefix %s in ' 'bucket %s', key_name, bucket_name) self._raw_conf = self._get_multipart_config(bkt, key_name) logger.debug('Configuration load complete:\n%s', self._raw_conf)
[docs] def _load_config_local(self, conf_path): """ Load configuration from the local filesystem. Sets ``self._raw_conf``. :param conf_path: path to configuration on local FS :type conf_path: str """ logger.debug( 'Loading configuration from local filesystem: %s', conf_path ) if not os.path.exists(conf_path): raise RuntimeError( 'ERROR: Config path does not exist: %s' % conf_path ) if not os.path.isdir(conf_path): self._raw_conf = self._load_yaml_from_disk(conf_path) return # else it's a directory; load recursively if not conf_path.endswith('/'): conf_path += '/' files = [] for ext in self.YAML_EXTNS: files.extend(glob.glob('%s*%s' % (conf_path, ext))) res = {'global': {}, 'jobs': []} for f in sorted(files): if os.path.basename(f) in ['global%s' % e for e in self.YAML_EXTNS]: res['global'] = self._load_yaml_from_disk(f) else: res['jobs'].append(self._load_yaml_from_disk(f)) self._raw_conf = res
[docs] def _load_yaml_from_disk(self, path): """ Load a YAML file from disk and return the contents. :param path: path to load from :type path: str :return: deserialized YAML file contents :rtype: dict """ with open(path, 'r') as fh: return yaml.load(fh, Loader=yaml.FullLoader)
[docs] def _key_is_yaml(self, key): """ Test whether or not the specified S3 key is a YAML file. :param key: key in S3 :type key: str :return: whether key is a YAML file :rtype: bool """ for extn in self.YAML_EXTNS: if key.endswith(extn): return True return False
[docs] def _get_multipart_config(self, bucket, prefix): """ Retrieve each piece of a multipart config from S3; return the combined configuration (i.e. the corresponding single-dict config). :param bucket: the S3 bucket to retrieve configs from :type bucket: :py:class:`S3.Bucket <S3.Bucket>` :param prefix: prefix for configuration files :type prefix: str :return: combined configuration dict :rtype: dict """ res = {'global': {}, 'jobs': []} for obj in sorted( list(bucket.objects.filter(Prefix=prefix)), key=lambda x: x.key ): fname = obj.key.replace(prefix, '') if not self._key_is_yaml(fname): continue body = self._get_yaml_from_s3(bucket, obj.key) if fname in ['global%s' % extn for extn in self.YAML_EXTNS]: res['global'] = body else: res['jobs'].append(body) return res
[docs] def _get_yaml_from_s3(self, bucket, key): """ Retrieve the contents of a file from S3 and deserialize the YAML. :param bucket: the S3 bucket to retrieve the file from :type bucket: :py:class:`S3.Bucket <S3.Bucket>` :param key: key/path of the file :type key: str :return: deserialized YAML file contents :rtype: dict """ try: obj = bucket.Object(key) except Exception: logger.error('Unable to retrieve s3://%s/%s',, key, exc_info=True) raise RuntimeError( 'ERROR: Unable to retrieve key %s from bucket %s' % (, key ) ) try: body = obj.get()['Body'].read() except Exception: logger.error('Unable to read s3://%s/%s',, key, exc_info=True) raise RuntimeError( 'ERROR: Unable to read key %s from bucket %s' % (, key ) ) try: res = yaml.load(body, Loader=yaml.FullLoader) except Exception: logger.error('Unable to load YAML from s3://%s/%s',, key, exc_info=True) raise RuntimeError( 'ERROR: Unable to load YAML from key %s from bucket %s' % (, key ) ) return res
[docs] def _validate_config(self): """ Validate the configuration in ``self._raw_conf``. Writes ``self._global_conf``. """ Schema().validate(self._raw_conf) self._global_conf = self._raw_conf['global'] if self._global_conf.get('failure_html_path', None) is not None: self._global_conf[ 'failure_html_path' ] = self._global_conf[ 'failure_html_path' ].format('%Y-%m-%dT%H-%M-%S'))
[docs] def _make_jobs(self): """ Reads ``self._jobs_conf`` and instantiates job classes, populating ``self._jobs``. """ logger.debug('Instantiating Job classes...') jobclasses = get_job_classes() for j in self._raw_conf['jobs']: cls = jobclasses.get(j['class_name'], None) if cls is None: raise RuntimeError( 'ERROR: No known Job subclass "%s" (job %s)' % ( j['class_name'], j['name'] ) ) conf = deepcopy(j) del conf['class_name'] self._jobs.append(cls(**conf))'Created %d Job instances', len(self._jobs))