Source code for atmo.celery
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import os
import random
from celery import Celery
from celery.five import string_t
from celery.utils.time import maybe_iso8601
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'atmo.settings')
os.environ.setdefault('DJANGO_CONFIGURATION', 'Dev')
import configurations # noqa
configurations.setup()
[docs]class ExpoBackoffFullJitter:
"""
Implement fully jittered exponential retries.
See for more infos:
- https://www.awsarchitectureblog.com/2015/03/backoff.html
- https://github.com/awslabs/aws-arch-backoff-simulator
"""
# Copyright 2015 Amazon
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def __init__(self, base, cap):
self.base = base
self.cap = cap
[docs] def expo(self, n):
"""Return the exponential value for the given number."""
return min(self.cap, pow(2, n) * self.base)
[docs] def backoff(self, n):
"""Return the exponential backoff value for the given number."""
v = self.expo(n)
return random.uniform(0, v)
[docs]class AtmoCelery(Celery):
"""
A custom Celery class to implement exponential backoff retries.
"""
[docs] def send_task(self, *args, **kwargs):
# HACK: This needs to be removed once Celery > 4.0.2 is out:
# see https://github.com/celery/celery/issues/3734
# and https://github.com/celery/celery/pull/3790
expires = kwargs.get('expires')
if isinstance(expires, string_t):
kwargs['expires'] = maybe_iso8601(expires)
return super().send_task(*args, **kwargs)
[docs] def backoff(self, n, cap=60 * 60):
"""
Return a fully jittered backoff value for the given number.
"""
return ExpoBackoffFullJitter(base=1, cap=cap).backoff(n)
#: The Celery app instance used by ATMO, which auto-detects Celery
#: config values from Django settings prefixed with "CELERY\_"
#: and autodiscovers Celery tasks from ``tasks.py`` modules in
#: Django apps.
celery = AtmoCelery('atmo')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
celery.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
celery.autodiscover_tasks()