atmo.jobs

The code base to manage scheduled Spark jobs via AWS EMR clusters.

atmo.jobs.forms

class atmo.jobs.forms.BaseSparkJobForm(*args, **kwargs)[source]

A base form shared by the forms for creating and editing Spark jobs.

Parameters:
  • identifier (RegexField) – A unique identifier for your Spark job, visible in the AWS management console. (Lowercase, use hyphens instead of spaces.)
  • description (CharField) – A brief description of your Spark job’s purpose. This is intended to provide extra context for the data engineering team.
  • result_visibility (ChoiceField) – Whether notebook results are uploaded to a public or private S3 bucket.
  • size (IntegerField) – Number of workers to use when running the Spark job (1 is recommended for testing or development).
  • interval_in_hours (ChoiceField) – Interval at which the Spark job should be run.
  • job_timeout (IntegerField) – Number of hours that a single run of the job can run for before timing out and being terminated.
  • start_date (DateTimeField) – Date and time of when the scheduled Spark job should start running.
  • end_date (DateTimeField) – Date and time of when the scheduled Spark job should stop running - leave this blank if the job should not be disabled.
clean_notebook()[source]

Validate that the uploaded notebook file has the .ipynb file extension.
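
A minimal sketch of the kind of check this method performs inside the form class, assuming a standard Django clean method; the field name and error message are illustrative, not the exact implementation:

    from django import forms

    def clean_notebook(self):
        notebook = self.cleaned_data['notebook']
        # Reject anything that is not a Jupyter notebook file.
        if not notebook.name.endswith('.ipynb'):
            raise forms.ValidationError('Only Jupyter notebooks (.ipynb) can be uploaded.')
        return notebook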

field_order

Copy the defined model form fields and insert the notebook field in the second position.

save(commit=True)[source]

Store the notebook file on S3 and save the Spark job details to the database.

class atmo.jobs.forms.EditSparkJobForm(*args, **kwargs)[source]

A BaseSparkJobForm subclass used for editing jobs.

Parameters:
  • identifier (RegexField) – A unique identifier for your Spark job, visible in the AWS management console. (Lowercase, use hyphens instead of spaces.)
  • description (CharField) – A brief description of your Spark job’s purpose. This is intended to provide extra context for the data engineering team.
  • result_visibility (ChoiceField) – Whether notebook results are uploaded to a public or private S3 bucket.
  • size (IntegerField) – Number of workers to use when running the Spark job (1 is recommended for testing or development).
  • interval_in_hours (ChoiceField) – Interval at which the Spark job should be run.
  • job_timeout (IntegerField) – Number of hours that a single run of the job can run for before timing out and being terminated.
  • start_date (DateTimeField) – Date and time of when the scheduled Spark job should start running.
  • end_date (DateTimeField) – Date and time of when the scheduled Spark job should stop running - leave this blank if the job should not be disabled.
class atmo.jobs.forms.NewSparkJobForm(*args, **kwargs)[source]

A BaseSparkJobForm subclass used for creating new jobs.

Parameters:
  • identifier (RegexField) – A unique identifier for your Spark job, visible in the AWS management console. (Lowercase, use hyphens instead of spaces.)
  • description (CharField) – A brief description of your Spark job’s purpose. This is intended to provide extra context for the data engineering team.
  • result_visibility (ChoiceField) – Whether notebook results are uploaded to a public or private S3 bucket.
  • size (IntegerField) – Number of workers to use when running the Spark job (1 is recommended for testing or development).
  • interval_in_hours (ChoiceField) – Interval at which the Spark job should be run.
  • job_timeout (IntegerField) – Number of hours that a single run of the job can run for before timing out and being terminated.
  • start_date (DateTimeField) – Date and time of when the scheduled Spark job should start running.
  • end_date (DateTimeField) – Date and time of when the scheduled Spark job should stop running - leave this blank if the job should not be disabled.
  • emr_release (EMRReleaseChoiceField) – Different AWS EMR versions have different versions of software like Hadoop, Spark, etc. See what’s new in each.
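
A hedged sketch of how a view might drive this form; the constructor arguments and field handling are illustrative only (the real form may require additional arguments, such as the requesting user):

    from atmo.jobs.forms import NewSparkJobForm

    # Inside a Django view, with `request` bound to the incoming HTTP request.
    form = NewSparkJobForm(data=request.POST, files=request.FILES)
    if form.is_valid():
        # save() uploads the notebook to S3 and stores the job in the database.
        spark_job = form.save()
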
class atmo.jobs.forms.SparkJobAvailableForm(data=None, files=None, auto_id='id_%s', prefix=None, initial=None, error_class=<class 'django.forms.utils.ErrorList'>, label_suffix=None, empty_permitted=False, field_order=None, use_required_attribute=None, renderer=None)[source]

A form used in the views to check the availability of Spark job identifiers.

atmo.jobs.models

class atmo.jobs.models.SparkJob(*args, **kwargs)[source]

A data model to store details about a scheduled Spark job, to be run on AWS EMR.

Parameters:
  • id (AutoField) – Id
  • created_at (DateTimeField) – Created at
  • modified_at (DateTimeField) – Modified at
  • created_by_id (ForeignKey to User) – User that created the instance.
  • emr_release_id (ForeignKey to EMRRelease) – Different AWS EMR versions have different versions of software like Hadoop, Spark, etc. See what’s new in each.
  • identifier (CharField) – Job name, used to uniquely identify individual jobs.
  • description (TextField) – Job description.
  • notebook_s3_key (CharField) – S3 key of the notebook after uploading it to the Spark code bucket.
  • result_visibility (CharField) – Whether notebook results are uploaded to a public or private bucket
  • size (IntegerField) – Number of computers to use to run the job.
  • interval_in_hours (IntegerField) – Interval at which the job should run, in hours.
  • job_timeout (IntegerField) – Number of hours before the job times out.
  • start_date (DateTimeField) – Date/time that the job should start being scheduled to run.
  • end_date (DateTimeField) – Date/time that the job should stop being scheduled to run, null if no end date.
  • expired_date (DateTimeField) – Date/time that the job was expired.
  • is_enabled (BooleanField) – Whether the job should run or not.
exception DoesNotExist
exception MultipleObjectsReturned
has_finished

Whether the job’s cluster has terminated or failed.

has_never_run

Whether the job has never run before. Looks at both the cluster status and our own record of when we asked it to run.

has_timed_out

Whether the current job run has been running longer than the job’s timeout allows.
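
A plausible reading of this check as a sketch; the latest_run accessor is an assumption based on the run model documented below, not necessarily the exact implementation:

    from datetime import timedelta
    from django.utils import timezone

    @property
    def has_timed_out(self):
        run = self.latest_run  # assumed accessor for the most recent SparkJobRun
        if run is None or run.started_at is None:
            return False
        # job_timeout is in hours; compare the elapsed runtime against it.
        return run.started_at + timedelta(hours=self.job_timeout) <= timezone.now()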

is_due

Whether the start date is in the past and the end date is in the future.

is_runnable

Whether the job can be run: it has either never run before or its most recent run has finished.

This is checked right before the actual provisioning.

run()[source]

Actually run the scheduled Spark job.

save(*args, **kwargs)[source]

Saves the current instance. Override this in a subclass if you want to control the saving process.

The ‘force_insert’ and ‘force_update’ parameters can be used to insist that the “save” must be an SQL insert or update (or equivalent for non-SQL backends), respectively. Normally, they should not be set.

should_run

Whether the scheduled Spark job should run.
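
Read together with the properties above, one plausible composition of this check is the following sketch (not necessarily the verbatim implementation):

    @property
    def should_run(self):
        # Enabled, within its start/end window, and not currently running.
        return self.is_enabled and self.is_due and self.is_runnable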

terminate()[source]

Stop the currently running scheduled Spark job.

class atmo.jobs.models.SparkJobRun(*args, **kwargs)[source]

A data model to store information about every individual run of a scheduled Spark job.

This denormalizes some values from its related data model SparkJob.

Parameters:
  • id (AutoField) – Id
  • created_at (DateTimeField) – Created at
  • modified_at (DateTimeField) – Modified at
  • spark_job_id (ForeignKey to SparkJob) – Spark job
  • jobflow_id (CharField) – Jobflow id
  • emr_release_version (CharField) – Emr release version
  • size (IntegerField) – Number of computers used to run the job.
  • status (CharField) – Status
  • scheduled_at (DateTimeField) – Date/time that the job was scheduled.
  • started_at (DateTimeField) – Date/time when the cluster was started on AWS EMR.
  • ready_at (DateTimeField) – Date/time when the cluster was ready to run steps on AWS EMR.
  • finished_at (DateTimeField) – Date/time that the job was terminated or failed.
exception DoesNotExist
exception MultipleObjectsReturned
sync(info=None)[source]

Update the latest status and lifecycle datetimes.
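
A hedged usage sketch; the runs related name and the shape of the optional info argument (cluster details fetched elsewhere) are assumptions:

    # Refresh the most recent run of a job from AWS EMR.
    run = spark_job.runs.latest('created_at')
    run.sync()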

class atmo.jobs.models.SparkJobRunAlert(*args, **kwargs)[source]

A data model to store job run alerts for later processing by an async job that sends out emails.

Parameters:
exception DoesNotExist
exception MultipleObjectsReturned

atmo.jobs.provisioners

class atmo.jobs.provisioners.SparkJobProvisioner[source]

The Spark job specific provisioner.

add(identifier, notebook_file)[source]

Upload the notebook file to S3.

get(key)[source]

Get the S3 file with the given key from the code S3 bucket.

remove(key)[source]

Remove the S3 file with the given key from the code S3 bucket.

results(identifier, is_public)[source]

Return the results created by the job with the given identifier that were uploaded to S3.

Parameters:
  • identifier – Unique identifier of the Spark job.
  • is_public – Whether the Spark job is public or not.
Returns:

A mapping of result prefixes to lists of results.

Return type:

dict
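
The exact prefixes depend on what the job wrote to S3; an illustrative shape of the return value (keys and paths are examples only) might be:

    {
        'data': ['my-spark-job/data/results.csv'],
        'logs': ['my-spark-job/logs/stdout.txt'],
    }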

run(user_username, user_email, identifier, emr_release, size, notebook_key, is_public, job_timeout)[source]

Run the Spark job with the given parameters.

Parameters:
  • user_username – The username of the Spark job owner.
  • user_email – The email address of the Spark job owner.
  • identifier – The unique identifier of the Spark job.
  • emr_release – The EMR release version.
  • size – The size of the cluster.
  • notebook_key – The name of the notebook file on S3.
  • is_public – Whether the job result should be public or not.
  • job_timeout – The maximum runtime of the job.
Returns:

AWS EMR jobflow ID

Return type:

str
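
A hedged call sketch using the documented parameters; all values are examples, not real configuration:

    from atmo.jobs.provisioners import SparkJobProvisioner

    provisioner = SparkJobProvisioner()
    jobflow_id = provisioner.run(
        user_username='jdoe',
        user_email='jdoe@example.com',
        identifier='weekly-report',
        emr_release='5.11.0',
        size=1,                      # number of workers
        notebook_key='jobs/weekly-report/report.ipynb',
        is_public=False,             # keep results in the private bucket
        job_timeout=4,               # hours
    )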

atmo.jobs.queries

class atmo.jobs.queries.SparkJobQuerySet(model=None, query=None, using=None, hints=None)[source]
active()[source]

The Spark jobs that have an active cluster status.

failed()[source]

The Spark jobs that have a failed cluster status.

lapsed()[source]

The Spark jobs that have passed their end dates but haven’t been expired yet.

terminated()[source]

The Spark jobs that have a terminated cluster status.

with_runs()[source]

The Spark jobs that have at least one run.
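
Assuming the default manager of SparkJob exposes this queryset, the methods above chain like any Django queryset; a sketch:

    from atmo.jobs.models import SparkJob

    # Jobs past their end date that still need to be expired.
    lapsed_jobs = SparkJob.objects.lapsed()

    # Failed jobs that have at least one recorded run.
    failed_with_runs = SparkJob.objects.with_runs().failed()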

class atmo.jobs.queries.SparkJobRunQuerySet(model=None, query=None, using=None, hints=None)[source]
active()[source]

The Spark job runs that have an active cluster status.

atmo.jobs.tasks

class atmo.jobs.tasks.SparkJobRunTask[source]

A Celery task base class used by the run_job() task to simplify testing.

check_enabled(spark_job)[source]

Check whether the job should be run at all.

get_spark_job(pk)[source]

Load the Spark job with the given primary key.

max_retries = 9

The maximum number of retries, chosen so that retrying with the exponential backoff timeouts does not run for too long.
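
For instance, with a base-2 backoff in seconds (an illustration of the bound, not necessarily the exact timeouts used), nine retries stay bounded at roughly eight and a half minutes in total:

    delays = [2 ** n for n in range(9)]  # [1, 2, 4, ..., 256] seconds
    total = sum(delays)                  # 511 seconds in total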

provision_run(spark_job, first_run=False)[source]

Actually run the given Spark job.

If this is the first run, update the “last_run_at” value to the start date of the Spark job so that Celery beat knows what’s going on.

sync_run(spark_job)[source]

Updates the cluster status of the latest Spark job run, if available.

terminate_and_notify(spark_job)[source]

When the Spark job has timed out because it has run longer than its maximum runtime, terminate it (and its cluster) and notify the owner so they can optimize the Spark job code.

unschedule_and_expire(spark_job)[source]

Remove the Spark job from the periodic schedule and send an email to the owner that it was expired.

atmo.jobs.views

atmo.jobs.views.check_identifier_available(request)[source]

Given a Spark job identifier, check whether a job with that identifier already exists.

atmo.jobs.views.delete_spark_job(request, id)[source]

View to delete a scheduled Spark job and then redirect to the dashboard.

atmo.jobs.views.detail_spark_job(request, id)[source]

View to show the details for the scheduled Spark job with the given ID.

atmo.jobs.views.detail_zeppelin_job(request, id)[source]

View to show the details for the scheduled Zeppelin job with the given ID.

atmo.jobs.views.download_spark_job(request, id)[source]

Download the notebook file for the scheduled Spark job with the given ID.

atmo.jobs.views.edit_spark_job(request, id)[source]

View to edit a scheduled Spark job that runs on AWS EMR.

atmo.jobs.views.new_spark_job(request)[source]

View to schedule a new Spark job to run on AWS EMR.

atmo.jobs.views.run_spark_job(request, id)[source]

Run a scheduled Spark job right now, out of sync with its actual schedule.

This will actively ask for confirmation to run the Spark job.