import math
from datetime import timedelta

from autorepr import autorepr, autostr
from django.db import models, transaction
from django.utils import timezone
from django.utils.functional import cached_property

from ..clusters.models import Cluster, EMRReleaseModel
from ..clusters.provisioners import ClusterProvisioner
from ..models import CreatedByModel, EditedAtModel, URLActionModel
from ..stats.models import Metric

from .provisioners import SparkJobProvisioner
from .queries import SparkJobQuerySet, SparkJobRunQuerySet


[docs]class SparkJob(EMRReleaseModel, CreatedByModel, EditedAtModel, URLActionModel): """ A data model to store details about a scheduled Spark job, to be run on AWS EMR. """ INTERVAL_DAILY = 24 INTERVAL_WEEKLY = INTERVAL_DAILY * 7 INTERVAL_MONTHLY = INTERVAL_DAILY * 30 INTERVAL_CHOICES = [ (INTERVAL_DAILY, 'Daily'), (INTERVAL_WEEKLY, 'Weekly'), (INTERVAL_MONTHLY, 'Monthly'), ] RESULT_PRIVATE = 'private' RESULT_PUBLIC = 'public' RESULT_VISIBILITY_CHOICES = [ (RESULT_PRIVATE, 'Private'), (RESULT_PUBLIC, 'Public'), ] identifier = models.CharField( max_length=100, help_text="Job name, used to uniqely identify individual jobs.", unique=True, db_index=True, ) description = models.TextField( help_text='Job description.', default='', ) notebook_s3_key = models.CharField( max_length=800, help_text="S3 key of the notebook after uploading it to the Spark code bucket." ) result_visibility = models.CharField( # can currently be "public" or "private" max_length=50, help_text="Whether notebook results are uploaded to a public or private bucket", choices=RESULT_VISIBILITY_CHOICES, default=RESULT_PRIVATE, ) size = models.IntegerField( help_text="Number of computers to use to run the job." ) interval_in_hours = models.IntegerField( help_text="Interval at which the job should run, in hours.", choices=INTERVAL_CHOICES, default=INTERVAL_DAILY, ) job_timeout = models.IntegerField( help_text="Number of hours before the job times out.", ) start_date = models.DateTimeField( help_text="Date/time that the job should start being scheduled to run." ) end_date = models.DateTimeField( blank=True, null=True, help_text="Date/time that the job should stop being scheduled to run, null if no end date.", ) expired_date = models.DateTimeField( blank=True, null=True, help_text="Date/time that the job was expired.", db_index=True, ) is_enabled = models.BooleanField( default=True, help_text="Whether the job should run or not.", ) objects = SparkJobQuerySet.as_manager() class Meta: permissions = [ ('view_sparkjob', 'Can view Spark job'), ] __str__ = autostr('{self.identifier}') __repr__ = autorepr(['identifier', 'size', 'is_enabled']) url_prefix = 'jobs' url_actions = ['delete', 'detail', 'download', 'edit', 'run', 'zeppelin'] def get_absolute_url(self): return self.urls.detail @property def provisioner(self): return SparkJobProvisioner() # TEMPORARY till we have 1:1 relationship to cluster object # and we can then ask for spark_job.cluster.provisioner @property def cluster_provisioner(self): return ClusterProvisioner() @property def schedule(self): from .schedules import SparkJobSchedule return SparkJobSchedule(self) def has_future_end_date(self, now): # no end date means it'll always be due if self.end_date is None: return True return self.end_date >= now @property def has_never_run(self): """ Whether the job has run before. Looks at both the cluster status and our own record when we asked it to run. """ return (self.latest_run is None or self.latest_run.status == DEFAULT_STATUS or self.latest_run.scheduled_at is None) @property def has_finished(self): """Whether the job's cluster is terminated or failed""" return (self.latest_run and self.latest_run.status in Cluster.FINAL_STATUS_LIST) @property def has_timed_out(self): """ Whether the current job run has been running longer than the job's timeout allows. """ if self.has_never_run: # Job isn't even running at the moment and never ran before return False timeout_delta = timedelta(hours=self.job_timeout) max_run_time = self.latest_run.scheduled_at + timeout_delta timed_out = >= max_run_time return not self.is_runnable and timed_out @property def is_due(self): """ Whether the start date is in the past and the end date is in the future. """ now = has_past_start_date = self.start_date <= now return has_past_start_date and self.has_future_end_date(now) @property def is_runnable(self): """ Either the job has never run before or was never finished. This is checked right before the actual provisioning. """ return self.has_never_run or self.has_finished @property def should_run(self): """Whether the scheduled Spark job should run.""" return self.is_runnable and self.is_enabled and self.is_due @property def is_public(self): return self.result_visibility == self.RESULT_PUBLIC @property def is_active(self): return (self.latest_run and self.latest_run.status in Cluster.ACTIVE_STATUS_LIST) @property def notebook_name(self): return self.notebook_s3_key.rsplit('/', 1)[-1] @cached_property def notebook_s3_object(self): return self.provisioner.get(self.notebook_s3_key) @cached_property def results(self): return self.provisioner.results(self.identifier, self.is_public) def get_latest_run(self): try: return self.runs.latest() except SparkJobRun.DoesNotExist: return None latest_run = cached_property(get_latest_run, name='latest_run')
[docs] def run(self): """Actually run the scheduled Spark job.""" # if the job ran before and is still running, don't start it again if not self.is_runnable: return jobflow_id = user_username=self.created_by.username,, identifier=self.identifier, emr_release=self.emr_release.version, size=self.size, notebook_key=self.notebook_s3_key, is_public=self.is_public, job_timeout=self.job_timeout, ) # Create new job history record. run = self.runs.create( spark_job=self, jobflow_id=jobflow_id,, emr_release_version=self.emr_release.version, size=self.size, ) # Remove the cached latest run to this objects will requery it. try: delattr(self, 'latest_run') except AttributeError: # pragma: no cover pass # It didn't have a `latest_run` and that's ok. with transaction.atomic(): Metric.record('sparkjob-emr-version', data={'version': self.emr_release.version}) # sync with EMR API transaction.on_commit(run.sync)
def expire(self): # TODO disable the job as well once it's easy to re-enable the job deleted = self.schedule.delete() self.expired_date = return deleted
[docs] def terminate(self): """Stop the currently running scheduled Spark job.""" if self.latest_run: self.cluster_provisioner.stop(self.latest_run.jobflow_id)
def first_run(self): if self.latest_run: return None from .tasks import run_job return run_job.apply_async( args=(,), kwargs={'first_run': True}, # make sure we run this task only when we expect it # may be in the future, may be in the past # but definitely at a specific time eta=self.start_date, )
[docs] def save(self, *args, **kwargs): # whether the job is being created for the first time first_save = is None # resetting expired_date in case a user resets the end_date if self.expired_date and self.end_date and self.end_date > self.expired_date = None super().save(*args, **kwargs) # Remove the cached latest run to this objects will requery it. try: delattr(self, 'latest_run') except AttributeError: # pragma: no cover pass # It didn't have a `latest_run` and that's ok. # first remove if it exists self.schedule.delete() # and then add it, but only if the end date is in the future if self.has_future_end_date( self.schedule.add() if first_save: transaction.on_commit(self.first_run)
def delete(self, *args, **kwargs): # make sure to shut down the cluster if it's currently running self.terminate() # make sure to clean up the job notebook from storage self.provisioner.remove(self.notebook_s3_key) self.schedule.delete() super().delete(*args, **kwargs)
[docs]class SparkJobRun(EditedAtModel): """ A data model to store information about every individual run of a scheduled Spark job. This denormalizes some values from its related data model :class:`SparkJob`. """ spark_job = models.ForeignKey( SparkJob, on_delete=models.CASCADE, related_name='runs', related_query_name='runs', ) jobflow_id = models.CharField( max_length=50, blank=True, null=True, ) emr_release_version = models.CharField( max_length=50, blank=True, null=True, ) size = models.IntegerField( help_text="Number of computers used to run the job.", blank=True, null=True, ) status = models.CharField( max_length=50, blank=True, default=DEFAULT_STATUS, db_index=True, ) scheduled_at = models.DateTimeField( blank=True, null=True, help_text="Date/time that the job was scheduled.", ) started_at = models.DateTimeField( blank=True, null=True, help_text="Date/time when the cluster was started on AWS EMR." ) ready_at = models.DateTimeField( blank=True, null=True, help_text="Date/time when the cluster was ready to run steps on AWS EMR." ) finished_at = models.DateTimeField( blank=True, null=True, help_text="Date/time that the job was terminated or failed.", ) objects = SparkJobRunQuerySet.as_manager() class Meta: get_latest_by = 'created_at' ordering = ['-created_at'] __str__ = autostr('{self.jobflow_id}') def spark_job_identifier(self): return self.spark_job.identifier __repr__ = autorepr( ['jobflow_id', 'spark_job_identifier', 'emr_release_version', 'size'], spark_job_identifier=spark_job_identifier, ) @property def info(self): return
[docs] def sync(self, info=None): """ Updates latest status and life cycle datetimes. """ if info is None: info = # a mapping between what the provisioner returns what the data model uses model_field_map = ( ('state', 'status'), ('creation_datetime', 'started_at'), ('ready_datetime', 'ready_at'), ('end_datetime', 'finished_at'), ) save_needed = False date_fields_updated = False # set the various model fields to the value the API returned for api_field, model_field in model_field_map: field_value = info.get(api_field) if field_value is None or field_value == getattr(self, model_field): continue setattr(self, model_field, field_value) save_needed = True if model_field in ('started_at', 'ready_at', 'finished_at'): date_fields_updated = True with transaction.atomic(): # If the job cluster terminated with error raise the alarm. if self.status == Cluster.STATUS_TERMINATED_WITH_ERRORS: transaction.on_commit(lambda: self.alert(info)) # If any data changed, save it. if save_needed: with transaction.atomic(): if date_fields_updated: # When job cluster is ready, record time to ready. if self.ready_at and not self.finished_at: # Time in seconds it took the cluster to be ready. time_to_ready = (self.ready_at - self.started_at).seconds Metric.record( 'sparkjob-time-to-ready', time_to_ready, data={ 'identifier': self.spark_job.identifier, 'size': self.size, 'jobflow_id': self.jobflow_id, } ) if self.finished_at: # When job is finished, record normalized instance hours. hours = math.ceil( (self.finished_at - self.started_at).seconds / 60 / 60 ) normalized_hours = hours * self.size Metric.record( 'sparkjob-normalized-instance-hours', normalized_hours, data={ 'identifier': self.spark_job.identifier, 'size': self.size, 'jobflow_id': self.jobflow_id, } ) if self.finished_at and self.ready_at: # When job is finished, record time in seconds it took the # scheduled job to run. Sometimes `ready_at` won't be # available if the cluster terminated with errors. run_time = (self.finished_at - self.ready_at).seconds Metric.record( 'sparkjob-run-time', run_time, data={ 'identifier': self.spark_job.identifier, 'size': self.size, 'jobflow_id': self.jobflow_id, } ) return self.status
def alert(self, info): self.alerts.get_or_create( reason_code=info['state_change_reason_code'], reason_message=info['state_change_reason_message'], )
[docs]class SparkJobRunAlert(EditedAtModel): """ A data model to store job run alerts for later processing by an async job that sends out emails. """ run = models.ForeignKey( SparkJobRun, on_delete=models.CASCADE, related_name='alerts', ) reason_code = models.CharField( max_length=50, blank=True, null=True, help_text="The reason code for the creation of the alert.", ) reason_message = models.TextField( default='', help_text="The reason message for the creation of the alert.", ) mail_sent_date = models.DateTimeField( blank=True, null=True, help_text="The datetime the alert email was sent.", ) class Meta: unique_together = [ ['run', 'reason_code', 'reason_message'], ] index_together = [ ['reason_code', 'mail_sent_date'], ] __str__ = autostr('{}') def short_reason_message(self): return self.reason_message[:50] __repr__ = autorepr( ['id', 'reason_code', 'short_reason_message'], short_reason_message=short_reason_message, )