Первый конвейер данных с dbt-trino#

Этот материал адаптирован по статье Starburst First dbt-trino data pipeline. Он показывает базовый путь от установки dbt-trino до первой модели, источников данных, DAG, materialization, тестов, macros и seeds.

Что делает dbt-trino#

Адаптер dbt-trino использует Trino как движок SQL-запросов для федеративной работы с разными источниками данных. Один dbt-профиль подключается к Trino, а Trino уже обращается к каталогам и коннекторам: PostgreSQL, Hive, Iceberg, Kafka, объектным хранилищам и другим источникам.

В такой схеме dbt по-прежнему отвечает за трансформации: модели, зависимости, тесты и документацию. Trino выполняет SQL, читает данные из подключенных источников, создает таблицы и представления и, если нужно, записывает результат в целевой каталог.

Установка dbt-trino#

Для установки нужен Python. В исходной статье указано требование Python 3.7+, но для новых версий dbt проверяйте актуальные требования в документации dbt и репозитории адаптера.

Установите адаптер:

pip install dbt-trino

Проверьте установку:

dbt --version

Пример ожидаемого вывода:

Core:
  - installed: 1.3.0
  - latest:    1.3.0 - Up to date!

Plugins:
  - trino: 1.3.1 - Up to date!

Создание проекта и профиля#

Новый dbt-проект можно создать командой:

dbt init

Даже если проект уже скачан или склонирован, dbt init может быть полезен: команда помогает создать или дополнить локальный файл profiles.yml и настроить подключение к Trino.

Подробнее о профилях см. в официальной документации dbt.

Пример профиля для локального Trino на порту 8080, где результаты моделей сохраняются в каталог datalake и схему analytics:

my_dbt_trino_project:
  target: dev
  outputs:
    dev:
      type: trino
      method: none
      user: admin
      database: datalake
      host: localhost
      port: 8080
      schema: analytics
      threads: 1

В этом примере method: none означает, что в локальной среде разработки не используется механизм аутентификации. Параметр threads задает, сколько запросов dbt может выполнять параллельно.

Полный список параметров профиля доступен в репозитории dbt-trino.

Первая модель#

Основная сущность dbt — модель. Обычно это SQL-файл в каталоге models, содержащий запрос SELECT. Результат модели материализуется в Trino как VIEW, TABLE или другой поддерживаемый тип materialization.

Например, создайте файл models/src_customers.sql:

SELECT * FROM webshop.public.customers

Запустите проект:

dbt run

dbt создаст объект src_customers в целевой схеме datalake.analytics. Проверить результат можно из SQL-клиента:

SELECT * FROM datalake.analytics.src_customers;

При повторном запуске dbt run dbt обновит объект согласно текущему определению модели.

Источники данных#

Жестко прописывать полные имена таблиц прямо в моделях неудобно. Вместо этого в dbt принято описывать внешние объекты как sources. Sources задают внешние таблицы и позволяют обращаться к ним через макрос source().

Создайте файл models/sources.yml:

version: 2

sources:
  - name: webshop
    database: webshop
    schema: public
    tables:
      - name: customers

После этого модель src_customers.sql можно переписать так:

SELECT * FROM {{ source('webshop', 'customers') }}

Двойные фигурные скобки означают, что фрагмент обрабатывается dbt до отправки запроса в Trino. В результате dbt скомпилирует source('webshop', 'customers') в конкретное имя объекта из sources.yml.

EL внутри T с помощью Trino#

Обычно dbt не занимается извлечением и загрузкой данных. Он выполняет трансформации, то есть T в ELT. Но благодаря федеративным возможностям Trino модель dbt может читать данные из разных каталогов как из единого SQL-слоя.

Например, кроме операционной базы webshop, можно добавить источник clickstream-данных из каталога website:

version: 2

sources:
  - name: webshop
    database: webshop
    schema: public
    tables:
      - name: customers

  - name: website
    database: website
    schema: clickstream
    tables:
      - name: clicks

Создайте модель models/src_clicks.sql:

with source as (
    SELECT * FROM {{ source('website', 'clicks') }}
),

renamed as (
    SELECT
        visitorid,
        useragent,
        language,
        event,
        cast(from_iso8601_timestamp(eventtime) AS timestamp(6) with time zone) AS eventtime,
        page,
        referrer
    FROM source
)

SELECT * FROM renamed

После запуска dbt run dbt создаст новую модель на основе clickstream-данных. Trino выполнит чтение из нужного каталога через соответствующий коннектор.

Как dbt строит DAG#

В конвейерах данных важна модель зависимостей DAG (Directed Acyclic Graph). Большинство оркестраторов, например Apache Airflow, требует явно описывать зависимости. dbt строит DAG неявно: каждый SQL-файл является моделью, а зависимости задаются через макрос ref().

Например:

SELECT * FROM {{ ref('src_customers') }}

ref() сообщает dbt, что текущая модель зависит от модели src_customers. dbt использует это для правильного порядка выполнения и построения lineage.

Пример: сессии пользователей#

Допустим, нужно узнать, сколько раз клиент посещал сайт перед покупкой. Для этого можно объединить данные интернет-магазина с clickstream-данными и выполнить sessionization.

Добавьте таблицу sessions в источник webshop:

version: 2

sources:
  - name: webshop
    database: webshop
    schema: public
    tables:
      - name: customers
      - name: sessions

Создайте модель models/src_sessions.sql:

with source as (
    SELECT * FROM {{ source('webshop', 'sessions') }}
),

renamed as (
    SELECT
        cookie_id,
        cast(from_unixtime(started_ts / 1000) AS timestamp(6)) AS session_started,
        customer_id
    FROM source
)

SELECT * FROM renamed

Дальше можно построить модель sessionized_clicks.sql. В ней полезно зафиксировать бизнес-правила:

  • сессия — это последовательность кликов, где между двумя соседними кликами не проходит больше одного часа;

  • после входа пользователя в систему прошлые сессии можно связать с этим пользователем;

  • cookie или другой долгоживущий идентификатор помогает связать клики с пользователем.

Для такой логики Trino предоставляет оконные функции и операции WINDOW, а dbt позволяет использовать macros вроде ref() и macros из пакетов dbt-utils или trino_utils.

Упрощенный фрагмент модели:

WITH sessions AS (
    SELECT
        date_diff('hour', lag(c.eventtime) OVER w, c.eventtime) > 1 AS new_session,
        c.visitorid,
        c.eventtime,
        c.referrer,
        s.customer_id,
        min(c.eventtime) OVER w AS session_started,
        max(c.eventtime) OVER w AS session_ended
    FROM {{ ref("src_clicks") }} c
    JOIN {{ ref("src_sessions") }} s
      ON c.visitorid = s.cookie_id
     AND c.eventtime >= s.session_started
    WINDOW w AS (
        PARTITION BY c.visitorid
        ORDER BY c.eventtime
    )
)

SELECT * FROM sessions

В реальной модели можно расширить этот запрос: добавить row_number(), посчитать последовательность сессий, сформировать sessionid, использовать dbt_utils.star() для генерации списка столбцов и сохранить результат как таблицу.

Materializations: view, table, ephemeral#

Чтобы управлять производительностью конвейера, выбирайте materialization для каждой модели осознанно:

  • view подходит для легких моделей и промежуточных представлений;

  • table подходит для тяжелых вычислений и данных, которые нужны конечным пользователям или BI-инструментам;

  • ephemeral не создает объект в базе данных, а подставляет SQL модели в зависимые модели как CTE.

Если sessionization использует оконные функции и добавляет sessionid к каждому клику, такую операцию нежелательно выполнять заново при каждом обращении к модели. Поэтому модель sessionized_clicks разумно сохранить как таблицу.

В начало sessionized_clicks.sql добавьте:

{{ config(materialized='table') }}

После этого dbt run создаст таблицу через запрос вида CREATE TABLE sessionized_clicks AS ....

Проверка логики с помощью dbt tests#

dbt упрощает добавление тестов данных. Часть тестов можно описать как generic tests, например unique и not_null в schema.yml.

Для sessionization полезно проверить, что сессии одного посетителя не пересекаются. Для этого можно создать singular test в каталоге tests.

Файл tests/assert_no_overlapping_sessions.sql:

SELECT
    sc1.session_started,
    sc1.session_ended,
    sc2.session_started,
    sc2.session_ended,
    sc1.visitorid
FROM {{ ref('sessionized_clicks') }} sc1
JOIN {{ ref('sessionized_clicks') }} sc2
  ON sc1.visitorid = sc2.visitorid
 AND sc1.session_started < sc2.session_ended
 AND sc1.session_ended > sc2.session_started
 AND sc1.session_started <> sc2.session_started

Запустите тест:

dbt test --select sessionized_clicks

Если запрос теста возвращает строки, dbt считает тест проваленным.

Макросы dbt#

dbt поддерживает macros. Распространенный пакет dbt-utils содержит macros, которые можно переиспользовать в разных проектах.

Starburst поддерживает пакет dbt-trino-utils. Он содержит macros для проектов, работающих на Trino или Starburst, а также реализации dispatched macros из других пакетов.

Пример packages.yml:

packages:
  - package: dbt-labs/dbt_utils
    version: {SEE DBT HUB FOR NEWEST VERSION}
  - package: starburstdata/trino_utils
    version: {SEE DBT HUB FOR NEWEST VERSION}

Добавьте dispatch в dbt_project.yml, чтобы dbt_utils также искал macros из trino_utils:

dispatch:
  - macro_namespace: dbt_utils
    search_order: ['trino_utils', 'dbt_utils']

Установите пакеты:

dbt deps

Пример служебной операции для очистки неиспользуемых объектов:

dbt run-operation trino__drop_old_relations

Чтобы предварительно посмотреть результат без удаления, добавьте аргументы:

dbt run-operation trino__drop_old_relations --args "{dry_run: true}"

Статические данные через seeds#

dbt позволяет загружать содержимое CSV-файлов, которые называются seeds, напрямую в таблицу командой dbt seed. Это удобно для небольших справочников, списков соответствий и статических значений, которые должны храниться рядом с проектом.

Например, создайте файл seeds/campaigns.csv:

campaign_id,country,age_group
1,US,"18-24"
2,Europe,"18-24"
3,US,"25-40"
4,Europe,"25-40"
5,US,"41-60"

Загрузите seeds:

dbt seed

После этого seed можно использовать в моделях через ref():

{{ ref("campaigns") }}

Дальнейшие шаги#

После базового конвейера обычно переходят к инкрементальному обновлению, настройке materialization под конкретные коннекторы, управлению grants, тестированию моделей и документированию lineage. Подробнее о настройках адаптера см. в Конфигурация Starburst и Trino.