Первый конвейер данных с dbt-trino#
Этот материал адаптирован по статье Starburst First dbt-trino data
pipeline.
Он показывает базовый путь от установки dbt-trino до первой модели, источников
данных, DAG, materialization, тестов, macros и seeds.
Что делает dbt-trino#
Адаптер dbt-trino использует Trino как движок SQL-запросов для федеративной работы с разными источниками данных. Один dbt-профиль подключается к Trino, а Trino уже обращается к каталогам и коннекторам: PostgreSQL, Hive, Iceberg, Kafka, объектным хранилищам и другим источникам.
В такой схеме dbt по-прежнему отвечает за трансформации: модели, зависимости, тесты и документацию. Trino выполняет SQL, читает данные из подключенных источников, создает таблицы и представления и, если нужно, записывает результат в целевой каталог.
Установка dbt-trino#
Для установки нужен Python. В исходной статье указано требование Python 3.7+, но для новых версий dbt проверяйте актуальные требования в документации dbt и репозитории адаптера.
Установите адаптер:
pip install dbt-trino
Проверьте установку:
dbt --version
Пример ожидаемого вывода:
Core:
- installed: 1.3.0
- latest: 1.3.0 - Up to date!
Plugins:
- trino: 1.3.1 - Up to date!
Создание проекта и профиля#
Новый dbt-проект можно создать командой:
dbt init
Даже если проект уже скачан или склонирован, dbt init может быть полезен:
команда помогает создать или дополнить локальный файл profiles.yml и настроить
подключение к Trino.
Подробнее о профилях см. в официальной документации dbt.
Пример профиля для локального Trino на порту 8080, где результаты моделей
сохраняются в каталог datalake и схему analytics:
my_dbt_trino_project:
target: dev
outputs:
dev:
type: trino
method: none
user: admin
database: datalake
host: localhost
port: 8080
schema: analytics
threads: 1
В этом примере method: none означает, что в локальной среде разработки не
используется механизм аутентификации. Параметр threads задает, сколько
запросов dbt может выполнять параллельно.
Полный список параметров профиля доступен в репозитории dbt-trino.
Первая модель#
Основная сущность dbt — модель. Обычно это SQL-файл в каталоге models,
содержащий запрос SELECT. Результат модели материализуется в Trino как
VIEW, TABLE или другой поддерживаемый тип materialization.
Например, создайте файл models/src_customers.sql:
SELECT * FROM webshop.public.customers
Запустите проект:
dbt run
dbt создаст объект src_customers в целевой схеме
datalake.analytics. Проверить результат можно из SQL-клиента:
SELECT * FROM datalake.analytics.src_customers;
При повторном запуске dbt run dbt обновит объект согласно текущему
определению модели.
Источники данных#
Жестко прописывать полные имена таблиц прямо в моделях неудобно. Вместо этого
в dbt принято описывать внешние объекты как
sources. Sources задают внешние
таблицы и позволяют обращаться к ним через макрос source().
Создайте файл models/sources.yml:
version: 2
sources:
- name: webshop
database: webshop
schema: public
tables:
- name: customers
После этого модель src_customers.sql можно переписать так:
SELECT * FROM {{ source('webshop', 'customers') }}
Двойные фигурные скобки означают, что фрагмент обрабатывается dbt до отправки
запроса в Trino. В результате dbt скомпилирует source('webshop', 'customers') в конкретное имя объекта из sources.yml.
EL внутри T с помощью Trino#
Обычно dbt не занимается извлечением и загрузкой данных. Он выполняет
трансформации, то есть T в ELT. Но благодаря федеративным возможностям Trino
модель dbt может читать данные из разных каталогов как из единого SQL-слоя.
Например, кроме операционной базы webshop, можно добавить источник
clickstream-данных из каталога website:
version: 2
sources:
- name: webshop
database: webshop
schema: public
tables:
- name: customers
- name: website
database: website
schema: clickstream
tables:
- name: clicks
Создайте модель models/src_clicks.sql:
with source as (
SELECT * FROM {{ source('website', 'clicks') }}
),
renamed as (
SELECT
visitorid,
useragent,
language,
event,
cast(from_iso8601_timestamp(eventtime) AS timestamp(6) with time zone) AS eventtime,
page,
referrer
FROM source
)
SELECT * FROM renamed
После запуска dbt run dbt создаст новую модель на основе clickstream-данных.
Trino выполнит чтение из нужного каталога через соответствующий коннектор.
Как dbt строит DAG#
В конвейерах данных важна модель зависимостей DAG (Directed Acyclic Graph).
Большинство оркестраторов, например Apache Airflow, требует явно описывать
зависимости. dbt строит DAG неявно: каждый SQL-файл является моделью, а
зависимости задаются через макрос ref().
Например:
SELECT * FROM {{ ref('src_customers') }}
ref() сообщает dbt, что текущая модель зависит от модели src_customers. dbt
использует это для правильного порядка выполнения и построения lineage.
Пример: сессии пользователей#
Допустим, нужно узнать, сколько раз клиент посещал сайт перед покупкой. Для этого можно объединить данные интернет-магазина с clickstream-данными и выполнить sessionization.
Добавьте таблицу sessions в источник webshop:
version: 2
sources:
- name: webshop
database: webshop
schema: public
tables:
- name: customers
- name: sessions
Создайте модель models/src_sessions.sql:
with source as (
SELECT * FROM {{ source('webshop', 'sessions') }}
),
renamed as (
SELECT
cookie_id,
cast(from_unixtime(started_ts / 1000) AS timestamp(6)) AS session_started,
customer_id
FROM source
)
SELECT * FROM renamed
Дальше можно построить модель sessionized_clicks.sql. В ней полезно
зафиксировать бизнес-правила:
сессия — это последовательность кликов, где между двумя соседними кликами не проходит больше одного часа;
после входа пользователя в систему прошлые сессии можно связать с этим пользователем;
cookie или другой долгоживущий идентификатор помогает связать клики с пользователем.
Для такой логики Trino предоставляет оконные функции и операции
WINDOW, а dbt
позволяет использовать macros вроде ref() и macros из пакетов dbt-utils или
trino_utils.
Упрощенный фрагмент модели:
WITH sessions AS (
SELECT
date_diff('hour', lag(c.eventtime) OVER w, c.eventtime) > 1 AS new_session,
c.visitorid,
c.eventtime,
c.referrer,
s.customer_id,
min(c.eventtime) OVER w AS session_started,
max(c.eventtime) OVER w AS session_ended
FROM {{ ref("src_clicks") }} c
JOIN {{ ref("src_sessions") }} s
ON c.visitorid = s.cookie_id
AND c.eventtime >= s.session_started
WINDOW w AS (
PARTITION BY c.visitorid
ORDER BY c.eventtime
)
)
SELECT * FROM sessions
В реальной модели можно расширить этот запрос: добавить row_number(),
посчитать последовательность сессий, сформировать sessionid, использовать
dbt_utils.star() для генерации списка столбцов и сохранить результат как
таблицу.
Materializations: view, table, ephemeral#
Чтобы управлять производительностью конвейера, выбирайте materialization для каждой модели осознанно:
viewподходит для легких моделей и промежуточных представлений;tableподходит для тяжелых вычислений и данных, которые нужны конечным пользователям или BI-инструментам;ephemeralне создает объект в базе данных, а подставляет SQL модели в зависимые модели как CTE.
Если sessionization использует оконные функции и добавляет sessionid к
каждому клику, такую операцию нежелательно выполнять заново при каждом
обращении к модели. Поэтому модель sessionized_clicks разумно сохранить как
таблицу.
В начало sessionized_clicks.sql добавьте:
{{ config(materialized='table') }}
После этого dbt run создаст таблицу через запрос вида
CREATE TABLE sessionized_clicks AS ....
Проверка логики с помощью dbt tests#
dbt упрощает добавление тестов данных.
Часть тестов можно описать как generic tests, например unique и not_null в
schema.yml.
Для sessionization полезно проверить, что сессии одного посетителя не
пересекаются. Для этого можно создать singular test в каталоге tests.
Файл tests/assert_no_overlapping_sessions.sql:
SELECT
sc1.session_started,
sc1.session_ended,
sc2.session_started,
sc2.session_ended,
sc1.visitorid
FROM {{ ref('sessionized_clicks') }} sc1
JOIN {{ ref('sessionized_clicks') }} sc2
ON sc1.visitorid = sc2.visitorid
AND sc1.session_started < sc2.session_ended
AND sc1.session_ended > sc2.session_started
AND sc1.session_started <> sc2.session_started
Запустите тест:
dbt test --select sessionized_clicks
Если запрос теста возвращает строки, dbt считает тест проваленным.
Макросы dbt#
dbt поддерживает macros. Распространенный пакет dbt-utils содержит macros, которые можно переиспользовать в разных проектах.
Starburst поддерживает пакет dbt-trino-utils. Он содержит macros для проектов, работающих на Trino или Starburst, а также реализации dispatched macros из других пакетов.
Пример packages.yml:
packages:
- package: dbt-labs/dbt_utils
version: {SEE DBT HUB FOR NEWEST VERSION}
- package: starburstdata/trino_utils
version: {SEE DBT HUB FOR NEWEST VERSION}
Добавьте dispatch в dbt_project.yml, чтобы dbt_utils также искал macros из
trino_utils:
dispatch:
- macro_namespace: dbt_utils
search_order: ['trino_utils', 'dbt_utils']
Установите пакеты:
dbt deps
Пример служебной операции для очистки неиспользуемых объектов:
dbt run-operation trino__drop_old_relations
Чтобы предварительно посмотреть результат без удаления, добавьте аргументы:
dbt run-operation trino__drop_old_relations --args "{dry_run: true}"
Статические данные через seeds#
dbt позволяет загружать содержимое CSV-файлов, которые называются
seeds, напрямую в таблицу командой
dbt seed. Это удобно для небольших справочников, списков соответствий и
статических значений, которые должны храниться рядом с проектом.
Например, создайте файл seeds/campaigns.csv:
campaign_id,country,age_group
1,US,"18-24"
2,Europe,"18-24"
3,US,"25-40"
4,Europe,"25-40"
5,US,"41-60"
Загрузите seeds:
dbt seed
После этого seed можно использовать в моделях через ref():
{{ ref("campaigns") }}
Дальнейшие шаги#
После базового конвейера обычно переходят к инкрементальному обновлению, настройке materialization под конкретные коннекторы, управлению grants, тестированию моделей и документированию lineage. Подробнее о настройках адаптера см. в Конфигурация Starburst и Trino.