Динамическая генерация DAG (Dynamically generating DAGs)
Эта страница ещё не обновлена для Airflow 3. Описанные концепции актуальны, но часть кода может потребовать изменений. При запуске примеров обновите импорты и учтите возможные breaking changes.
Инфо
В Airflow DAG задаются кодом на Python. Airflow выполняет весь Python-код в dags_folder и загружает все объекты DAG, попадающие в globals(). Проще всего создать DAG — написать статичный Python-файл.
Иногда вручную писать DAG неудобно: например, нужны сотни или тысячи похожих DAG с разными параметрами, или набор DAG для загрузки таблиц без ручного обновления при каждом изменении списка таблиц. В таких случаях DAG имеет смысл генерировать динамически.
Поскольку в Airflow всё задаётся кодом, DAG можно генерировать средствами одного Python. Airflow загрузит любой объект DAG в globals(), созданный кодом из dags_folder. В этом руководстве — как динамически генерировать DAG, когда это уместно и каких ошибок избегать.
Весь код из руководства доступен в Astronomer Registry.
Для DAG с динамическим числом параллельных задач в runtime можно использовать динамический маппинг задач — встроенную возможность Airflow. Astronomer рекомендует сначала рассмотреть динамический маппинг задач, а уже потом при необходимости прибегать к методам динамической генерации DAG из этого руководства.
Совет
Модуль Astronomer Academy: Airflow: Dynamic DAGs.
По этой теме есть и другие материалы. См. также:
Другие способы изучения
Необходимая база
Чтобы получить максимум от руководства, нужно понимать:
- DAG в Airflow. См. Введение в DAG Airflow.
Метод одного файла
Один из способов динамической генерации DAG — один Python-файл, который создаёт DAG по входным параметрам (например, списку API или таблиц). Типичный сценарий — ETL/ELT с множеством источников или приёмников данных, когда нужны многие DAG по одному шаблону.
Плюсы метода одного файла:
- Добавление DAG сводится к изменению входных параметров и происходит почти мгновенно.
- Параметры могут браться из разных источников.
- Реализация проста.
Минусы:
- Код генерации выполняется при каждом парсинге DAG, так как файл лежит в
dags_folder. Частота задаётся параметромmin_file_process_interval. При большом числе DAG или при обращении к внешним системам (БД) возможны проблемы с производительностью. - Код конкретного DAG неочевиден — отдельного DAG-файла нет.
Ниже метод одного файла показан на примерах с разными источниками параметров.
Пример: функция create_dag
Чтобы создавать DAG из файла динамически, нужна функция, которая по параметрам строит DAG. В примере шаблон DAG задаётся внутри функции create_dag. Код похож на создание одного DAG, но обёрнут в функцию с параметрами.
TaskFlow:
from airflow.decorators import dag, task
def create_dag(dag_id, schedule, dag_number, default_args):
@dag(dag_id=dag_id, schedule=schedule, default_args=default_args, catchup=False)
def hello_world_dag():
@task()
def hello_world():
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
hello_world()
generated_dag = hello_world_dag()
return generated_dag
Традиционный вариант:
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
def create_dag(dag_id, schedule, dag_number, default_args):
def hello_world_py(*args):
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
generated_dag = DAG(dag_id, schedule=schedule, default_args=default_args, catchup=False)
with generated_dag:
t1 = PythonOperator(
task_id="hello_world", python_callable=hello_world_py
)
return generated_dag
Параметры могут приходить из любого источника, доступного скрипту. В примере ниже цикл range(1, 4) генерирует параметры, DAG регистрируются в глобальной области видимости и подхватываются планировщиком Airflow:
TaskFlow:
from airflow.decorators import dag, task
from pendulum import datetime
def create_dag(dag_id, schedule, dag_number, default_args):
@dag(dag_id=dag_id, schedule=schedule, default_args=default_args, catchup=False)
def hello_world_dag():
@task()
def hello_world(*args):
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
hello_world()
generated_dag = hello_world_dag()
return generated_dag
# DAG для каждого числа в range(1, 4)
for n in range(1, 4):
dag_id = "loop_hello_world_{}".format(str(n))
default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
schedule = "@daily"
dag_number = n
globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
Традиционный вариант:
from pendulum import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def create_dag(dag_id, schedule, dag_number, default_args):
def hello_world_py():
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
generated_dag = DAG(dag_id=dag_id, schedule=schedule, default_args=default_args)
with generated_dag:
PythonOperator(task_id="hello_world", python_callable=hello_world_py)
return generated_dag
# DAG для каждого числа в range(1, 4)
for n in range(1, 4):
dag_id = "loop_hello_world_{}".format(str(n))
default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
schedule = "@daily"
dag_number = n
globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
Созданные DAG отображаются в UI Airflow:
Пример: генерация DAG из переменных окружения
Параметры не обязаны быть в самом DAG-файле. Часто DAG генерируют по переменным окружения.
Переменные можно задать локально в .env проекта Astro или в Astro UI для деплоев Astro.
DYNAMIC_DAG_NUMBER=10
Значение можно прочитать и передать в range. Аргумент default=3 нужен, чтобы файл считался валидным и при отсутствии переменной.
TaskFlow:
from airflow.decorators import dag, task
from pendulum import datetime
import os
def create_dag(dag_id, schedule, dag_number, default_args):
@dag(dag_id=dag_id, schedule=schedule, default_args=default_args, catchup=False)
def hello_world_dag():
@task()
def hello_world(*args):
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
hello_world()
generated_dag = hello_world_dag()
return generated_dag
number_of_dags = os.getenv("DYNAMIC_DAG_NUMBER", default=3)
number_of_dags = int(number_of_dags)
for n in range(1, number_of_dags):
dag_id = "variable_hello_world_{}".format(str(n))
default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
schedule = "@daily"
dag_number = n
globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
Традиционный вариант:
from airflow import DAG
from airflow.operators.python import PythonOperator
from pendulum import datetime
import os
def create_dag(dag_id, schedule, dag_number, default_args):
def hello_world_py():
print("Hello World")
print("This is DAG: {}".format(str(dag_number)))
generated_dag = DAG(dag_id=dag_id, schedule=schedule, default_args=default_args)
with generated_dag:
PythonOperator(task_id="hello_world", python_callable=hello_world_py)
return generated_dag
number_of_dags = os.getenv("DYNAMIC_DAG_NUMBER", default=3)
number_of_dags = int(number_of_dags)
for n in range(1, number_of_dags):
dag_id = "hello_world_{}".format(str(n))
default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
schedule = "@daily"
dag_number = n
globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
Созданные DAG отображаются в UI Airflow:
Метод нескольких файлов
Другой способ — генерировать отдельный Python-файл под каждый DAG. В итоге в dags_folder оказывается по файлу на каждый сгенерированный DAG.
В production генерацию можно встроить в CI/CD: скрипт создаёт DAG-файлы при сборке, затем они деплоятся в Airflow. Либо отдельный DAG периодически запускает скрипт генерации.
Плюсы:
- DAG-файлы создаются до деплоя — полная прозрачность кода, в том числе через кнопку Code в UI Airflow.
- Масштабируется лучше метода одного файла: код генерации не лежит в
dags_folderи не выполняется при каждом парсинге.
Минусы:
- Изменения и новые DAG появятся только после следующего запуска скрипта (часто — после деплоя).
- Настройка сложнее.
Пример: генерация DAG из JSON-конфигов
Один из вариантов — Python-скрипт, который по набору JSON-конфигов генерирует DAG-файлы. В примере у всех DAG одна задача — BashOperator с bash-командой. Сценарий: команда аналитиков планирует bash-команды; DAG одинаковые, меняются команда, переменная окружения и расписание.
Сначала создаётся файл-шаблон DAG со структурой. Он похож на обычный DAG-файл, но в места подставляемых значений стоят плейсхолдеры: dag_id_to_replace, schedule_to_replace, bash_command_to_replace, env_var_to_replace.
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from pendulum import datetime
@dag(
dag_id=dag_id_to_replace,
start_date=datetime(2023, 7, 1),
schedule=schedule_to_replace,
catchup=False,
)
def dag_from_config():
BashOperator(
task_id="say_hello",
bash_command=bash_command_to_replace,
env={"ENVVAR": env_var_to_replace},
)
dag_from_config()
Далее создаётся каталог dag-config с JSON-конфигом для каждого DAG. В конфиге задаются dag_id, расписание, bash-команда и переменная окружения.
{
"dag_id": "dag_file_1",
"schedule": "'@daily'",
"bash_command": "'echo $ENVVAR'",
"env_var": "'Hello! :)'"
}
Затем скрипт обходит конфиги, копирует шаблон в каталог include и подставляет в него параметры из конфига:
import json
import os
import shutil
import fileinput
config_filepath = "include/dag-config/"
dag_template_filename = "include/dag-template.py"
for filename in os.listdir(config_filepath):
f = open(config_filepath + filename)
config = json.load(f)
new_filename = "dags/" + config["dag_id"] + ".py"
shutil.copyfile(dag_template_filename, new_filename)
for line in fileinput.input(new_filename, inplace=True):
line = line.replace("dag_id_to_replace", "'" + config["dag_id"] + "'")
line = line.replace("schedule_to_replace", config["schedule"])
line = line.replace("bash_command_to_replace", config["bash_command"])
line = line.replace("env_var_to_replace", config["env_var"])
print(line, end="")
Скрипт можно запускать вручную или в CI/CD. После запуска структура проекта может выглядеть так: в include — шаблон и конфиги, в dags — сгенерированные DAG-файлы:
.
├── dags
│ ├── dag_file_1.py
│ └── dag_file_2.py
└── include
├── dag-config
│ ├── dag1-config.json
│ └── dag2-config.json
├── dag-template.py
└── generate-dag-files.py
Это простой пример для DAG с одной и той же структурой. Его можно развить: динамические задачи, зависимости, разные операторы и т.д.
Инструменты для динамического создания DAG
gusty
Один из популярных инструментов — gusty. gusty — открытая Python-библиотека для динамической генерации DAG в Airflow. Задачи можно описывать в YAML, Python, SQL, R Markdown и Jupyter Notebook.
Установка: pip install gusty. В проекте на Astro CLI можно добавить gusty в requirements.txt.
Использование: в папке dags создаётся каталог для DAG gusty. Подкаталоги этого каталога задают DAG, вложенные подкаталоги — группы задач (task groups) внутри DAG.
Пример структуры, из которой получаются 2 DAG из каталога my_gusty_dags: в my_dag_1 две задачи из YAML; в my_dag_2 одна задача из YAML и две группы задач по две задачи, во второй группе две задачи заданы SQL-файлами.
.
└── dags
├── my_gusty_dags
│ ├── my_dag_1
│ │ ├── METADATA.yml
│ │ ├── task_1.yaml
│ │ └── task_2.yaml
│ └── my_dag_2
│ ├── METADATA.yml
│ ├── task_0.yaml
│ ├── my_taskgroup_1
│ │ ├── task_1.yaml
│ │ └── task_2.yaml
│ └── my_taskgroup_2
│ ├── task_3.sql
│ └── task_4.sql
├── creating_gusty_dags.py
└── my_regular_dag.py
Чтобы загрузить DAG из my_gusty_dags, нужен скрипт, вызывающий функцию gusty create_dags. В примере скрипт creating_gusty_dags.py в каталоге dags:
from gusty import create_dags
dag = create_dags(
# путь к каталогу с DAG gusty
'/usr/local/airflow/dags/my_gusty_dags',
# namespace для gusty
globals(),
# по умолчанию gusty ставит LatestOnlyOperator в корень DAG;
# отключить: latest_only=False
latest_only=False
)
Параметры DAG задаются в METADATA.yml:
description: "An example of a DAG created using gusty!"
schedule_interval: "1 0 * * *"
default_args:
owner: airflow
depends_on_past: False
start_date: !days_ago 1
email: airflow@example.com
email_on_failure: False
email_on_retry: False
retries: 1
retry_delay: !timedelta 'minutes: 5'
Задачи можно описывать в YAML для стандартных и кастомных операторов. Ниже — задача BashOperator с зависимостью от task_1:
operator: airflow.operators.bash.BashOperator
bash_command: echo $MY_ENV_VAR
dependencies:
- task_1
env:
MY_ENV_VAR: "Hello!"
Чтобы использовать DAG gusty и обычные DAG в одном окружении, обычные DAG должны лежать в dags вне каталога my_gusty_dags.
Подробнее о gusty: README репозитория. Также можно посмотреть примеры: gusty-demo и gusty-demo-lite.
DAG Factory
Ещё один инструмент для динамической генерации DAG — DAG Factory. Пакет dag-factory позволяет создавать DAG из YAML с параметрами DAG и задач, без необходимости знать синтаксис Airflow.
Подробнее о DAG Factory, включая обзор возможностей (ассеты, наследование конфигурации, генерация YAML по шаблону): DAG Factory tutorial.
Масштабирование
Динамическая генерация DAG при большом масштабе может ухудшить производительность. Итог зависит от числа DAG, конфигурации Airflow и инфраструктуры. Учитывайте:
- Можно увеличить
min_file_processing_interval, чтобы реже парсить файлы. Имеет смысл, если DAG меняются нечасто и допустима задержка появления изменений из внешнего источника. - При получении списка DAG из БД запросы идут часто. Нужно учитывать нагрузку на БД и возможную стоимость запросов.
- Весь код в
dags_folderвыполняется не реже, чем раз вmin_file_processing_interval(или как только успевает DAG file processor). Методы вроде одного файла с генерацией DAG в коде сильнее влияют на производительность при росте числа DAG.
Тонкая настройка планировщика помогает смягчить проблемы. Единого «правильного» способа реализовать и масштабировать динамические DAG нет; гибкость Airflow позволяет подобрать решение под вашу организацию.

