celery что это такое
Зачем нужен celery?
Celery это ничто иное как распределённая очередь заданий, реализованная на языке Python.
Итак, что я тут понял, что это очередь.
Но очередь очень простая структура данных, зачем для неё celery, почему бы самому не создать?
Почему это оно только для заданий? Откуда такая специализация?
Какие реальные проблемы celery решало в вашей практике и почему именно оно? (интересно именно реальное применение у вас)
И извините новичка, если вопрос совсем глупый, но хотелось разобрать этот вопрос.
————
Спасибо!
Например, очередь заданий по конвертации видео для веб-приложения на Django.
Попытка выполнять такую задачу в Django непосредственно при обработке запросов по-умолчанию будет приводить к 504 Gateway Timeout (слишком долгий запрос) со стороны Nginx или иного HTTP-сервера.
Но за 5 минут файл ещё не будет помечен как конвертированный, поэтому cron за 5 минут создаст 4-5 процессов, которые будут конвертировать один и тот же файл.
Ладно, тогда при начале конвертирования файл сразу будем помечать как конвертируемый, чтобы другие процессы брали другие файлы.
И что если пользователи загрузят 10 файлов каждый из которых будет конвертироваться 30 минут? Через 10-11 минут на сервере будут работать 10 процессов конвертирующих видео. Большая нагрузка на сервер. Можно завести в базе данных таблицу в которой отмечается сколько файлов конвертируется в данный момент.
Решив использовать cron для долгих по выполнении задач, нам придётся решать целый ряд проблем.
Celery: лучшие практики
Если вы работаете с Django, то на некотором этапе разработке вам может понадобиться фоновая обработка долго выполняющихся задач. Возможно, что для такого рода задач вы используете какой-либо инструмент для управления очередями задач. Celery — один из самых популярных проектов для решения подобных задач в мире python и Django на данный момент, но есть и другие проекты для этой цели.
Пока я работал над некоторыми проектами, использующими Celery для управления очередями задач, выявились некоторые лучшие практики, которые я решил задокументировать. Впрочем это громкие слова для того, что я думаю о правильном подходе к решению подобных задач, а также о некоторых недостаточно используемых возможностях, которые предлагает сообщество проекта Celery.
No.1: Не используйте СУБД как ваш AMQP брокер
Позвольте мне объяснить почему я считаю это неправильным (помимо тех ограничений что описаны в документации Celery).
СУБД не разрабатывались для тех задач, которые выполняют полноценный AMQP брокер такой как RabbitMQ. Она упадет в «боевых» условиях даже на проекте с не очень большим трафиком\пользовательской базой.
Я предполагаю, что самой популярной причиной того почему люди решают использовать СУБД в том что, как правило, у них уже есть одна СУБД для веб-приложения, так почему бы не воспользоваться ей еще раз. Начать работать с таким вариантом несложно и не надо беспокоиться о других компонентах (таких как RabbitMQ).
Предположим не такой уж гипотетический сценарий: у вас есть 4 фоновых воркера для обработки, которые вы помещаете в базу данных. Это значит что вы получаете 4 процесса, которые достаточно часто запрашивают базу о новых задачах, не говоря уже о том, что каждый из них может иметь собственные конкурирующие потоки. В некоторый момент времени вы понимаете, что растет задержка при обработке задач, а потому приходит больше новых задач чем завершается, необходимо увеличивать количество воркеров. Вдруг скорость вашей базы данных начинает «проседать» из-за огромного количества запросов воркеров к базе, дисковый ввод\вывод превышает заданные лимиты, а все это начинает влиять на ваше приложение, так как воркеры, фактически, устроили DDOS-атаку вашей базе.
Этого не произошло бы при использовании полноценного AMQP брокера, так как очередь размещается в памяти и таким образом устраняется высокая нагрузка на жесткий диск. Потребителям (воркерам) нет необходимости часто запрашивать информацию, так как очередь имеет механизм доставки новой задачи воркеру, и даже, если AMQP брокер будет перегружен по каким-либо иным причинам это не приведет к падению и тормозам того веб-приложения, которое взаимодействует с пользователем.
Я пойду еще дальше и скажу, что вы не должны использовать СУБД как брокера даже в процессе разработки, тогда когда есть такие вещи как Docker и множество преднастроенных образов, которые предоставляют настроенный RabbitMQ «из коробки».
No.2: Используйте больше очередей (т.е. не только одну, которая дается по умолчанию)
Celery очень легко начать использовать, и она предоставляет сразу же одну очередь по умолчанию, в которую и помещаются все задачи пока не будет явно предписано другое поведение Celery. Наиболее общий пример того, что вы можете увидеть:
Что происходит, если обе задачи будут размещены в одной очереди, если иное не определено в файле celeryconfig.py. Я полностью пониманию чем может оправдывать подобный подход, у вас есть один декоратор, который создает удобные фоновые задачи. Здесь я хотел бы обратить внимание, что taskA и taskB, находясь в одной очереди могут делать совершенно разные вещи и таким образом одна из них может быть куда важнее другой, так почему они находятся все в одной корзине? Даже, если у вас один воркер, то представьте такую ситуацию что менее важная задача taskB окажется настолько массовой, что более важной задаче taskA воркер не сможет уделить необходимого внимания.Это приводит нас к к следующему пункту.
No.3: Используйте приоритеты воркеров
Путем решения проблемы, указанной выше является размещение задачи taskA в одной очереди, а taskB в другой и после этого присвоить x воркеров обработке очередь Q1, а остальных на обработку Q2, так как в нее приходит больше задач. Таким образом вы можете быть уверены, что задача taskB получит достаточно воркеров, а остальные тем временем будут обрабатывать менее приоритетную задачу, когда она придет, не провоцируя длительного ожидания и обработки. Потому, определите ваши очереди сами:
И ваши роутеры, которые определять куда направлять задачу:
Это позволит выполнять воркеры для каждой задачи:
No.4: используйте механизмы Celery для обработки ошибок
Большинство задач, которые я видел не имеют механизмов обработки ошибок. Если в задаче произошла ошибка, то она просто падает. Это может быть удобно для некоторых задач, однако большинство задач, которые я видел взаимодействовали с внешними API и падали из-за некоторых видов сетевых ошибок или иных проблем «доступности ресурса». Самый простой подход к обработке таких ошибок перевыполнить код задачи, так как, возможно, проблемы взаимодействия с внешним API были уже устранены.
Я люблю определять по умолчанию для задачи время ожидания, которое она будет ждать прежде чем попытается выполниться снова и как много попыток перевыполнения она предпримет прежде чем окончательно выбросить ошибку(параметры default_retry_delay и max_retries соответственно). Это наиболее простая форма обработки ошибок, которую я могу представить, но я видел, что и она практически не применяется. Разумеется Celery имеет и более сложные методы обработки ошибок, они описаны в документации Celery.
No.5: используйте Flower
No.6: Отслеживайте статус задачи, только если вам это необходимо
Статус задачи это информация о том успешно или нет завершилась задача. Она может быть полезна для некоторых статистических показателей. Важная вещь, которую следует понимать в данном случае: статус задачи это не результирующие данные и той работы, которая она выполняла, такая информация наиболее похожа на неявные изменения, записываемые в базу данных (такие, например, как изменения списка друзей пользователя).
Celery Python: основы и примеры
Все в сообществе Python слышали о Celery хотя бы один раз, и, возможно, уже работали с ним. По сути, это удобный инструмент, который помогает запускать отложенный или выделенный код в отдельном процессе или даже на отдельном компьютере или сервере. Это экономит время и усилия на многих уровнях.
Введение в Celery Python
Celery снижает нагрузку на производительность, выполняя часть функциональности в виде отложенных задач либо на том же сервере, что и другие задачи, либо на другом сервере. Чаще всего разработчики используют его для отправки электронных писем. Тем не менее, Celery может предложить гораздо больше. В этой статье я покажу вам некоторые основы Celery, а также пару лучших практик Python-Celery.
Основы Celery
Если вы уже работали с Celery, не стесняйтесь пропустить эту главу. Но если в Celery вы новичок, здесь вы узнаете, как включить Celery в своем проекте, и примите участие в отдельном руководстве по использованию Celery с Django. По сути, вам нужно создать экземпляр Celery и использовать его для пометки функций Python как задач.
Лучше создать экземпляр в отдельном файле, так как будет необходимо запустить Celery так же, как он работает с WSGI в Django. Например, если вы создадите два экземпляра Flask и Celery в одном файле в приложении Flask и запустите его, у вас будет два экземпляра, но вы будете использовать только один. То же самое, когда вы запускаете Celery.
Основные примеры Python Celery
Как я упоминал ранее, в случае использования Celery отправляется электронное письмо. Я буду использовать этот пример, чтобы показать вам основы использования Celery. Вот краткое руководство по Celery Python:
В этом коде используется Django, поскольку он является нашей основной средой для веб-приложений. Используя Celery, мы сокращаем время ответа клиенту, поскольку отделяем процесс отправки от основного кода, отвечающего за возврат ответа.
Теперь задача будет перезапущена через десять минут, если отправка не удалась. Кроме того, вы сможете установить количество повторных попыток.
Celery позволяет запускать задачи с помощью планировщиков, таких как crontab в Linux.
если вы не используете Django, вы должны использовать celery_app.conf.beat_schedule вместо CELERY_BEAT_SCHEDULE
Отложенное выполнение задачи в Celery
Давайте посмотрим, как это может выглядеть в коде:
Настройка Python Celery Queues
Celery может быть распределен, когда у вас есть несколько воркеров на разных серверах, которые используют одну очередь сообщений для планирования задач. Вы можете настроить дополнительную очередь для вашей задачи / воркера. Например, отправка электронных писем является важной частью вашей системы, и вы не хотите, чтобы какие-либо другие задачи влияли на отправку. Затем вы можете добавить новую очередь, назовем ее mail и использовать эту очередь для отправки электронных писем.
если вы не используете Django, используйте celery_app.conf.task_routes вместо CELERY_TASK_ROUTES
Запустите два отдельных воркера Celery для очереди по умолчанию и новой очереди:
Долгосрочные задачи Python Celery
Это очень простой пример того, как такая задача может быть реализована. В конце задачи мы проверяем, сколько пользователей мы нашли в базе данных. Если число равно пределу, то, вероятно, у нас есть новые пользователи для обработки. Поэтому мы снова запускаем задачу с новым смещением. Если количество пользователей меньше лимита, это означает, что это последний кусок, и нам не нужно продолжать. Но будьте осторожны: для реализации этой задачи каждый раз должен быть одинаковый порядок записей.
Celery: получение результатов задачи
Большинство разработчиков не записывают результаты, полученные после выполнения задачи. Представьте, что вы можете взять часть кода, назначить его для задачи и выполнить эту задачу независимо, как только вы получите запрос пользователя. Когда нам нужны результаты задания, мы либо сразу получаем результаты (если задание выполнено), либо ждем его завершения. Затем мы включаем результат в общий ответ. Используя этот подход, вы можете уменьшить время отклика, что очень хорошо для ваших пользователей и рейтинга сайта.
Мы используем эту функцию для запуска одновременных операций. В одном из наших проектов у нас много пользовательских данных и много поставщиков услуг. Чтобы найти лучшего поставщика услуг, мы делаем тяжелые расчеты и проверки. Чтобы сделать это быстрее, мы создаем задачи для пользователя с каждым поставщиком услуг, запускаем их и собираем результаты, чтобы показать их пользователю. Это очень легко сделать с целевыми группами Celery.
Вот пример того, как использовать этот подход в коде:
Здесь мы выполняем вычисления как можно скорее, ожидаем результатов в конце метода, затем готовим ответ и отправляем его пользователю.
Полезные советы
Крошечные данные
Я, наверное, уже упоминал, что я использую идентификаторы записей базы данных в качестве аргументов задачи вместо полных объектов. Это хороший способ уменьшить размер очереди сообщений. Но что более важно, это то, что при выполнении задачи данные в базе данных могут быть изменены. И когда у вас есть только идентификаторы, вы получите свежие данные, а не устаревшие данные, которые вы получаете при передаче объектов.
Операции
Иногда могут возникнуть проблемы, когда выполненная задача не может найти объект в базе данных. Почему это происходит? Например, в Django вы хотите запускать задачи после регистрации пользователя, например отправку приветственного письма, а ваши настройки Django заключают все запросы в транзакцию. В Celery, однако, задачи выполняются быстро, еще до того, как транзакция будет завершена. Поэтому, если вы используете Celery при работе в Django, вы можете увидеть, что пользователь не существует в базе данных (пока).
3 кейса для использования Celery в Django-приложении
Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.
Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.
Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.
1. Задачи по расписанию
Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.
В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.
Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.
В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.
2. Долгие вычисления и вызовы API из WSGI
Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.
Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.
В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.
Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.
3. Вызовы из Tornado
При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запросов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.
Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть блокирующих операций. Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.
Как настроить Celery в Django
В этом руководстве по использованию Celery совместно с Django я расскажу:
Вы можете использовать на исходный код проекта из этого репозитория.
Зачем приложению на Django нужен Celery
Celery нужен для запуска задач в отдельном рабочем процессе ( worker ), что позволяет немедленно отправить HTTP-ответ пользователю в веб-процессе (даже если задача в рабочем процессе все еще выполняется). Цикл обработки запроса не будет заблокирован, что повысит качество взаимодействия с пользователем.
Ниже приведены некоторые примеры использования Celery:
Когда вы создаете веб-приложение, постарайтесь сделать время отклика не более, чем 500мс (используйте New Relic или Scout APM), если пользователь ожидает ответа слишком долго, выясните причину и попытайтесь устранить ее. В решении такой проблемы может помочь Celery.
Celery или RQ
RQ (Redis Queue) — еще одна библиотека Python, которая решает вышеуказанные проблемы.
Логика работы RQ схожа с Celery (используется шаблон проектирования производитель/потребитель). Далее я проведу поверхностное сравнение для лучшего понимания, какой из инструментов более подходит для задачи.
Я предпочитаю Celery, поскольку он замечательно подходит для решения многих проблем. Данная статья написана мной, чтобы помочь читателю (особенно новичку) быстро изучить Celery!
Брокер сообщений и бэкенд результатов
Брокер сообщений — это хранилище, которое играет роль транспорта между производителем и потребителем.
Из документации Celery рекомендуемым брокером является RabbitMQ, потому что он поддерживает AMQP (расширенный протокол очереди сообщений).
Так как во многих случаях нам не нужно использовать AMQP, другой диспетчер очереди, такой как Redis, также подойдет.
Бэкенд результатов — это хранилище, которое содержит информацию о результатах выполнения Celery-задач и о возникших ошибках.
Здесь рекомендуется использовать Redis.
Как настроить Celery
Celery не работает на Windows. Используйте Linux или терминал Ubuntu в Windows.
Далее я покажу вам, как импортировать Celery worker в ваш Django-проект.
Мы будем использовать Redis в качестве брокера сообщений и бэкенда результатов, что немного упрощает задачу. Но вы свободны в выборе любой другой комбинации, которая удовлетворяет требованиям вашего приложения.
Используйте Docker для подготовки среды разработки
Если вы работаете в Linux или Mac, у вас есть возможность использовать менеджер пакетов для настройки Redis (brew, apt-get install), однако я хотел бы порекомендовать вам попробовать применить Docker для установки сервера redis.
Команда выше запустит Redis на 127.0.0.1:6379.
Теперь импортируем Celery в наш Django-проект.
Создание Django-проекта
Рекомендую создать отдельное виртуальное окружение и работать в нем.
Ниже представлена структура проекта.
Файл celery.py
Давайте приступим к установке и настройке Celery.