Перейти к содержанию

Начало работы с Cosmos на Astro (Getting Started on Astro)

Cosmos на Astro можно использовать со всеми режимами выполнения, но рекомендуется режим local — он проще всего в настройке и использовании.

Готовый рабочий проект для запуска на Astro (CLI или Cloud): cosmos-demo.

Ниже — пошаговая инструкция по запуску своего dbt-проекта в Astro.

Предварительные требования

Перед началом нужно иметь:

  • Установленный Astro CLI. Инструкции: здесь.
  • Проект Astro CLI. Новый проект создаётся командой astro dev init.
  • Проект dbt. Подойдёт, например, jaffle shop.

Создание виртуального окружения

Создайте виртуальное окружение в Dockerfile по примеру ниже. Замените <your-dbt-adapter> на нужный адаптер (например dbt-redshift, dbt-snowflake). Виртуальное окружение рекомендуется, так как у dbt и Airflow могут конфликтовать зависимости.

FROM quay.io/astronomer/astro-runtime:11.3.0

# установка dbt в виртуальное окружение
RUN python -m venv dbt_venv && source dbt_venv/bin/activate && \
    pip install --no-cache-dir <your-dbt-adapter> && deactivate

Пример адаптера: dbt-postgres.

Установка Cosmos

Добавьте Cosmos в requirements.txt проекта:

astronomer-cosmos

Размещение dbt-проекта в каталоге DAGs

Создайте папку dbt внутри локальной папки dags, скопируйте в неё свой dbt-проект и создайте файл my_cosmos_dag.py в корне каталога dags. Структура проекта должна быть такой:

├── dags/
│   ├── dbt/
│   │   └── my_dbt_project/
│   │       ├── dbt_project.yml
│   │       ├── models/
│   │       │   ├── my_model.sql
│   │       │   └── my_other_model.sql
│   │       └── macros/
│   │           ├── my_macro.sql
│   │           └── my_other_macro.sql
│   └── my_cosmos_dag.py
├── Dockerfile
├── requirements.txt
└── ...

По умолчанию Cosmos ищет проект в каталоге /usr/local/airflow/dags/dbt, но dbt-проект может лежать в любом месте на образе Airflow. Путь можно задать аргументом dbt_project_dir при создании экземпляра DAG.

Например, если проект должен лежать в /usr/local/airflow/dags/my_dbt_project:

from cosmos import DbtDag, ProjectConfig

my_cosmos_dag = DbtDag(
    project_config=ProjectConfig(
        dbt_project_path="/usr/local/airflow/dags/my_dbt_project",
    ),
    # ...,
)

Создание DAG-файла

В файле my_cosmos_dag.py импортируйте класс DbtDag из Cosmos и создайте экземпляр DAG. Обязательно укажите аргумент dbt_executable_path, чтобы он указывал на виртуальное окружение из шага 1.

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

import os
from datetime import datetime

airflow_home = os.environ["AIRFLOW_HOME"]

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_db",
        profile_args={"schema": "public"},
    ),
)

my_cosmos_dag = DbtDag(
    project_config=ProjectConfig(
        f"{airflow_home}/dags/my_dbt_project",
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path=f"{airflow_home}/dbt_venv/bin/dbt",
    ),
    # обычные параметры DAG
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="my_cosmos_dag",
    default_args={"retries": 2},
)

В больших dbt-проектах иногда возникает ошибка DagBag import timeout. Её можно устранить, увеличив значение настройки Airflow core.dagbag_import_timeout.

Примечание

Запуск проекта

Запустите проект командой astro dev start. DAG Airflow появится в UI Airflow (по умолчанию localhost:8080), откуда его можно запускать.