Source code for

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at
from collections import OrderedDict

import constance

from ..provisioners import Provisioner

[docs]class SparkJobProvisioner(Provisioner): """The Spark job specific provisioner.""" log_dir = 'jobs' name_component = 'job' def __init__(self): super().__init__() # the S3 URI to the job shell script self.batch_uri = 's3://%s/steps/' % constance.config.AWS_SPARK_EMR_BUCKET
[docs] def add(self, identifier, notebook_file): """ Upload the notebook file to S3 """ key = 'jobs/%s/%s' % (identifier, self.s3.put_object( Bucket=self.config['CODE_BUCKET'], Key=key, Body=notebook_file ) return key
[docs] def get(self, key): """Get the S3 file with the given key from the code S3 bucket.""" return self.s3.get_object(Bucket=self.config['CODE_BUCKET'], Key=key)
[docs] def remove(self, key): """Remove the S3 file with the given key from the code S3 bucket.""" self.s3.delete_object(Bucket=self.config['CODE_BUCKET'], Key=key)
[docs] def run(self, user_username, user_email, identifier, emr_release, size, notebook_key, is_public, job_timeout): """ Run the Spark job with the given parameters :param user_username: The username of the Spark job owner. :param user_email: The email address of the Spark job owner. :param identifier: The unique identifier of the Spark job. :param emr_release: The EMR release version. :param size: The size of the cluster. :param notebook_key: The name of the notebook file on S3. :param is_public: Whether the job result should be public or not. :param job_timeout: The maximum runtime of the job. :return: AWS EMR jobflow ID :rtype: str """ # first get the common job flow parameters job_flow_params = self.job_flow_params( user_username=user_username, user_email=user_email, identifier=identifier, emr_release=emr_release, size=size, ) # the S3 URI to the Jupyter notebook file notebook_uri = 's3://%s/%s' % (self.config['CODE_BUCKET'], notebook_key) if is_public: data_bucket = self.config['PUBLIC_DATA_BUCKET'] else: data_bucket = self.config['PRIVATE_DATA_BUCKET'] job_flow_params.update({ 'BootstrapActions': [{ 'Name': 'setup-telemetry-spark-job', 'ScriptBootstrapAction': { 'Path': self.script_uri, 'Args': [ '--timeout', str(job_timeout * 60), ] } }], 'Steps': [{ 'Name': 'setup-zeppelin', 'ActionOnFailure': 'TERMINATE_JOB_FLOW', 'HadoopJarStep': { 'Jar': self.jar_uri, 'Args': [ self.zeppelin_uri ] } }, { 'Name': 'RunNotebookStep', 'ActionOnFailure': 'TERMINATE_JOB_FLOW', 'HadoopJarStep': { 'Jar': self.jar_uri, 'Args': [ self.batch_uri, '--job-name', identifier, '--notebook', notebook_uri, '--data-bucket', data_bucket ] } }], }) cluster = self.emr.run_job_flow(**job_flow_params) return cluster['JobFlowId']
[docs] def results(self, identifier, is_public): """ Return the results created by the job with the given identifier that were uploaded to S3. :param identifier: Unique identifier of the Spark job. :param is_public: Whether the Spark job is public or not. :return: A mapping of result prefixes to lists of results. :rtype: dict """ if is_public: bucket = self.config['PUBLIC_DATA_BUCKET'] else: bucket = self.config['PRIVATE_DATA_BUCKET'] params = { 'Prefix': '%s/' % identifier, 'Bucket': bucket, } results = OrderedDict() list_objects_v2_paginator = self.s3.get_paginator('list_objects_v2') for page in list_objects_v2_paginator.paginate(**params): for item in page.get('Contents', []): try: prefix = item['Key'].split('/')[1] except IndexError: continue results.setdefault(prefix, []).append(item['Key']) return results