Best Practices — Airflow 3 Документация

Перевод документации Apache Airflow 3 — Best Practices

Table of Contents

Лучшие практики по работе с Apache Airflow 3

Создание нового Dag — это процесс из трёх шагов:

  • написание Python-кода для создания объекта Dag,
  • проверка того, что код соответствует вашим ожиданиям,
  • настройка зависимостей окружения для запуска вашего Dag

В этом руководстве представлены лучшие практики для этих трёх шагов.

Написание Dag

Создание нового Dag в Airflow довольно простое. Однако существует множество вещей, о которых необходимо позаботиться, чтобы запуск Dag или его сбой не приводили к неожиданным результатам.

Создание пользовательского Operator/Hook

Пожалуйста, следуйте нашему руководству по пользовательским (custom) Operator’ам.

Создание задачи

Вы должны рассматривать задачи в Airflow как эквивалент транзакций в базе данных. Это означает, что ваши задачи никогда не должны производить неполные результаты. Например, нельзя оставлять неполные данные в HDFS или S3 по завершении задачи.

Airflow может повторно запускать задачу в случае её сбоя. Следовательно, задачи должны выдавать одинаковый результат при каждом повторном запуске. Некоторые способы избежать получения различного результата:

  • Не используйте INSERT при повторном запуске задачи — оператор INSERT может привести к появлению дублирующихся строк в базе данных. Замените его на UPSERT.
  • Читайте и записывайте данные в конкретный партицию. Никогда не читайте самые последние доступные данные в задаче. Кто-то может обновить входные данные между повторными запусками, что приведёт к разным результатам. Лучший подход — читать входные данные из конкретного партициона. В качестве партициона можно использовать data_interval_start. Этот же метод партиционирования следует применять и при записи данных в S3/HDFS.
  • Функция Python datetime now() возвращает текущий объект datetime. Эту функцию никогда не следует использовать внутри задачи, особенно для выполнения критических вычислений, так как это приводит к разным результатам при каждом запуске. Допустимо использовать её, например, для генерации временного лога.

Совет

Следует определять повторяющиеся параметры, такие как connection_id или пути S3, в default_args, а не объявлять их для каждой задачи. default_args помогают избежать ошибок, таких как опечатки. Кроме того, большинство типов соединений имеют уникальные имена параметров в задачах, поэтому вы можете объявить соединение только один раз в default_args (например, gcp_conn_id), и оно будет автоматически использоваться всеми операторами, которые работают с данным типом соединения.

Удаление задачи

Будьте осторожны при удалении задачи из Dag. После удаления вы не сможете увидеть эту задачу в Graph View, Grid View и других представлениях, что усложнит проверку логов данной задачи через Webserver. Если такое поведение нежелательно, пожалуйста, создайте новый Dag.

Коммуникация

Airflow выполняет задачи Dag на разных серверах в случае использования Kubernetes Executor или Celery Executor. Поэтому не следует хранить какие-либо файлы или конфигурации в локальной файловой системе, так как следующая задача с большой вероятностью будет выполняться на другом сервере без доступа к ним — например, задача, которая загружает файл с данными, который затем обрабатывается следующей задачей. В случае использования Local Executor хранение файлов на диске также может усложнить повторные запуски, например если вашей задаче требуется конфигурационный файл, который удаляется другой задачей в Dag.

По возможности используйте XCom для передачи небольших сообщений между задачами, а для передачи больших объёмов данных используйте удалённое хранилище, такое как S3 или HDFS. Например, если у вас есть задача, которая сохраняет обработанные данные в S3, эта задача может положить путь к выходным данным в S3 в XCom, а downstream-задачи смогут получить этот путь из XCom и использовать его для чтения данных.

Задачи также не должны хранить внутри себя какие-либо параметры аутентификации, такие как пароли или токены. По возможности используйте Connections для безопасного хранения данных в backend’е Airflow и получайте их с помощью уникального connection id.

Код верхнего уровня Python

Следует избегать написания кода верхнего уровня, который не требуется для создания Operator’ов и построения связей Dag между ними. Это связано с архитектурным решением планировщика Airflow и влиянием скорости парсинга кода верхнего уровня на производительность и масштабируемость Airflow.

Планировщик Airflow выполняет код вне методов execute операторов с минимальным интервалом min_file_process_interval секунд. Это делается для того, чтобы обеспечить динамическое планирование Dag’ов — когда расписание и зависимости могут со временем изменяться и влиять на следующий запуск Dag. Планировщик Airflow постоянно старается убедиться, что то, что описано в Dag’ах, корректно отражено в запланированных задачах.

В частности, не следует выполнять доступ к базам данных, тяжёлые вычисления и сетевые операции.

Одним из важных факторов, влияющих на время загрузки Dag, который часто упускают из виду Python-разработчики, является то, что импорты на верхнем уровне могут занимать неожиданно много времени и создавать значительные накладные расходы. Этого легко избежать, переместив такие импорты в локальные импорты внутри Python-callable, например.

Рассмотрим два примера ниже. В первом примере Dag будет парситься на дополнительные 1000 секунд дольше, чем функционально эквивалентный Dag во втором примере, где expensive_api_call выполняется в контексте своей задачи.

Неизбежание кода верхнего уровня Dag:

Избегание кода верхнего уровня Dag:

В первом примере expensive_api_call выполняется каждый раз при парсинге файла Dag, что приводит к неоптимальной производительности при обработке Dag-файла. Во втором примере expensive_api_call вызывается только во время выполнения задачи и, таким образом, Dag может быть распарсен без потери производительности. Чтобы проверить это самостоятельно, реализуйте первый Dag и посмотрите, как строка «Hello from Airflow!» выводится в логах планировщика.

Обратите внимание, что операторы import также считаются кодом верхнего уровня. Поэтому, если у вас есть import, который выполняется долго, или импортируемый модуль сам выполняет код на верхнем уровне, это также может негативно сказаться на производительности планировщика. Следующий пример показывает, как работать с дорогостоящими импортами.

Как проверить, является ли мой код «кодом верхнего уровня»

Чтобы понять, является ли ваш код «кодом верхнего уровня» или нет, необходимо разбираться во многих тонкостях того, как работает парсинг Python. В общем случае, когда Python парсит файл, он выполняет весь код, который видит, за исключением (как правило) внутреннего кода методов, который он не выполняет.

Существует ряд неочевидных специальных случаев — например, к коду верхнего уровня также относится любой код, используемый для определения значений по умолчанию у методов.

Однако есть простой способ проверить, является ли ваш код «кодом верхнего уровня» или нет. Достаточно распарсить ваш код и посмотреть, выполняется ли данный фрагмент кода.

Представьте следующий код:

Чтобы это проверить, вы можете добавить несколько операторов print в код, который хотите проверить, а затем выполнить команду python <my_dag_file>.py.

При выполнении этого кода вы увидите:

Это означает, что get_array не выполняется как код верхнего уровня, а get_task_id — выполняется.

Качество кода и линтинг

Поддержание высокого качества кода имеет ключевое значение для надёжности и сопровождаемости ваших workflow в Airflow. Использование инструментов линтинга помогает выявлять потенциальные проблемы и обеспечивать соблюдение стандартов кодирования. Одним из таких инструментов является ruff — быстрый линтер для Python, который теперь включает специальные правила для Airflow.

ruff помогает выявлять устаревшие возможности и паттерны, которые могут повлиять на миграцию на Airflow 3.0. Например, он включает правила с префиксом AIR, предназначенные для обнаружения потенциальных проблем.

Полный список этих правил описан в разделе Airflow (AIR).

Установка и использование ruff

ruff — это очень быстрый линтер и автоформаттер для Python, написанный на Rust (в десятки раз быстрее flake8, isort, pylint).


Установка: установите ruff с помощью pip:

Запуск ruff: выполните ruff для проверки ваших Dag’ов на наличие потенциальных проблем:

Эта команда проанализирует ваши Dag’и, расположенные в директории dags/, и сообщит о проблемах, связанных с указанными правилами.

Пример

Рассмотрим legacy Dag, определённый следующим образом:

Запуск ruff приведёт к следующему выводу:

Интегрируя ruff в ваш процесс разработки, вы можете заблаговременно устранять устаревшие элементы и поддерживать высокое качество кода, что облегчает переход между версиями Airflow.

Динамическая генерация Dag

Иногда написание Dag’ов вручную нецелесообразно. Возможно, у вас есть большое количество Dag’ов, которые делают одно и то же, отличаясь лишь параметрами. Или вам нужен набор Dag’ов для загрузки таблиц, но вы не хотите вручную обновлять Dag’и каждый раз при изменении этих таблиц. В этих и других случаях может быть полезно динамически генерировать Dag’и.

Избегание избыточной обработки в коде верхнего уровня, описанное в предыдущей главе, особенно важно в случае динамической конфигурации Dag’ов, которая, по сути, может быть реализована одним из следующих способов:

  • через переменные окружения (не путать с Airflow Variables)
  • через внешне предоставляемый, сгенерированный Python-код, содержащий метаданные в папке Dag’ов
  • через внешний, сгенерированный файл конфигурационных метаданных в папке Dag’ов

Некоторые случаи динамической генерации Dag’ов описаны в разделе Dynamic Dag Generation.

Переменные Airflow

Использование переменных Airflow приводит к сетевым вызовам и обращениям к базе данных, поэтому их применение в коде Python верхнего уровня для DAG-ов следует по возможности избегать, как упоминалось в предыдущей главе Python-код верхнего уровня. Если переменные Airflow всё же необходимо использовать в коде DAG верхнего уровня, их влияние на парсинг DAG можно снизить, включив экспериментальный кэш, настроенный с разумным значением ttl.

Вы можете свободно использовать переменные Airflow внутри методов execute() операторов, а также передавать переменные Airflow в существующие операторы через Jinja-шаблоны, что откладывает чтение значения до момента выполнения задачи. Синтаксис шаблона для этого следующий:

или, если требуется десериализовать JSON-объект из переменной:

В коде верхнего уровня переменные, использующие Jinja-шаблоны, не выполняют запрос до момента запуска задачи, тогда как Variable.get() выполняет запрос каждый раз, когда файл DAG парсится планировщиком, если кэширование не включено. Использование Variable.get() без включённого кэширования приводит к неоптимальной производительности при обработке файлов DAG.

В некоторых случаях это может привести к тому, что файл DAG не успеет полностью распарситься и произойдёт тайм-аут.

Плохой пример:

Хороший пример:

В целях безопасности рекомендуется использовать Secrets Backend для любых переменных, содержащих чувствительные данные.

Расписания (Timetables)

Избегайте использования переменных/подключений Airflow или обращения к базе данных Airflow на верхнем уровне кода расписаний. Доступ к базе данных должен быть отложен до момента выполнения DAG. Это означает, что не следует получать переменные/подключения в качестве аргументов при инициализации класса расписания, а также использовать Variable/Connection на верхнем уровне вашего пользовательского модуля расписания.

Плохой пример:

Хороший пример:

Запуск DAG-ов после изменений

Избегайте запуска DAG-ов сразу после их изменения или изменения любых сопутствующих файлов в папке DAG-ов.

Необходимо дать системе достаточно времени для обработки изменённых файлов. Этот процесс включает несколько этапов. Сначала файлы должны быть доставлены планировщику — обычно через распределённую файловую систему или Git-Sync, затем планировщик должен распарсить Python-файлы и сохранить их в базе данных. В зависимости от вашей конфигурации, скорости распределённой файловой системы, количества файлов, количества DAG-ов, числа изменений в файлах, размеров файлов, количества планировщиков, скорости CPU, этот процесс может занимать от нескольких секунд до нескольких минут, а в крайних случаях — многие минуты. Вам следует дождаться появления DAG-а в UI, прежде чем пытаться его запустить.

Если вы наблюдаете большие задержки между обновлением DAG-а и моментом, когда он становится доступен для запуска, вы можете обратить внимание на следующие параметры конфигурации и настроить их в соответствии с вашими потребностями (подробности по каждому параметру см. по ссылкам):

  • scheduler_idle_sleep_time — Управляет временем ожидания планировщика между циклами, но если в цикле ничего не нужно делать, то есть если что-то запланировано, то следующая итерация цикла начнется немедленно.
  • min_file_process_interval — Количество секунд, по истечении которых происходит разбор DAG-файла. Разбор DAG-файла происходит каждые несколько секунд. Обновления DAG-файлов отражаются после этого интервала. Низкое значение этого параметра приведет к увеличению загрузки ЦП.
  • refresh_interval — Как часто (в секундах) следует обновлять или искать новые файлы в пакете DAG.
  • parsing_processes — Процессор DAG может запускать несколько процессов параллельно для анализа DAG. Это определяет, сколько процессов будет запущено.
  • file_parsing_sort_mode — Один из вариантов modified_time, random_seeded_by_host и alphabetical. Процессор DAG перечислит и отсортирует файлы DAG, чтобы определить порядок их анализа.
    • modified_time — Сортировка файлов по времени изменения. Это полезно в больших масштабах для предварительной обработки недавно измененных DAG-графов.
    • random_seeded_by_host — Произвольная сортировка файлов несколькими процессорами DAG, но в одном и том же порядке на одном и том же хосте, что позволяет каждому процессору обрабатывать файлы в разном порядке.
    • alphabetical — Сортировка по имени файла

Пример паттерна watcher с правилами триггеров

Паттерн watcher — это способ организации DAG-а с задачей, которая «наблюдает» за состояниями других задач. Его основное назначение — пометить запуск DAG-а как failed, если любая другая задача завершилась с ошибкой. Необходимость в этом возникла в системных тестах Airflow, которые представляют собой DAG-и с разными задачами (аналогично тесту, состоящему из шагов).

Обычно, когда любая задача завершается с ошибкой, все остальные задачи не выполняются, и весь запуск DAG-а также получает статус failed. Однако при использовании правил триггеров можно нарушить стандартный поток выполнения задач, и весь DAG может получить статус, отличный от ожидаемого. Например, можно иметь задачу очистки ресурсов (teardown task) с правилом триггера TriggerRule.ALL_DONE, которая будет выполняться независимо от состояния других задач (например, для освобождения ресурсов). В такой ситуации DAG всегда выполнит эту задачу, и запуск DAG-а получит статус именно этой задачи, в результате чего можно потерять информацию о задачах, завершившихся с ошибкой. Если требуется гарантировать, что DAG с задачей очистки завершится с ошибкой при падении любой задачи, необходимо использовать паттерн watcher.

Задача watcher — это задача, которая всегда завершается с ошибкой при выполнении, но она должна запускаться только в том случае, если любая другая задача завершилась с ошибкой. Для неё необходимо установить правило триггера TriggerRule.ONE_FAILED, а также сделать её downstream-задачей для всех остальных задач в DAG-е. Благодаря этому, если все остальные задачи завершатся успешно, watcher будет пропущена, а если произойдёт ошибка, задача watcher выполнится и завершится с ошибкой, что приведёт к статусу failed у всего запуска DAG-а.

Примечание

Следует учитывать, что правила триггеров опираются только на непосредственные upstream-задачи (родительские). Например, TriggerRule.ONE_FAILED будет игнорировать любые задачи со статусом failed (или upstream_failed), которые не являются прямыми родителями параметризуемой задачи.

Проще понять концепцию на примере. Предположим, у нас есть следующий DAG:

Визуальное представление этого DAG-а после выполнения выглядит следующим образом:

В нём есть несколько задач, выполняющих разные роли:

  • failing_task — всегда завершается с ошибкой;
  • passing_task — всегда завершается успешно (если выполняется);
  • teardown — всегда запускается (независимо от состояний других задач) и должна всегда завершаться успешно;
  • watcher — является downstream-задачей для всех остальных задач, то есть запускается, когда любая задача завершается с ошибкой, и тем самым переводит весь запуск DAG-а в состояние failed, так как является листовой задачей.

Важно отметить, что без задачи watcher весь запуск DAG-а получит состояние success, поскольку единственная задача, завершающаяся с ошибкой, не является листовой, а задача teardown завершится успешно. Если мы хотим, чтобы watcher отслеживала состояние всех задач, необходимо сделать её зависимой от каждой из них по отдельности. Благодаря этому мы можем перевести запуск DAG-а в состояние failed, если любая из задач завершится с ошибкой. Обратите внимание, что для задачи watcher установлено правило триггера «one_failed».

С другой стороны, без задачи teardown задача watcher не понадобилась бы, поскольку failing_task передала бы свой статус failed downstream-задаче passing_task, и весь запуск DAG-а также получил бы статус failed.

Использование исключения AirflowClusterPolicySkipDag в кластерных политиках для пропуска определённых DAG-ов

Добавлено в версии 2.7.

DAGAirflow обычно разворачиваются и обновляются из конкретной ветки Git-репозитория с помощью git-sync. Однако, когда по операционным причинам требуется запускать несколько кластеров Airflow, поддержка нескольких Git-веток становится крайне неудобной. Особенно это усложняется, когда необходимо периодически синхронизировать две отдельные ветки (например, prod и beta) с использованием корректной стратегии ветвления.

  • cherry-pick слишком трудоёмок для сопровождения Git-репозитория;
  • hard-reset не является рекомендуемым подходом в GitOps.

Вместо этого можно рассмотреть вариант подключения нескольких кластеров Airflow к одной и той же ветке Git (например, main) и управления ими с помощью разных переменных окружения и различных конфигураций подключений с одинаковым connection_id. При необходимости также можно выбрасывать исключение AirflowClusterPolicySkipDag в кластерной политике, чтобы загружать определённые DAG-и в DagBag только в конкретном развертывании Airflow.

Приведённый выше пример показывает фрагмент кода dag_policy, который пропускает DAG в зависимости от тегов, указанных у него.

Снижение сложности DAG-ов

Хотя Airflow хорошо справляется с обработкой большого количества DAG-ов с множеством задач и зависимостей между ними, при наличии большого числа сложных DAG-ов их сложность может негативно сказаться на производительности планирования. Одним из способов поддерживать высокую производительность и эффективное использование экземпляра Airflow является стремление к упрощению и оптимизации DAG-ов везде, где это возможно. Следует помнить, что процесс парсинга и создания DAG-а — это всего лишь выполнение Python-кода, и именно от вас зависит, насколько производительным он будет. Не существует «волшебных рецептов» для того, чтобы сделать DAG «менее сложным» — поскольку это Python-код, именно автор DAG-а контролирует сложность своего кода.

Не существует метрик «сложности DAG-а», и в частности нет метрик, которые могли бы однозначно сказать, является ли DAG «достаточно простым». Однако, как и в случае с любым Python-кодом, можно определить, что код DAG-а стал «проще» или «быстрее», если он оптимизирован. Если вы хотите оптимизировать свои DAG-и, можно предпринять следующие действия:

  • Сделайте загрузку DAG-а быстрее:
    Это единственная рекомендация по улучшению, которая может быть реализована разными способами, но именно она оказывает наибольшее влияние на производительность планировщика. Если у вас есть возможность ускорить загрузку DAG-а — делайте это, если ваша цель — повышение производительности. Обратитесь к разделу Python-код верхнего уровня для получения советов, а также к Dag Loader Test, чтобы оценить время загрузки DAG-а.
  • Генерируйте более простую структуру DAG-а:
    Каждая зависимость между задачами добавляет дополнительную нагрузку на планирование и выполнение. DAG с простой линейной структурой A → B → C будет испытывать меньшие задержки при планировании задач, чем DAG с глубоко вложенной древовидной структурой, например с экспоненциально растущим числом зависимых задач. Если вы можете сделать свои DAG-и более линейными — так, чтобы в каждый момент времени было как можно меньше потенциальных задач-кандидатов на запуск, — это, как правило, улучшит общую производительность планирования.
  • Уменьшите количество DAG-ов в одном файле:
    Хотя Airflow 2 оптимизирован для сценария, при котором в одном файле описано несколько DAG-ов, в системе есть компоненты, из-за которых такой подход иногда менее производителен или приводит к большим задержкам по сравнению с разбиением DAG-ов по нескольким файлам. Уже сам факт того, что один файл может быть обработан только одним FileProcessor, делает этот подход менее масштабируемым. Если у вас много DAG-ов, генерируемых из одного файла, рассмотрите возможность их разделения, особенно если вы замечаете, что изменения в файлах DAG-ов долго отражаются в UI Airflow.
  • Пишите эффективный Python-код:
    Необходимо соблюдать баланс между меньшим количеством DAG-ов в файле (как указано выше) и общим объёмом кода. Файлы Python, описывающие DAG-и, должны следовать лучшим практикам программирования и не должны рассматриваться как конфигурационные файлы. Если ваши DAG-и используют схожий код, не следует копировать его снова и снова в большое количество почти идентичных исходных файлов, так как это приведёт к ненужным повторным импортам одних и тех же ресурсов. Вместо этого следует стремиться к минимизации повторяющегося кода во всех DAG-ах, чтобы приложение работало эффективно и было проще в отладке.
    См. раздел Dynamic Dag Generation о том, как создавать несколько DAG-ов с похожей логикой.

Тестирование DAG-а

Пользователям Airflow следует относиться к DAG-ам как к коду промышленного уровня, и у DAG-ов должны быть различные связанные тесты, чтобы гарантировать получение ожидаемых результатов. Для DAG-а можно написать широкий спектр тестов. Рассмотрим некоторые из них.

Тест загрузки DAG-а (Dag Loader Test)

Этот тест должен гарантировать, что ваш DAG не содержит кода, который вызывает ошибку во время загрузки. Для запуска этого теста пользователю не требуется писать дополнительный код.

Выполнение приведённой выше команды без ошибок гарантирует, что в DAG-е нет неустановленных зависимостей, синтаксических ошибок и т. д. Убедитесь, что вы загружаете DAG в окружении, соответствующем окружению планировщика — с теми же зависимостями, переменными окружения и общим кодом, на который ссылается DAG.

Это также отличный способ проверить, загружается ли DAG быстрее после оптимизации, если вы хотите попробовать оптимизировать время загрузки DAG-а. Просто запустите DAG и измерьте время его выполнения, но, опять же, необходимо убедиться, что DAG выполняется с теми же зависимостями, переменными окружения и общим кодом.

Существует множество способов измерить время выполнения, один из них в Linux — использование встроенной команды time. Обязательно запускайте её несколько раз подряд, чтобы учесть эффекты кэширования. Сравнивайте результаты до и после оптимизации (в одинаковых условиях — на той же машине, в том же окружении и т. д.), чтобы оценить влияние оптимизации.

Результат:

Важной метрикой является «real time», которая показывает, сколько времени заняла обработка DAG-а. Обратите внимание, что при таком способе загрузки файла каждый раз запускается новый интерпретатор, поэтому присутствует начальное время инициализации, которого нет при парсинге DAG-а самим Airflow. Оценить время инициализации можно, выполнив:

Результат:

В данном случае начальное время запуска интерпретатора составляет примерно ~0,07 с, что составляет около 10% времени, необходимого для парсинга example_python_operator.py выше, поэтому фактическое время парсинга для примера DAG-а составляет примерно ~0,62 с.

Подробности о том, как тестировать отдельные операторы, см. в разделе Testing a Dag.

Юнит-тесты

Юнит-тесты гарантируют отсутствие некорректного кода в вашем DAG-е. Вы можете писать юнит-тесты как для отдельных задач, так и для самого DAG-а.

Юнит-тест загрузки DAG-а:

Юнит-тест структуры DAG-а:

Это пример теста, предназначенного для проверки структуры DAG-а, сгенерированного кодом, путём сравнения с объектом типа dict.

Юнит-тест для пользовательского оператора:

Самопроверки (Self-Checks)

Вы также можете реализовать проверки непосредственно в DAG-е, чтобы убедиться, что задачи производят ожидаемые результаты. Например, если у вас есть задача, которая выгружает данные в S3, вы можете реализовать проверку в следующей задаче. Такая проверка, к примеру, может удостовериться, что партиция создана в S3, и выполнить простые проверки, чтобы определить корректность данных.

Аналогично, если у вас есть задача, которая запускает микросервис в Kubernetes или Mesos, следует проверить, был ли сервис успешно запущен, используя airflow.providers.http.sensors.http.HttpSensor.

Staging-окружение

По возможности поддерживайте staging-окружение для тестирования полного выполнения DAG-а перед деплоем в production. Убедитесь, что ваш DAG параметризован и позволяет изменять переменные, например путь вывода при работе с S3 или базу данных, используемую для чтения конфигурации. Не хардкодьте значения внутри DAG-а и не изменяйте их вручную в зависимости от окружения.

Для параметризации DAG-а вы можете использовать переменные окружения.

Мокирование переменных и подключений

При написании тестов для кода, использующего переменные или подключения, необходимо убедиться, что они существуют во время выполнения тестов. Очевидное решение — сохранить эти объекты в базе данных, чтобы их можно было прочитать во время выполнения кода. Однако чтение и запись объектов в базу данных сопровождаются дополнительными временными затратами. Чтобы ускорить выполнение тестов, имеет смысл имитировать наличие этих объектов без сохранения их в базе данных. Для этого можно создать переменные окружения, замокировав os.environ с помощью unittest.mock.patch.dict().

Для переменных используйте AIRFLOW_VAR_{KEY}.

Для подключений используйте AIRFLOW_CONN_{CONN_ID}.

Обслуживание metadata DB

Со временем база метаданных будет увеличивать занимаемое дисковое пространство по мере накопления запусков DAG-ов и задач, а также логов событий.

Для очистки старых данных можно использовать Airflow CLI с командой airflow db clean.

Подробности см. в разделе использования db clean.

Обновления и откаты версий

Резервное копирование базы данных

Всегда разумно делать резервную копию базы метаданных перед выполнением любых операций, изменяющих базу данных.

Отключение планировщика

Во время проведения такого обслуживания можно рассмотреть отключение кластера Airflow.

Один из способов — установить параметр [scheduler] > use_job_schedule в значение False и дождаться завершения всех выполняющихся DAG-ов; после этого новые запуски DAG-ов не будут создаваться, если только они не будут запущены извне.

Лучший способ (хотя и более ручной) — использовать команду dags pause. Вам потребуется заранее зафиксировать список DAG-ов, которые не находятся в состоянии паузы, чтобы затем знать, какие из них нужно вернуть в активное состояние после завершения обслуживания. Сначала выполните airflow dags list и сохраните список не приостановленных DAG-ов. Затем используйте этот же список для выполнения dags pause для каждого DAG-а перед обслуживанием и dags unpause после его завершения. Преимущество такого подхода в том, что после обновления можно попробовать снять с паузы только один или два DAG-а (например, специальные тестовые DAG-и), чтобы убедиться, что всё работает корректно, прежде чем включать все DAG-и обратно.

Добавление DAG-ов для интеграционного тестирования

Полезно добавить несколько DAG-ов для «интеграционного тестирования», которые используют все основные сервисы вашей экосистемы (например, S3, Snowflake, Vault), но с тестовыми ресурсами или «dev»-аккаунтами. Эти тестовые DAG-и можно запускать первыми после обновления, поскольку в случае их сбоя это не приведёт к негативным последствиям, и вы сможете откатиться к резервной копии. Если же они выполняются успешно, это подтвердит, что кластер способен выполнять задачи с использованием необходимых библиотек и сервисов.

Например, если вы используете внешний secrets backend, убедитесь, что у вас есть задача, которая извлекает подключение. Если вы используете KubernetesPodOperator, добавьте задачу, выполняющую sleep 30; echo "hello". Если требуется запись в S3 — реализуйте это в тестовой задаче. А если нужен доступ к базе данных, добавьте задачу, выполняющую select 1 на сервере.

Очистка данных перед обновлением (Prune data)

Некоторые миграции базы данных могут занимать значительное время. Если база метаданных имеет очень большой размер, перед выполнением обновления стоит рассмотреть возможность очистки части старых данных с помощью команды db clean. Используйте с осторожностью.

Работа с конфликтующими и сложными Python-зависимостями

Airflow имеет множество Python-зависимостей, и иногда зависимости Airflow конфликтуют с зависимостями, которые ожидает код ваших задач. Поскольку по умолчанию окружение Airflow представляет собой единый набор Python-зависимостей и одно Python-окружение, нередко возникают ситуации, когда разные задачи требуют различных зависимостей, которые при этом конфликтуют между собой.

Если вы используете предопределённые Operator’ы Airflow для взаимодействия с внешними сервисами, выбор обычно невелик, однако такие операторы, как правило, имеют зависимости, не конфликтующие с базовыми зависимостями Airflow. Airflow использует механизм constraints, что означает наличие «зафиксированного» набора зависимостей, с которым сообщество гарантирует корректную установку Airflow (включая все community-провайдеры) без возникновения конфликтов. При этом вы можете обновлять провайдеры независимо, и их constraints вас не ограничивают, поэтому вероятность конфликтов зависимостей ниже (хотя такие зависимости всё равно необходимо тестировать). Таким образом, при использовании предопределённых операторов вероятность столкнуться с конфликтующими зависимостями минимальна или отсутствует вовсе.

Однако при более «современном» подходе к использованию Airflow — когда вы применяете TaskFlow API и большинство операторов реализуете с помощью собственного Python-кода, либо когда вы пишете собственные Custom Operator’ы — вы можете столкнуться с ситуацией, когда зависимости, требуемые вашим кастомным кодом, конфликтуют с зависимостями Airflow, или даже когда зависимости нескольких ваших Custom Operator’ов конфликтуют между собой.

Существует несколько стратегий, которые можно использовать для смягчения этой проблемы. И хотя работа с конфликтами зависимостей в кастомных операторах может быть сложной, она значительно упрощается при использовании airflow.providers.standard.operators.python.PythonVirtualenvOperator или airflow.providers.standard.operators.python.ExternalPythonOperator — как при прямом использовании классического подхода с Operator’ами, так и при использовании задач, декорированных @task.virtualenv или @task.external_python, если вы применяете TaskFlow.

Начнём со стратегий, которые проще всего реализовать (хотя они имеют определённые ограничения и накладные расходы), и постепенно перейдём к стратегиям, требующим изменений в развертывании Airflow.

Использование PythonVirtualenvOperator

Это самая простая в использовании и одновременно наиболее ограниченная стратегия. PythonVirtualenvOperator позволяет динамически создавать virtualenv, в котором будет выполняться ваш Python-callable. В современном подходе TaskFlow, описанном в разделе Pythonic Dags with the TaskFlow API, это также можно сделать, задекорировав callable декоратором @task.virtualenv (рекомендуемый способ использования оператора). Каждая задача airflow.providers.standard.operators.python.PythonVirtualenvOperator может иметь собственный независимый Python virtualenv (динамически создаваемый при каждом запуске задачи) и задавать детальный набор зависимостей, которые необходимо установить для выполнения этой задачи.

Оператор берёт на себя:

  • создание virtualenv на основе вашего окружения,
  • сериализацию вашего Python-callable и передачу его на выполнение Python-интерпретатору внутри virtualenv,
  • выполнение callable, получение результата и передачу его через XCom, если это указано.

Преимущества оператора:

  • Нет необходимости заранее подготавливать virtualenv. Он динамически создаётся перед запуском задачи и удаляется после её завершения, поэтому для использования нескольких виртуальных окружений не требуется ничего особенного (кроме наличия пакета virtualenv в зависимостях Airflow).
  • Вы можете запускать задачи с разными наборами зависимостей на одних и тех же воркерах — таким образом, ресурсы памяти переиспользуются (хотя см. ниже про накладные расходы на CPU при создании virtualenv).
  • В крупных инсталляциях авторам Dag’ов не нужно просить кого-то создавать virtualenv за них. Как автор Dag’а, вам достаточно иметь установленную зависимость virtualenv, и вы можете задавать и изменять окружения по своему усмотрению.
  • Не требуется изменений в требованиях к деплою — независимо от того, используете ли вы локальный virtualenv, Docker или Kubernetes, задачи будут работать без добавления чего-либо в окружение развертывания.
  • Автору Dag’ов не нужно изучать контейнеры или Kubernetes. Для такого подхода к написанию Dag’ов достаточно знания Python-зависимостей.

У данного оператора есть определённые ограничения и накладные расходы:

  • Ваш Python-callable должен быть сериализуемым. Существует множество Python-объектов, которые не сериализуются стандартной библиотекой pickle. Часть этих ограничений можно обойти с помощью библиотеки dill, однако и она не решает всех проблем сериализации.
  • Все зависимости, отсутствующие в окружении Airflow, должны импортироваться локально внутри используемого callable, а код верхнего уровня Dag не должен импортировать или использовать эти библиотеки.
  • Virtualenv запускаются в рамках одной и той же операционной системы, поэтому они не могут иметь конфликтующие системные зависимости (устанавливаемые через apt или yum). Независимо могут устанавливаться только Python-зависимости.
  • Оператор добавляет накладные расходы на CPU, сеть и общее время выполнения каждой задачи — Airflow вынужден пересоздавать virtualenv с нуля для каждого запуска задачи.
  • Воркеры должны иметь доступ к PyPI или приватным репозиториям для установки зависимостей.
  • Динамическое создание virtualenv подвержено временным сбоям (например, если репозиторий недоступен или возникают сетевые проблемы при подключении к нему).
  • Легко попасть в ситуацию «слишком» динамичного окружения — устанавливаемые зависимости могут обновляться, а их транзитивные зависимости могут получать независимые обновления, в результате чего задача может перестать работать из-за выхода новой версии зависимости или вы можете стать жертвой атаки на цепочку поставок, когда новая версия зависимости оказывается вредоносной.
  • Задачи изолированы друг от друга только за счёт выполнения в разных окружениях. Это означает, что выполняющиеся задачи всё ещё могут влиять друг на друга — например, последующие задачи, выполняемые на том же воркере, могут быть затронуты предыдущими задачами, которые создавали или изменяли файлы и т. п.

Подробные примеры использования airflow.providers.standard.operators.python.PythonVirtualenvOperator приведены в соответствующем разделе руководства по TaskFlow API.

Использование ExternalPythonOperator

Добавлено в версии 2.4.

Более сложным в использовании, но при этом значительно менее накладным с точки зрения ресурсов, безопасности и стабильности вариантом является использование airflow.providers.standard.operators.python.ExternalPythonOperator. В современном подходе TaskFlow, описанном в разделе Pythonic Dags with the TaskFlow API, этого также можно добиться, задекорировав ваш callable декоратором @task.external_python (рекомендуемый способ использования оператора). Однако для этого требуется заранее подготовленное, неизменяемое Python-окружение. В отличие от airflow.providers.standard.operators.python.PythonVirtualenvOperator, вы не можете добавлять новые зависимости в такое предсуществующее окружение. Все необходимые зависимости должны быть добавлены заранее и быть доступны на всех воркерах, если Airflow работает в распределённом окружении.

Таким образом, вы избегаете накладных расходов и проблем, связанных с пересозданием virtualenv, однако такие окружения необходимо подготовить и задеплоить вместе с установкой Airflow. Обычно в этот процесс вовлечены специалисты, отвечающие за установку Airflow, и в крупных инсталляциях это, как правило, другие люди, нежели авторы Dag’ов (DevOps/System Admins).

Такие virtualenv могут быть подготовлены разными способами: при использовании LocalExecutor их достаточно установить на машине, где запускается планировщик; при использовании распределённой установки Celery должна существовать пайплайн, который устанавливает эти virtualenv на нескольких машинах; наконец, если вы используете Docker-образы (например, в Kubernetes), создание virtualenv должно быть добавлено в пайплайн сборки вашего кастомного образа.

Преимущества оператора:

  • Отсутствие накладных расходов при запуске задачи. Virtualenv уже готов в момент начала выполнения задачи.
  • Вы можете запускать задачи с разными наборами зависимостей на одних и тех же воркерах — таким образом, все ресурсы переиспользуются.
  • Воркерам не требуется доступ к PyPI или приватным репозиториям. Меньше вероятность временных сбоев, связанных с сетью.
  • Зависимости могут быть заранее проверены администраторами и командой безопасности, и никакой новый, неожиданный код не будет динамически добавляться. Это полезно как с точки зрения безопасности, так и стабильности.
  • Минимальное влияние на деплой — вам не нужно переходить на Docker-контейнеры или Kubernetes, чтобы эффективно использовать оператор.
  • Автору Dag’ов не нужно изучать контейнеры или Kubernetes. Для написания Dag’ов таким способом достаточно знания Python и работы с requirements.

Недостатки:

  • Окружения должны быть подготовлены заранее. Обычно это означает, что вы не можете менять их «на лету»: добавление новых зависимостей или изменение существующих требует как минимум повторного деплоя Airflow, а время итераций при разработке новых версий может увеличиться.
  • Ваш Python-callable должен быть сериализуемым. Существует множество Python-объектов, которые не сериализуются стандартной библиотекой pickle. Часть этих ограничений можно смягчить с помощью библиотеки dill, однако она также не решает всех проблем сериализации.
  • Все зависимости, отсутствующие в окружении Airflow, должны импортироваться локально внутри используемого callable, а код верхнего уровня Dag не должен импортировать или использовать эти библиотеки.
  • Virtualenv запускаются в рамках одной и той же операционной системы, поэтому они не могут иметь конфликтующие системные зависимости (устанавливаемые через apt или yum). Независимо могут устанавливаться только Python-зависимости.
  • Задачи изолированы друг от друга только за счёт выполнения в разных окружениях. Это означает, что выполняющиеся задачи всё ещё могут влиять друг на друга — например, последующие задачи, выполняемые на том же воркере, могут быть затронуты предыдущими задачами, которые создавали или изменяли файлы и т. п.

PythonVirtualenvOperator и ExternalPythonOperator можно рассматривать как взаимодополняющие инструменты, которые упрощают переход от этапа разработки к продакшену. Как автор Dag’ов, вы обычно будете итерироваться с зависимостями и разрабатывать Dag, используя PythonVirtualenvOperator (декорируя задачи @task.virtualenv), а после завершения итераций и внесения изменений, для продакшена, скорее всего, переключитесь на ExternalPythonOperator@task.external_python) после того, как команды DevOps/System Admin развернут новые зависимости в предсуществующих virtualenv в продакшене. Преимущество такого подхода в том, что вы в любой момент можете вернуть декоратор обратно и продолжить «динамическую» разработку с PythonVirtualenvOperator.

Подробные примеры использования airflow.providers.standard.operators.python.ExternalPythonOperator приведены в разделе TaskFlow External Python example.

Использование DockerOperator или KubernetesPodOperator

Ещё одной стратегией является использование airflow.providers.docker.operators.docker.DockerOperator и airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator. Для этого требуется, чтобы Airflow имел доступ к Docker Engine или кластеру Kubernetes.

Аналогично Python-операторам, декораторы TaskFlow удобны в случае, если вы хотите использовать эти операторы для выполнения вашего Python-callable.

Однако этот подход значительно сложнее — вам необходимо понимать, как работают Docker-контейнеры и Kubernetes Pod’ы, если вы хотите его использовать. Зато задачи полностью изолированы друг от друга, и вы даже не ограничены выполнением только Python-кода. Вы можете писать задачи на любом языке программирования. Кроме того, ваши зависимости полностью независимы от зависимостей Airflow (включая системные зависимости), поэтому если вашей задаче требуется принципиально иное окружение, это подходящий вариант.

Добавлено в версии 2.2:
Начиная с версии Airflow 2.2, вы можете использовать декоратор @task.docker для запуска функций с помощью DockerOperator.

Добавлено в версии 2.4:
Начиная с версии Airflow 2.2, вы можете использовать декоратор @task.kubernetes для запуска функций с помощью KubernetesPodOperator.

Преимущества использования этих операторов:

  • Вы можете запускать задачи с разными наборами как Python-, так и системных зависимостей, а также задачи, написанные на совершенно другом языке программирования или даже под другую архитектуру процессора (x86 vs. arm).
  • Окружение, в котором выполняются задачи, использует оптимизации и неизменяемость контейнеров. Похожие наборы зависимостей эффективно переиспользуют закешированные слои образов, поэтому окружение хорошо оптимизировано для случаев, когда у вас есть несколько похожих, но разных окружений.
  • Зависимости могут быть заранее проверены администраторами и командой безопасности, и никакой новый, неожиданный код не будет динамически добавляться. Это полезно как с точки зрения безопасности, так и стабильности.
  • Полная изоляция между задачами. Они не могут влиять друг на друга иначе, чем через стандартные механизмы Airflow XCom.

Недостатки:

  • Существует накладной расход на запуск задач. Обычно он меньше, чем при динамическом создании virtualenv, но всё равно заметен (особенно для KubernetesPodOperator).
  • В случае использования декораторов TaskFlow весь вызываемый метод должен быть сериализован и передан в Docker-контейнер или Kubernetes Pod, при этом существуют системные ограничения на размер метода. Сериализация, передача и последующая десериализация на удалённой стороне также добавляют накладные расходы.
  • Присутствуют накладные расходы по ресурсам, связанные с необходимостью нескольких процессов. При использовании этих операторов для выполнения задач требуется как минимум два процесса: один процесс (в Docker-контейнере или Kubernetes Pod), выполняющий задачу, и процесс-наблюдатель в воркере Airflow, который отправляет задание в Docker/Kubernetes и отслеживает его выполнение.
  • Контейнерные образы должны быть подготовлены заранее. Обычно это означает, что вы не можете изменять их «на лету». Добавление системных зависимостей, изменение или обновление Python-зависимостей требует пересборки и публикации образа (как правило, в приватном реестре). Время итераций при работе с новыми зависимостями обычно больше и требует от разработчика сборки и использования собственных образов во время разработки. Наличие корректного пайплайна деплоя здесь критически важно для надёжного сопровождения системы.
  • Если вы хотите запускать Python-callable через декораторы, он должен быть сериализуемым. Также в этом случае все зависимости, отсутствующие в окружении Airflow, должны импортироваться локально внутри используемого callable, а код верхнего уровня Dag не должен импортировать или использовать эти библиотеки.
  • Вам необходимо глубже понимать, как работают Docker-контейнеры или Kubernetes. Абстракции, предоставляемые этими технологиями, являются «протекающими», поэтому для написания Dag’ов с использованием этих операторов нужно разбираться в ресурсах, сетях, контейнерах и других аспектах.

Подробные примеры использования airflow.providers.docker.operators.docker.DockerOperator приведены в разделе TaskFlow Docker example, а airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator — в разделе TaskFlow Kubernetes example.

Использование нескольких Docker-образов и очередей Celery

Существует возможность (хотя она требует глубокого понимания деплоя Airflow) запускать задачи Airflow с использованием нескольких независимых Docker-образов. Это можно реализовать путём назначения разных задач разным очередям (Queues) и настройки Celery-воркеров на использование разных образов для разных очередей. Однако такой подход (по крайней мере на данный момент) требует большого объёма ручной конфигурации деплоя и глубоких знаний того, как работают Airflow, Celery и Kubernetes. Кроме того, он вносит существенные накладные расходы при выполнении задач — снижается возможность переиспользования ресурсов, а также становится значительно сложнее точно настраивать стоимость потребляемых ресурсов без негативного влияния на производительность и стабильность.

Одним из возможных способов сделать этот подход более полезным является реализация AIP-46 (Runtime isolation for Airflow tasks and Dag parsing) и завершение AIP-43 (Dag Processor Separation). До реализации этих инициатив преимуществ у данного подхода крайне мало, и он не рекомендуется к использованию.

Однако после реализации этих AIP откроется возможность более мультиарендного (multi-tenant) подхода, при котором несколько команд смогут иметь полностью изолированные наборы зависимостей, используемые на протяжении всего жизненного цикла Dag — от парсинга до выполнения.

Создание пользовательского оператора (custom Operator)

Airflow позволяет создавать новые операторы в соответствии с требованиями вас или вашей команды. Такая расширяемость — одна из ключевых возможностей, делающих Apache Airflow мощным инструментом.

Вы можете создать любой оператор, унаследовавшись от публичного базового класса SDK — BaseOperator.

В производном классе необходимо переопределить два метода:

  • Конструктор (__init__) — определить параметры, необходимые для оператора. Нужно указывать только аргументы, специфичные для вашего оператора. default_args можно задать в файле Dag.
  • Execute — код, который будет выполнен при вызове оператора раннером. Метод принимает контекст Airflow в качестве параметра, который можно использовать для чтения конфигурационных значений.

Примечание

При реализации пользовательских операторов не выполняйте ресурсоёмкие операции в методе init. Операторы создаются один раз за цикл планировщика для каждой задачи, которая их использует, и выполнение, например, запросов к базе данных может существенно замедлить планирование и привести к неэффективному использованию ресурсов.

Реализуем пример HelloOperator в новом файле hello_operator.py:

Примечание

Чтобы импорты работали корректно, файл должен находиться в директории, присутствующей в переменной окружения PYTHONPATH. Airflow по умолчанию добавляет директории dags/, plugins/ и config/ из домашнего каталога Airflow в PYTHONPATH. В нашем примере файл размещён в директории custom_operator/.

Теперь вы можете использовать созданный пользовательский оператор следующим образом:

Вы также можете продолжать использовать папку plugins для хранения пользовательских операторов. Если файл hello_operator.py находится в директории plugins, оператор можно импортировать следующим образом:

Если оператор взаимодействует с внешним сервисом (API, база данных и т. п.), рекомендуется реализовать слой взаимодействия через Hooks. Это позволит повторно использовать реализованную логику в других операторах. Такой подход обеспечивает лучшее разделение ответственности и более эффективное использование интеграции по сравнению с созданием CustomServiceBaseOperator для каждого внешнего сервиса.

Ещё один аспект — временное состояние. Если операция требует хранения состояния в памяти (например, job id, который должен использоваться в методе on_kill для отмены запроса), это состояние должно храниться в операторе, а не в hook. Таким образом, hook сервиса остаётся полностью stateless, а вся логика операции сосредоточена в одном месте — в операторе.

Hooks

Hooks выступают интерфейсом для взаимодействия с внешними общими ресурсами в Dag. Например, нескольким задачам в Dag может потребоваться доступ к базе данных MySQL. Вместо создания отдельного подключения для каждой задачи можно получить подключение через hook и использовать его повторно.

Hook также помогает избежать хранения параметров аутентификации подключения непосредственно в Dag.

Расширим предыдущий пример и получим имя из MySQL:

Когда оператор выполняет запрос через объект hook, создаётся новое подключение, если оно ещё не существует. Hook получает параметры аутентификации (например, имя пользователя и пароль) из backend Airflow и передаёт их в airflow.hooks.base.BaseHook.get_connection().

Создавать hook следует только в методе execute или в методах, вызываемых из execute. Конструктор вызывается каждый раз при парсинге Dag (а это происходит часто), и создание hook в нём приведёт к множеству ненужных подключений к базе данных. Метод execute вызывается только во время запуска Dag.

Пользовательский интерфейс

Airflow позволяет разработчику управлять отображением оператора в интерфейсе Dag.

  • Переопределите ui_color, чтобы изменить цвет фона оператора в UI.
  • Переопределите ui_fgcolor, чтобы изменить цвет текста.

Переопределите custom_operator_name, чтобы изменить отображаемое имя (отличное от имени класса).

Шаблонизация (Templating)

Вы можете использовать шаблоны Jinja для параметризации оператора. Airflow применяет шаблонизацию к полям, указанным в template_fields, во время рендеринга оператора.

Использование шаблона:

В этом примере Jinja найдёт параметр name и заменит {{ task_instance.task_id }} на task_id_1.

Параметр также может содержать имя файла, например bash-скрипта или SQL-файла. В этом случае нужно указать расширение файла в template_ext. Если поле из template_fields содержит строку, заканчивающуюся расширением из template_ext, Jinja прочитает содержимое файла и заменит шаблоны на реальные значения.

Обратите внимание: Jinja подставляет значения в атрибуты оператора, а не в аргументы функции.

В этом примере template_fields должен быть ['guest_name'], а не ['name'].

Дополнительно вы можете указать template_fields_renderers — словарь, определяющий, в каком формате значение шаблонного поля будет отображаться в веб-интерфейсе. Например:

В ситуации, когда template_field сам по себе является словарём, также можно указать путь к ключу через точку, чтобы извлекать и корректно отображать отдельные элементы. Например:

Использование этого шаблона:

В результате в UI поле configuration будет отображаться в формате JSON, а значение, находящееся по пути configuration.query.sql, будет подсвечено с использованием SQL-лексера.

В настоящее время доступны следующие лексеры:

  • bash
  • bash_command
  • doc
  • doc_json
  • doc_md
  • doc_rst
  • doc_yaml
  • doc_md
  • hql
  • html
  • jinja
  • json
  • md
  • mysql
  • postgresql
  • powershell
  • py
  • python_callable
  • rst
  • sql
  • tsql
  • yaml

Если вы укажете несуществующий лексер, значение шаблонного поля будет отображено как красиво отформатированный (pretty-printed) объект.

Ограничения

Чтобы предотвратить неправильное использование, при определении и назначении шаблонизируемых полей в конструкторе оператора (если он определён, иначе — см. ниже) необходимо соблюдать следующие ограничения:

1. Параметры конструктора, соответствующие шаблонным полям, должны называться точно так же, как и сами поля.

Следующий пример некорректен, так как имя параметра конструктора не совпадает с именем шаблонного поля:

2. Атрибуты экземпляра, соответствующие шаблонным полям, должны быть явно присвоены из соответствующих параметров конструктора — либо напрямую, либо через вызов конструктора родительского класса (где эти поля определены как template_fields) с явной передачей параметров.

Следующий пример некорректен, так как атрибут self.foo вообще не присваивается, несмотря на то, что он объявлен как шаблонное поле:

Следующий пример также некорректен, так как self.foo в MyHelloOperator инициализируется неявно через kwargs, переданные в конструктор родителя:

3. Нельзя применять преобразования к параметру при его присваивании в конструкторе.

Любые действия над значением должны выполняться в методе execute().

Следующий пример некорректен:

Если оператор наследуется от базового оператора и не определяет собственный конструктор, указанные ограничения не применяются. Однако шаблонные поля должны быть корректно определены в родительском классе с соблюдением этих правил.

Следующий пример корректен:

Эти ограничения проверяются pre-commit hook’ом с именем validate-operators-init.

Добавление шаблонных полей через наследование

Распространённый сценарий создания пользовательского оператора — расширение уже существующих template_fields. Может возникнуть ситуация, когда нужный вам оператор не объявляет определённые параметры как шаблонные, но вы хотите передавать их динамически через Jinja-выражения. Это легко реализуется через простое наследование.

Предположим, у вас есть ранее определённый HelloOperator:

Допустим, вы хотите динамически параметризовать аргумент world.

Поскольку template_fields гарантированно является Sequence[str] (списком или кортежем строк), можно легко создать подкласс и расширить список шаблонных полей:

Теперь можно использовать MyHelloOperator следующим образом:

В этом примере аргумент world будет динамически установлен в значение переменной Airflow с именем my_world через Jinja-выражение.

Определение дополнительной ссылки (Extra Link) для оператора

Для своего оператора вы можете определить дополнительную ссылку (extra link), которая будет перенаправлять пользователей во внешние системы. Например, можно добавить ссылку, ведущую на документацию или руководство по использованию оператора.

Sensors

Airflow предоставляет специальный тип оператора — Sensor, предназначенный для регулярной проверки (polling) некоторого состояния (например, наличия файла) до тех пор, пока не будет выполнено условие успешного завершения.

Вы можете создать собственный сенсор, унаследовавшись от airflow.sensors.base.BaseSensorOperator и реализовав метод poke, который будет опрашивать внешнее состояние и проверять критерий успешности.

Режим reschedule

У сенсоров есть мощная возможность — режим reschedule, который позволяет задаче сенсора быть перепланированной, вместо того чтобы занимать слот воркера между проверками.

Это полезно, если:

  • вы можете позволить себе более длинный интервал опроса,
  • ожидается длительное ожидание выполнения условия.

Ограничение режима reschedule

Режим reschedule имеет важное ограничение: сенсор не может сохранять внутреннее состояние между перепланированными запусками.

Если ваш сенсор хранит внутреннее состояние, его следует декорировать с помощью airflow.sensors.base.poke_mode_only(). Это даст пользователям понять, что сенсор не подходит для использования в режиме reschedule.

Пример сенсора с внутренним состоянием

Примером сенсора, который хранит внутреннее состояние и не может использоваться в режиме reschedule, является:

airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor

Этот сенсор:

  • опрашивает количество объектов по заданному префиксу (это количество является его внутренним состоянием),
  • считается успешно завершённым, если в течение определённого времени количество объектов не меняется.
0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x