Airflow и dbt
dbt Core — открытая библиотека для аналитической инженерии: с её помощью строят взаимозависимые SQL-модели для трансформации данных в хранилище, используя эфемерные вычисления хранилища. Cosmos — пакет с открытым исходным кодом от Astronomer для запуска dbt-моделей проекта dbt Core в Airflow.
dbt в Airflow с Cosmos и Astro CLI
Пакет с открытым исходным кодом Cosmos позволяет встроить задания dbt в Airflow, автоматически создавая задачи Airflow из dbt-моделей. Проект dbt Core можно превратить в DAG или task group Airflow буквально несколькими строками кода.
Подробные инструкции по настройке Cosmos для разных хранилищ, опции конфигурации и оптимизация производительности описаны в eBook «Orchestrating dbt with Apache Airflow® using Cosmos» и в краткой выжимке Quick Notes: Airflow + dbt with Cosmos.
Зачем использовать Airflow с dbt Core?
dbt Core даёт возможность строить модульные, переиспользуемые SQL-компоненты со встроенным управлением зависимостями и инкрементальными сборками.
С Cosmos задания dbt можно интегрировать в окружение оркестрации на открытом Airflow — как отдельные DAG или как task group внутри DAG.
Преимущества связки Airflow и dbt Core:
- Генерация и хостинг dbt docs в Airflow.
- Поддержка установки и запуска dbt в виртуальном окружении, чтобы избежать конфликтов зависимостей с Airflow.
- Запуск проектов dbt через подключения Airflow вместо dbt profiles. Все подключения можно хранить в одном месте — в Airflow или через secrets backend.
- Запуск
dbt testпо таблицам, созданным отдельными моделями, сразу после завершения модели. Ошибки видны до перехода к следующим шагам; можно добавить проверки качества данных и запускать их вместе с dbt test. - Каждая dbt-модель становится задачей с возможностями Airflow: повторы, уведомления об ошибках, полная наблюдаемость прошлых запусков в UI Airflow.
- Data-aware планирование и сенсоры Airflow для запуска моделей в зависимости от событий в экосистеме данных.
На Astro вы получаете всё перечисленное и можете разворачивать проект dbt на Astro Deployment отдельно от проекта Airflow с помощью Astro CLI. Подробнее: Deploy dbt projects to Astro.
Время прохождения
Туториал рассчитан примерно на 30 минут.
Необходимые знания
Чтобы получить максимум от туториала, желательно понимать:
- Подключения Airflow: Manage connections in Apache Airflow.
- Task groups: Airflow task groups.
- Операторы: Operators 101.
- Соответствие понятий Airflow и dbt: Similar dbt & Airflow concepts.
- Основы Airflow: написание DAG и определение задач — Get started with Apache Airflow.
- Основы dbt Core: What is dbt?.
Предварительные требования
- Доступ к хранилищу данных, поддерживаемому dbt Core. Список: dbt documentation. В туториале используется Postgres.
- Astro CLI.
Устанавливать dbt Core локально для прохождения туториала не обязательно.
Шаг 1: Настройка проекта Astro
Чтобы использовать dbt Core с Airflow, установите dbt Core в виртуальном окружении и Cosmos в новом проекте Astro.
- Создайте новый проект Astro:
$ mkdir astro-dbt-core-tutorial && cd astro-dbt-core-tutorial
$ astro dev init
- Добавьте Cosmos, провайдер Postgres для Airflow и адаптер dbt Postgres в
requirements.txtпроекта Astro. Для другого хранилища заменитеapache-airflow-providers-postgresиdbt-postgresна соответствующие пакеты из Astronomer registry.
astronomer-cosmos==1
apache-airflow-providers-postgres==6
apache-airflow-providers-common-sql==1
dbt-postgres==1
- (Альтернатива) Если из‑за конфликтов пакетов нельзя установить dbt-адаптер в то же окружение, что и Airflow, можно создать исполняемый dbt в виртуальном окружении. В конец
Dockerfileдобавьте:
# замените dbt-postgres на другой поддерживаемый адаптер при ином типе хранилища
RUN python -m venv dbt_venv && source dbt_venv/bin/activate && \
pip install --no-cache-dir dbt-postgres && deactivate
Эта команда при сборке образа создаёт виртуальное окружение dbt_venv в контейнере scheduler Astro CLI и устанавливает в него dbt-postgres (в него входит и dbt-core). Для другого хранилища подставьте нужный адаптер.
Если установка адаптера через requirements.txt или виртуальное окружение в Docker недоступна, Cosmos можно запускать и другими способами. Подробнее: Cosmos documentation on execution modes.
Шаг 2: Подготовка проекта dbt
Чтобы подключить проект dbt к Airflow, добавьте папку проекта в окружение Airflow. Можно использовать свой проект или создать простой по шагам ниже (две модели).
- В папке
my_simple_dbt_projectдобавьтеdbt_project.yml. В конфиге должен быть как минимум имя проекта. В туториале дополнительно показывается передача переменнойmy_nameиз Airflow в проект dbt. - В папке
dbtсоздайте папкуmy_simple_dbt_project. - В папке
includeсоздайте папкуdbt.
version: '0.1'
name: 'my_simple_dbt_project'
vars:
my_name: "No entry"
- Добавьте dbt-модели в подпапку
modelsвнутриmy_simple_dbt_project. Моделей может быть любое количество; в туториале используются две.
model1.sql:
SELECT '{{ var("my_name") }}' as name
model2.sql:
SELECT * FROM {{ ref('model1') }}
В model1.sql выбирается переменная my_name. В model2.sql используется зависимость от model1.sql и выбираются все данные из вышестоящей модели.
Итоговая структура в окружении Airflow:
.
└── dags
└── include
└── dbt
└── my_simple_dbt_project
├── dbt_project.yml
└── models
├── model1.sql
└── model2.sql
Если хранить проект dbt рядом с проектом Airflow нельзя, Cosmos можно использовать и при размещении проекта в другом месте (например, разбор через manifest-файл и контейнерный режим выполнения). Подробнее: Cosmos documentation.
Шаг 3: Подключение Airflow к хранилищу данных
Cosmos позволяет применять подключения Airflow к проекту dbt.
- Создайте подключение с именем
db_conn. Тип и параметры выберите в зависимости от хранилища. Для Postgres укажите: - В UI Airflow: Admin → Connections → +.
-
Запустите Airflow:
astro dev start. -
Port: порт Postgres.
- Password: пароль Postgres.
- Login: имя пользователя Postgres.
- Schema: база данных Postgres.
- Host: адрес хоста Postgres.
- Connection Type:
Postgres. - Connection ID:
db_conn.
Если нужного типа подключения нет, добавьте соответствующий провайдер в requirements.txt и выполните astro dev restart.
Шаг 4: Написание DAG Airflow
DAG создаёт задачи из существующих dbt-моделей через Cosmos и использует SQLExecuteQueryOperator для запроса к созданной таблице. При необходимости можно добавить задачи до и после, встроив dbt в более широкий пайплайн.
- В папке
dagsсоздайте файлmy_simple_dbt_dag.py. - Скопируйте в него следующий код:
"""
### Запуск проекта dbt Core как task group с Cosmos
Простой DAG: запуск проекта dbt как task group с использованием
подключения Airflow и передачей переменной в проект dbt.
"""
from airflow.sdk import dag, chain
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
# для других БД подставьте свой профиль
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping
import os
YOUR_NAME = "YOUR_NAME"
CONNECTION_ID = "db_conn"
DB_NAME = "YOUR_DB_NAME"
SCHEMA_NAME = "YOUR_SCHEMA_NAME"
MODEL_TO_QUERY = "model2"
# путь к проекту dbt
DBT_PROJECT_PATH = f"{os.environ['AIRFLOW_HOME']}/include/dbt/my_simple_dbt_project"
# ОПЦИОНАЛЬНО: путь к исполняемому dbt в venv из Dockerfile,
# если адаптер dbt нельзя ставить в requirements.txt из‑за конфликтов.
# DBT_EXECUTABLE_PATH = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt"
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id=CONNECTION_ID,
profile_args={"schema": SCHEMA_NAME},
),
)
# ОПЦИОНАЛЬНО: путь к исполняемому dbt
# execution_config = ExecutionConfig(
# dbt_executable_path=DBT_EXECUTABLE_PATH,
# )
@dag(
params={"my_name": YOUR_NAME},
)
def my_simple_dbt_dag():
transform_data = DbtTaskGroup(
group_id="transform_data",
project_config=ProjectConfig(DBT_PROJECT_PATH),
profile_config=profile_config,
# ОПЦИОНАЛЬНО: execution_config при использовании venv
# execution_config=execution_config,
operator_args={
"vars": '{"my_name": {{ params.my_name }} }',
},
default_args={"retries": 2},
)
query_table = SQLExecuteQueryOperator(
task_id="query_table",
conn_id=CONNECTION_ID,
sql=f"SELECT * FROM {DB_NAME}.{SCHEMA_NAME}.{MODEL_TO_QUERY}",
)
chain(transform_data, query_table)
my_simple_dbt_dag()
В DAG класс DbtTaskGroup из Cosmos создаёт task group из моделей проекта dbt. Зависимости между моделями dbt автоматически становятся зависимостями между задачами Airflow. Подставьте свои значения для YOUR_NAME, YOUR_DB_NAME и YOUR_SCHEMA_NAME.
Через ключ vars в словаре operator_args в проект dbt передаются переменные. Здесь передаётся YOUR_NAME в переменную my_name. Если в проекте есть dbt test, они запускаются сразу после завершения модели. Рекомендуется задавать retries не менее 2 для всех задач, запускающих dbt-модели.
При больших проектах dbt иногда возникает ошибка DagBag import timeout. Её можно устранить, увеличив значение настройки Airflow core.dagbag_import_timeout.
-
Запустите DAG вручную (кнопка play) и откройте граф. Разверните task groups, чтобы увидеть все задачи.
-
Проверьте XCom задачи
query_table— в таблицеmodel2должно быть ваше имя.
Класс DbtTaskGroup заполняет task group Airflow задачами, созданными из dbt-моделей внутри обычного DAG. Чтобы описать целый DAG только из dbt-моделей, используйте класс DbtDag, как в документации Cosmos.
Готово: вы запустили DAG с Cosmos, который автоматически создал задачи из dbt-моделей. Дальнейшая настройка Cosmos описана в документации Cosmos.
Для крупных проектов dbt и ускорения выполнения есть несколько вариантов. Один из недавних — экспериментальный режим watcher, который может сократить время выполнения DAG до 80% и приблизить его к скорости dbt build через dbt CLI. Подробнее: Cosmos documentation.
Другие способы запуска dbt Core с Airflow
Рекомендуется использовать Cosmos, но запускать dbt Core в Airflow можно и иначе.
BashOperator
С помощью BashOperator можно выполнять отдельные команды dbt. Желательно запускать dbt-core и адаптер dbt для вашей БД в виртуальном окружении из‑за частых конфликтов зависимостей с другими пакетами.
Пример DAG с BashOperator: активация venv и выполнение dbt run для проекта dbt.
from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
PATH_TO_DBT_PROJECT = "<path to your dbt project>"
PATH_TO_DBT_VENV = "<path to your venv activate binary>"
@dag
def simple_dbt_dag():
dbt_run = BashOperator(
task_id="dbt_run",
bash_command="source $PATH_TO_DBT_VENV && dbt run --models .",
env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
cwd=PATH_TO_DBT_PROJECT,
)
simple_dbt_dag()
Запуск dbt run и других команд dbt через BashOperator удобен при разработке. Но запуск dbt на уровне всего проекта имеет минусы:
- Любой сбой ведёт к необходимости перезапуска всех моделей проекта, что может быть затратно.
- Низкая наблюдаемость за состоянием выполнения проекта.
Использование manifest-файла
Использование сгенерированного dbt файла manifest.json даёт больше ясности о шагах dbt в каждой задаче. Файл создаётся в каталоге target проекта dbt и содержит полное представление проекта. Подробнее: dbt documentation.
Cosmos умеет разбирать manifest-файлы: Cosmos documentation.