Source code for ecsjobs.jobs.base

"""
The latest version of this package is available at:
<http://github.com/jantman/ecsjobs>

##################################################################################
Copyright 2017 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>

    This file is part of ecsjobs, also known as ecsjobs.

    ecsjobs is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    ecsjobs is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with ecsjobs.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
##################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/ecsjobs> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
##################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
##################################################################################
"""

import abc
import logging
import re
import time
from cronex import CronExpression

logger = logging.getLogger(__name__)


[docs]class Job(object): """ Base class for all Job types/classes. """ __metaclass__ = abc.ABCMeta #: Dictionary describing the configuration file schema, to be validated #: with `jsonschema <https://github.com/Julian/jsonschema>`_. _schema_dict = { 'type': 'object', 'title': 'Configuration for base Job class', 'properties': { 'name': {'type': 'string'}, 'schedule': {'type': 'string'}, 'class_name': {'type': 'string'}, 'summary_regex': {'type': 'string'}, 'cron_expression': {'type': 'string'} }, 'required': [ 'name', 'schedule', 'class_name' ] } def __init__(self, name, schedule, summary_regex=None, cron_expression=None): """ :param name: unique name for this job :type name: str :param schedule: the name of the schedule this job runs on :type schedule: str :param summary_regex: A regular expression to use for extracting a string from the job output for use in the summary table. If there is more than one match, the last one will be used. :type summary_regex: ``string`` or ``None`` :param cron_expression: A cron-like expression parsable by `cronex <https://github.com/ericpruitt/cronex>`_ specifying when the job should run. This has the effect of causing runs to skip this job unless the expression matches. It's recommended not to use any minute specifiers and not to use any hour specifiers if the total runtime of all jobs is more than an hour. :type cron_expression: str """ self._name = name self._schedule_name = schedule self._started = False self._finished = False self._exit_code = None self._output = None self._start_time = None self._finish_time = None self._summary_regex = summary_regex self._skip_reason = None self._cron_expression = None if cron_expression is not None: self._cron_expression = CronExpression(cron_expression) if not self._cron_expression.check_trigger( time.gmtime(time.time())[:5] ): self._skip_reason = 'cronex: "%s"' % cron_expression def __repr__(self): return '<%s name="%s">' % (type(self).__name__, self.name) @property def error_repr(self): """ Return a detailed representation of the job state for use in error reporting. :return: detailed representation of job in case of error :rtype: str """ ecode = '' if self._exit_code is not None: ecode = 'Exit Code: %s\n' % self._exit_code return "%s\nSchedule Name: %s\nStarted: %s\nFinished: %s\n" \ "Duration: %s\n%sOutput: %s\n" % ( self.__repr__(), self._schedule_name, self._started, self._finished, self.duration, ecode, self._output ) @property def name(self): """ Return the Job Name. :return: Job name :rtype: str """ return self._name @property def skip(self): """ Either None if the job should not be skipped, or a string reason describing why the Job should be skipped. :rtype: ``None`` or ``str`` """ return self._skip_reason @property def schedule_name(self): """ Return the configured schedule name for this job. :return: schedule name :rtype: str """ return self._schedule_name @property def is_started(self): """ Return whether or not the Job has been started. :return: whether or not the Job has been started :rtype: bool """ return self._started @property def is_finished(self): """ Return whether or not the Job is finished. :return: whether or not the Job is finished :rtype: bool """ return self._finished @property def exitcode(self): """ For Job subclasses that result in a command exit code, return the integer exitcode. For Job subclasses that result in a boolean (success / failure) status, return 0 on success or 1 on failure. Returns -1 if the Job has not completed. :return: Job exit code or (0 / 1) status :rtype: int """ return self._exit_code @property def output(self): """ Return the output of the Job as a string, or None if the job has not completed. :return: Job output :rtype: str """ return self._output
[docs] def summary(self): """ Retrieve a simple one-line summary of the Job output/status. :return: Job one-line summary. :rtype: str """ if self.output is None: return '' if self._summary_regex is not None: res = re.findall(self._summary_regex, self.output, re.M) if len(res) > 0: return res[-1] lines = [x for x in self.output.split("\n") if x.strip() != ''] if len(lines) < 1: return '' return lines[-1]
[docs] @abc.abstractmethod def report_description(self): """ Return a one-line description of the Job for use in reports. :rtype: str """ raise NotImplementedError()
@property def duration(self): """ Return the duration/runtime of the job, or None if the job did not run. :return: job duration :rtype: ``datetime.timedelta`` or ``None`` """ if self._start_time is None or self._finish_time is None: return None return self._finish_time - self._start_time
[docs] @abc.abstractmethod def run(self): """ Run the job. This method sets ``self._started`` and ``self._start_time``. If the Job runs synchronously, this method also sets ``self._finished``, ``self._exit_code``, ``self._finish_time`` and ``self._output``. In the case of an exception, this method must still set those attributes as appropriate and then raise the exception. :return: True if job finished successfully, False if job finished but failed, or None if the job is still running in the background. """ raise NotImplementedError( 'ERROR: Job subclass must implement run() method.' )
[docs] def poll(self): """ For asynchronous jobs (:py:attr:`~.is_started` is True but :py:attr:`~.is_finished` is False), check if the job has finished yet. If not, return ``False``. If the job has finished, update ``self._finish_time``, ``self._exit_code``, ``self._output`` and ``self._finished`` and then return ``True``. This method should **never** raise exceptions; recoverable exceptions should be handled via internal retry logic on subsequent poll attempts. Retries should be done on the next call of this method; we never want to sleep during this method. Unrecoverable exceptions should set ``self._exit_code``, ``self._output`` and ``self._finished``. :return: :py:attr:`~.is_finished` :rtype: bool """ return self.is_finished