Dag Run
Dag Run — объект, представляющий один запуск DAG в момент времени. При каждом выполнении DAG создаётся Dag Run и выполняются все входящие в него задачи. Статус Dag Run определяется состояниями задач. Каждый Dag Run выполняется отдельно от других, то есть один и тот же DAG может иметь несколько одновременных запусков.
Статус Dag Run
Статус Dag Run определяется по завершении выполнения DAG. Выполнение DAG зависит от входящих в него задач и их зависимостей. Статус присваивается Dag Run, когда все задачи перешли в одно из терминальных состояний (то есть дальнейший переход в другое состояние невозможен): success, failed или skipped. Статус Dag Run вычисляется по так называемым «листовым узлам» (leaf nodes), то есть задачам без дочерних задач.
У Dag Run возможны два терминальных состояния:
success— если все листовые задачи в состоянииsuccessилиskipped;failed— если хотя бы одна листовая задача в состоянииfailedилиupstream_failed.
Примечание. Учитывайте, что у части задач могут быть заданы особые правила срабатывания (trigger rules). Это может приводить к неожиданному поведению. Например, листовая задача с правилом «all_done» выполнится независимо от состояний остальных задач, и при её успехе весь Dag Run будет помечен как
success, даже если где-то по пути была ошибка.
Добавлено в Airflow 2.7
DAG с текущим выполняющимся Dag Run отображаются на дашборде UI во вкладке «Running». Аналогично, DAG, у которых последний Dag Run в состоянии failed, показываются во вкладке «Failed».
Data Interval
У каждого Dag Run в Airflow есть назначенный data interval — временной диапазон, с которым он работает. Например, для DAG с расписанием @daily каждый data interval начинается в полночь (00:00) и заканчивается в полночь (24:00).
Dag Run обычно планируется после окончания соответствующего data interval, чтобы запуск мог обработать все данные за этот период. Иными словами, запуск за период 2020-01-01, как правило, не начнётся, пока не закончится 2020-01-01, то есть до 2020-01-02 00:00:00.
Все даты в Airflow так или иначе связаны с концепцией data interval. Logical date (в версиях Airflow до 2.2 назывался execution_date) Dag Run обозначает начало data interval, а не момент фактического выполнения DAG.
Аналогично, аргумент start_date у DAG и его задач указывает на ту же логическую дату и задаёт начало первого data interval DAG, а не момент, когда задачи начнут выполняться. То есть первый Dag Run будет запланирован только через один интервал после start_date.
Совет. Если cron-выражения или объекта timedelta недостаточно для описания расписания DAG, logical date или data interval, см. Timetables. Подробнее о logical date: Running Dags и What does execution_date mean?.
Повторный запуск DAG
Иногда нужно выполнить DAG снова — например, когда запланированный Dag Run завершился с ошибкой.
Catchup
DAG с заданными start_date, при необходимости end_date и расписанием, не привязанным к ассетам, задаёт серию интервалов, которые планировщик превращает в отдельные Dag Run и выполняет. По умолчанию при активации DAG планировщик не создаёт Dag Run за все непройденные интервалы с последнего data interval (в Airflow задано scheduler.catchup_by_default=False). Создаётся только Dag Run за последний интервал.
Если в DAG указать catchup=True, планировщик создаст Dag Run за каждый data interval, который ещё не был выполнен с последнего интервала (или был очищен). Такое поведение называется Catchup.
Если DAG не рассчитан на catchup (например, ориентирован не на интервал, а на «сейчас»), лучше отключить catchup — это поведение по умолчанию или задаётся явно как catchup=False в определении DAG, если в вашей среде конфиг по умолчанию был изменён.
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial Dag",
schedule="@daily",
)
В этом примере, если планировщик подхватит DAG 2016-01-02 в 6:00 (или при запуске из командной строки), будет создан один Dag Run с data interval с 2016-01-01 по 2016-01-02, а следующий — сразу после полуночи 2016-01-03 с интервалом 2016-01-02 — 2016-01-03.
Имейте в виду: при использовании объекта datetime.timedelta в качестве расписания поведение может отличаться. В таком случае единственный созданный Dag Run покроет период с 2016-01-01 06:00 по 2016-01-02 06:00 (один интервал расписания до текущего момента). Подробнее о различиях между расписанием на основе cron и на основе delta см. в timetables comparison.
Если бы у dag.catchup было значение True, планировщик создал бы Dag Run за каждый завершённый интервал между 2015-12-01 и 2016-01-02 (но ещё не за 2016-01-02, так как этот интервал не завершён) и выполнил бы их последовательно.
Catchup также срабатывает, когда DAG отключают на некоторый период и затем снова включают.
Такое поведение удобно для атомарных ассетов, которые легко разбить по периодам. Отключённый catchup подходит, если DAG сам обрабатывает догон внутри себя.
Backfill
Может понадобиться выполнить DAG за заданный прошедший период. Например, DAG создан с start_date 2024-11-21, но нужны выходные данные за месяц раньше — с 2024-10-21. Такой запуск называется Backfill.
Сделать это можно через UI или CLI.
UI
На странице DAG нажмите Trigger и выберите Backfill, чтобы открыть форму backfill. Укажите диапазон дат, поведение при переобработке (reprocess), максимальное число активных запусков, при необходимости порядок «назад» (backwards) и расширенные настройки (Advanced Config).
CLI
Для запуска из CLI выполните:
airflow backfill create --dag-id DAG_ID \
--start-date START_DATE \
--end-date END_DATE \
--reprocessing-behavior failed \
--max-active-runs 3 \
--run-backwards \
--dag-run-conf '{"my": "param"}'
Команда backfill перезапустит все экземпляры указанного dag_id за все интервалы между start date и end date.
Повторный запуск задач
Часть задач может завершиться с ошибкой во время запланированного запуска. После исправления ошибок (по логам) можно перезапустить задачи, очистив их для нужной даты. Очистка экземпляра задачи (clear) создаёт запись об экземпляре: увеличивается try_number, max_tries устанавливается в 0, состояние — в None, что приводит к повторному выполнению задачи.
Экспериментальная возможность в Airflow 3.1.0: можно очищать экземпляры задач и перезапускать их с последней версией bundle.
В видах Tree или Graph нажмите на упавшую задачу, затем на Clear. Исполнитель перезапустит её.
Доступные варианты при перезапуске:
- Past — все экземпляры задачи в запусках до последнего data interval DAG;
- Future — все экземпляры задачи в запусках после последнего data interval DAG;
- Upstream — вышестоящие задачи в текущем DAG;
- Downstream — нижестоящие задачи в текущем DAG;
- Recursive — все задачи в дочерних и родительских DAG;
- Failed — только упавшие задачи в последнем запуске DAG.
Очистить задачи можно и через CLI:
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
Команда очищает все экземпляры задач, совпадающих с regex, для указанного dag_id и временного интервала. Дополнительные опции — в справке по clear:
airflow tasks clear --help
История экземпляров задач
При повторных попытках (retry) или очистке (clear) экземпляра задачи его история сохраняется. Посмотреть её можно, нажав на экземпляр задачи в виде Grid.
Примечание. Селектор попыток (try selector) отображается только для задач, которые перезапускались или очищались.
В истории показываются значения атрибутов экземпляра задачи на момент окончания соответствующего запуска. На странице логов доступны логи по каждой попытке экземпляра задачи — это удобно при отладке.
Примечание. Связанные объекты (XCom, отрендеренные шаблонные поля и т.п.) в истории не сохраняются. Сохраняются только атрибуты экземпляра задачи, включая логи.
Внешние запуски (External Triggers)
Dag Run можно создавать вручную через CLI:
airflow dags trigger --logical-date logical_date run_id
Dag Run, созданные вне планировщика, привязываются к метке времени триггера и отображаются в UI вместе с запланированными. Logical date для DAG задаётся аргументом -e; по умолчанию — текущая дата в UTC.
Также можно вручную запустить Dag Run из веб-интерфейса: вкладка Dags → колонка Links → кнопка Trigger Dag.
Передача параметров при запуске DAG
При запуске DAG из CLI, REST API или UI можно передать конфигурацию Dag Run в виде JSON.
Пример параметризованного DAG:
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
Параметры из dag_run.conf можно использовать только в template-полях оператора.
Ожидание завершения Dag Run
В Airflow есть экспериментальный API для ожидания завершения Dag Run. Это полезно при интеграции Airflow во внешние системы или пайплайны автоматизации, которым нужно приостановить выполнение до окончания DAG.
Эндпоинт блокируется (через опрос) до перехода указанного Dag Run в терминальное состояние: success, failed или canceled.
Ответ передаётся в формате NDJSON (Newline-Delimited JSON). Каждая строка — JSON-объект с состоянием Dag Run на данный момент.
Пример:
{"state": "running"}
{"state": "success", "results": {"op": 42}}
Так клиенты могут отслеживать запуск в реальном времени и при необходимости получать XCom-результаты отдельных задач.
Примечание. Функция экспериментальная и может измениться или быть удалена в будущих версиях Airflow.
Использование из CLI
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
Важно помнить
- Пометить экземпляр задачи как failed можно через UI. Это останавливает выполняющиеся экземпляры задачи.
- Пометить экземпляр задачи как success тоже можно через UI. Обычно это нужно для исправления ложных срабатываний или когда исправление было внесено вне Airflow.
Источник: Airflow 3.1.7 — Dag Runs. Перевод неофициальный.