Коннектор Delta Lake#

Коннектор Delta Lake позволяет запрашивать данные, хранящиеся в формате Delta Lake, включая Databricks Delta Lake. Коннектор может нативно читать журнал транзакций Delta Lake и поэтому обнаруживать изменения данных, внесенные внешними системами.

Требования#

Для подключения к Databricks Delta Lake требуется:

  • Поддерживаются таблицы, записанные Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS, 11.3 LTS, 12.2 LTS, 13.3 LTS, 14.3 LTS, 15.4 LTS, 16.4 LTS и 17.3 LTS.

  • Полностью поддерживаются развертывания с AWS, HDFS, Azure Storage и Google Cloud Storage (GCS).

  • Сетевой доступ от координатора и рабочих узлов к хранилищу Delta Lake.

  • Доступ к службе Hive metastore (HMS) Delta Lake, отдельной HMS или metastore Glue.

  • Сетевой доступ к HMS от координатора и рабочих узлов. Порт 9083 — порт по умолчанию для протокола Thrift, используемого HMS.

  • Файлы данных, хранящиеся в формате файлов Parquet на поддерживаемой файловой системе.

Общая конфигурация#

Чтобы настроить коннектор Delta Lake, создайте файл свойств каталога etc/catalog/example.properties, который ссылается на коннектор delta_lake.

Необходимо настроить metastore для метаданных.

Необходимо выбрать и настроить одну из поддерживаемых файловых систем.

connector.name=delta_lake
hive.metastore.uri=thrift://example.net:9083
fs.x.enabled=true

Замените свойство конфигурации fs.x.enabled нужной файловой системой.

Если вы используете AWS Glue как metastore, вместо этого задайте hive.metastore равным glue:

connector.name=delta_lake
hive.metastore=glue

У каждого типа metastore есть собственные свойства конфигурации вместе с общими свойствами конфигурации metastore.

Коннектор распознает таблицы Delta Lake, созданные в metastore средой выполнения Databricks. Если в metastore также присутствуют таблицы не Delta Lake, они не видны коннектору.

Конфигурация доступа к файловым системам#

Коннектор поддерживает доступ к следующим файловым системам:

Необходимо включить и настроить доступ к конкретной файловой системе. Устаревшая поддержка не рекомендуется и будет удалена.

Общие свойства конфигурации Delta Lake#

Все следующие свойства конфигурации используют разумные и протестированные значения по умолчанию. При типичном использовании их не нужно настраивать.

Свойства конфигурации Delta Lake#

Имя свойства

Описание

По умолчанию

delta.metadata.cache-ttl

Длительность кэширования метаданных таблиц Delta Lake.

30m

delta.metadata.cache-max-retained-size

Максимальный сохраняемый размер метаданных таблиц Delta, хранящихся в кэше. Должен задаваться в значениях data size, например 64MB. По умолчанию вычисляется как 5% от максимальной памяти, выделенной JVM.

delta.transaction-log.max-cached-file-size

Максимальный размер файла журнала транзакций Delta, который будет кэшироваться в памяти для кэша метаданных таблицы.

16MB

delta.compression-codec

Кодек сжатия, используемый при записи новых файлов данных. Возможные значения:

  • NONE

  • SNAPPY

  • ZSTD

  • GZIP

Эквивалентное свойство сеанса каталога — compression_codec.

ZSTD

delta.max-partitions-per-writer

Максимальное число секций на writer.

100

delta.idle-writer-min-file-size

Минимальный объем данных, записанный одним writer секции, после которого он может считаться простаивающим и быть закрыт движком. Эквивалентное свойство сеанса каталога — idle_writer_min_file_size.

16MB

delta.hide-non-delta-lake-tables

Скрывать сведения о таблицах, которыми не управляет Delta Lake. Скрытие применяется только к таблицам, метаданные которых управляются в каталоге Glue, и не применяется при использовании службы Hive metastore.

false

delta.enable-non-concurrent-writes

Включает поддержку записи для всех поддерживаемых файловых систем. Обратите особое внимание на предупреждение о параллелизме и checkpoint.

false

delta.default-checkpoint-writing-interval

Целочисленное значение по умолчанию для записи записей checkpoint журнала транзакций. Если значение равно N, checkpoint записываются после каждого N-го оператора, выполняющего запись в таблицу. Значение можно переопределить для конкретной таблицы свойством таблицы checkpoint_interval.

10

delta.hive-catalog-name

Имя каталога, в который перенаправляются запросы SELECT при обнаружении таблицы Hive.

delta.checkpoint-row-statistics-writing.enabled

Включает запись статистики строк в файлы checkpoint.

true

delta.dynamic-filtering.wait-timeout

Длительность ожидания завершения динамической фильтрации во время генерации split. Эквивалентное свойство сеанса каталога — dynamic_filtering_wait_timeout.

delta.table-statistics-enabled

Включает статистику таблиц для повышения производительности. Эквивалентное свойство сеанса каталога — statistics_enabled.

true

delta.extended-statistics.enabled

Включает сбор статистики с помощью ANALYZE и использование расширенной статистики. Эквивалентное свойство сеанса каталога — extended_statistics_enabled.

true

delta.extended-statistics.collect-on-write

Включает сбор расширенной статистики для операций записи. Эквивалентное свойство сеанса каталога — extended_statistics_collect_on_write.

true

delta.per-transaction-metastore-cache-maximum-size

Максимальное число объектов данных metastore на транзакцию в кэше Hive metastore.

1000

delta.metastore.store-table-metadata

Хранить комментарии таблиц и определения столбцов в metastore. Для обновления metastore требуется право записи.

false

delta.metastore.store-table-metadata-threads

Число потоков, используемых для сохранения метаданных таблиц в metastore.

5

delta.delete-schema-locations-fallback

Удалять ли расположения схем, когда Trino не может определить, содержат ли они внешние файлы.

false

delta.parquet.time-zone

Часовой пояс для чтения и записи Parquet.

JVM default

delta.target-max-file-size

Целевой максимальный размер записываемых файлов; фактический размер может быть больше. Эквивалентное свойство сеанса каталога — target_max_file_size.

1GB

delta.unique-table-location

Использовать рандомизированные уникальные расположения таблиц.

true

delta.register-table-procedure.enabled

Включает возможность вызова пользователями процедуры register_table.

false

delta.vacuum.min-retention

Минимальный порог хранения для файлов, учитываемых при удалении процедурой VACUUM. Эквивалентное свойство сеанса каталога — vacuum_min_retention.

7 DAYS

delta.deletion-vectors-enabled

Установите true, чтобы по умолчанию включать deletion vectors при создании новых таблиц.

false

delta.metadata.parallelism

Число потоков, используемых для получения метаданных. В настоящее время распараллелена только загрузка таблиц.

8

delta.checkpoint-processing.parallelism

Число потоков, используемых для получения файлов checkpoint каждой таблицы. В настоящее время распараллелено только получение sidecar-файлов V2 Checkpoint.

4

Свойства сеанса каталога#

В следующей таблице описаны свойства сеанса каталога, поддерживаемые коннектором Delta Lake:

Свойства сеанса каталога#

Имя свойства

Описание

По умолчанию

parquet_max_read_block_size

Максимальный размер блока, используемый при чтении файлов Parquet.

16MB

parquet_writer_block_size

Максимальный размер блока, создаваемого writer Parquet.

128MB

parquet_writer_page_size

Максимальный размер страницы, создаваемой writer Parquet.

1MB

parquet_writer_page_value_count

Максимальное число значений на страницах, создаваемых writer Parquet.

60000

parquet_writer_batch_size

Максимальное число строк, обрабатываемых writer Parquet в одном пакете.

10000

projection_pushdown_enabled

Читать только проецируемые поля из столбцов-строк при выполнении запросов SELECT.

true

Поддержка отказоустойчивого выполнения#

Коннектор поддерживает Fault-tolerant execution обработки запросов. Операции чтения и записи поддерживаются с любой политикой повторов.

Сопоставление типов#

Поскольку Trino и Delta Lake поддерживают типы, которых нет в другой системе, этот коннектор изменяет некоторые типы при чтении или записи данных. Типы данных могут сопоставляться по-разному в разных направлениях между Trino и источником данных. Сведения о сопоставлении типов в каждом направлении см. в следующих разделах.

Дополнительные сведения о поддерживаемых типах данных в спецификации формата таблиц Delta Lake см. в спецификации Delta Transaction Log.

Сопоставление типов Delta Lake с типами Trino#

Коннектор сопоставляет типы Delta Lake с соответствующими типами Trino согласно этой таблице:

Сопоставление типов Delta Lake с типами Trino#

Тип Delta Lake

Тип Trino

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BYTE

TINYINT

SHORT

SMALLINT

LONG

BIGINT

FLOAT

REAL

DOUBLE

DOUBLE

DECIMAL(p,s)

DECIMAL(p,s)

STRING

VARCHAR

BINARY

VARBINARY

DATE

DATE

TIMESTAMPNTZ (TIMESTAMP_NTZ)

TIMESTAMP(6)

TIMESTAMP

TIMESTAMP(3) WITH TIME ZONE

VARIANT

JSON

ARRAY

ARRAY

MAP

MAP

STRUCT(...)

ROW(...)

Другие типы не поддерживаются.

Сопоставление типов Trino с типами Delta Lake#

Коннектор сопоставляет типы Trino с соответствующими типами Delta Lake согласно этой таблице:

Сопоставление типов Trino с типами Delta Lake#

Тип Trino

Тип Delta Lake

BOOLEAN

BOOLEAN

INTEGER

INTEGER

TINYINT

BYTE

SMALLINT

SHORT

BIGINT

LONG

REAL

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s)

DECIMAL(p,s)

VARCHAR

STRING

VARBINARY

BINARY

DATE

DATE

TIMESTAMP

TIMESTAMPNTZ (TIMESTAMP_NTZ)

TIMESTAMP(3) WITH TIME ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW(...)

STRUCT(...)

Другие типы не поддерживаются.

Возможности таблиц Delta Lake#

Коннектор поддерживает следующие возможности таблиц Delta Lake:

Возможности таблиц#

Возможность

Описание

Append-only tables

Только для записи

Column invariants

Только для записи

CHECK constraints

Только для записи

Change data feed

Только для записи

Column mapping

Для чтения и записи

Deletion vectors

Для чтения и записи

Iceberg compatibility V1 & V2

Только для чтения

Invariants

Только для записи

Timestamp without time zone

Для чтения и записи

Type widening

Только для чтения

Vacuum protocol check

Для чтения и записи

V2 checkpoint

Только для чтения

Другие возможности не поддерживаются.

Безопасность#

Коннектор Delta Lake позволяет выбрать один из нескольких способов предоставления авторизации на уровне каталога. В разных файлах каталогов Delta Lake можно выбрать разные типы проверки авторизации.

Проверки авторизации#

Включите проверки авторизации для коннектора, задав свойство delta.security в файле свойств каталога. Это свойство должно быть одним из значений безопасности в следующей таблице:

Значения безопасности Delta Lake#

Значение свойства

Описание

ALLOW_ALL (значение по умолчанию)

Проверки авторизации не применяются.

SYSTEM

Коннектор полагается на системный контроль доступа.

READ_ONLY

Разрешены операции, читающие данные или метаданные, например SELECT. Операции, записывающие данные или метаданные, например CREATE TABLE, INSERT или DELETE, не разрешены.

FILE

Проверки авторизации применяются с помощью файла конфигурации контроля доступа на уровне каталога, путь к которому указан в свойстве конфигурации каталога security.config-file. Сведения о файле конфигурации авторизации см. в Файлы контроля доступа на уровне каталога.

Поддержка SQL#

Коннектор предоставляет доступ на чтение и запись к данным и метаданным в Delta Lake. Помимо глобально доступных операторов и операций чтения, коннектор поддерживает следующие возможности:

Запросы time travel#

Коннектор позволяет запрашивать исторические данные. Это дает возможность запрашивать таблицу в том виде, в котором она была на момент создания предыдущего снимка, даже если затем данные были изменены или удалены.

Исторические данные таблицы можно получить, указав номер версии, соответствующий нужной версии таблицы:

SELECT *
FROM example.testdb.customer_orders FOR VERSION AS OF 3

Другой способ получить исторические данные — указать момент времени в прошлом, например день или неделю назад. Для предоставления предыдущего состояния таблицы внутренне используется последний снимок таблицы, созданный до указанной в запросе метки времени или ровно в нее:

SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 America/Los_Angeles';

Коннектор позволяет создать новый снимок через замену таблицы Delta Lake.

CREATE OR REPLACE TABLE example.testdb.customer_orders AS
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 09:59:29.803 America/Los_Angeles';

Можно использовать дату, чтобы указать момент времени в прошлом для применения снимка таблицы в запросе. Если часовой пояс сеанса — America/Los_Angeles, следующие запросы эквивалентны:

SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF DATE '2022-03-23';
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00';
SELECT *
FROM example.testdb.customer_orders FOR TIMESTAMP AS OF TIMESTAMP '2022-03-23 00:00:00.000 America/Los_Angeles';

Используйте метаданную таблицу $history, чтобы определить ID снимка таблицы, как в следующем запросе:

SELECT version, operation
FROM example.testdb."customer_orders$history"
ORDER BY version DESC

Процедуры#

Используйте оператор CALL для выполнения операций изменения данных или административных задач. Процедуры доступны в системной схеме каждого каталога. Следующий фрагмент кода показывает, как вызвать example_procedure в каталоге examplecatalog:

CALL examplecatalog.system.example_procedure()

Регистрация таблицы#

Коннектор может зарегистрировать существующие таблицы Delta Lake в metastore, если для каталога delta.register-table-procedure.enabled установлено в true.

Процедура system.register_table позволяет вызывающему зарегистрировать существующую таблицу Delta Lake в metastore, используя ее существующие журналы транзакций и файлы данных:

CALL example.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 's3://my-bucket/a/path')

Чтобы предотвратить доступ неавторизованных пользователей к данным, эта процедура по умолчанию отключена. Процедура включается только когда delta.register-table-procedure.enabled установлено в true.

Отмена регистрации таблицы#

Коннектор может удалить существующие таблицы Delta Lake из metastore. После отмены регистрации таблицу больше нельзя запрашивать из Trino.

Процедура system.unregister_table позволяет вызывающему отменить регистрацию существующей таблицы Delta Lake из metastore без удаления данных:

CALL example.system.unregister_table(schema_name => 'testdb', table_name => 'customer_orders')

Очистка кэша метаданных#

  • system.flush_metadata_cache()

    Очищает все кэши метаданных.

  • system.flush_metadata_cache(schema_name => ..., table_name => ...)

    Очищает записи кэша метаданных конкретной таблицы. Процедура требует передачи именованных параметров.

VACUUM#

Процедура VACUUM удаляет все старые файлы, отсутствующие в журнале транзакций, а также файлы, которые не нужны для чтения снимков таблицы новее, чем текущее время минус период хранения, заданный параметром retention period.

Пользователи с правами INSERT и DELETE на таблицу могут выполнить VACUUM следующим образом:

CALL example.system.vacuum('exampleschemaname', 'exampletablename', '7d');

Все параметры обязательны и должны быть представлены в следующем порядке:

  • имя схемы;

  • имя таблицы;

  • период хранения.

Свойство конфигурации delta.vacuum.min-retention предоставляет защитную меру, чтобы гарантировать ожидаемое сохранение файлов. Минимальное значение этого свойства — 0s. Также существует свойство сеанса минимального периода хранения vacuum_min_retention.

Управление данными#

Коннектор можно использовать для INSERT, DELETE, UPDATE и MERGE данных в таблицах Delta Lake.

Операции записи поддерживаются для таблиц, хранящихся в следующих системах:

  • Azure ADLS Gen2, Google Cloud Storage

    Запись в Azure ADLS Gen2 и Google Cloud Storage включена по умолчанию. Trino обнаруживает конфликты записи в этих системах хранения при записи из нескольких кластеров Trino или из других движков запросов.

  • S3 и S3-совместимое хранилище

    Запись в Amazon S3 и S3-совместимое хранилище управляется следующими свойствами конфигурации. Когда delta.s3.transaction-log-conditional-writes.enabled установлено в true (по умолчанию), коннектор использует условные записи S3 для обнаружения конфликтов записи в журнал. Это совместимо с любыми другими движками, которые также используют условные записи.

    Когда delta.s3.transaction-log-conditional-writes.enabled равно false, запись в Amazon S3 и S3-совместимое хранилище должна быть включена свойством delta.enable-non-concurrent-writes. В этом режиме коннектор использует гарантии строгой согласованности S3 вместе со специфичной для Trino стратегией именования, чтобы организовать создание новых файлов журнала. В этом режиме запись в S3 можно безопасно выполнять из нескольких кластеров Trino, использующих тот же режим записи. Однако конфликты записи не обнаруживаются при параллельной записи из других движков Delta Lake или из кластеров Trino, использующих условные записи S3. Нужно убедиться, что параллельные изменения данных не выполняются, чтобы избежать повреждения данных.

Управление схемами и таблицами#

Функциональность Управление схемами и таблицами включает поддержку:

Коннектор поддерживает создание схем. Схему можно создать с указанным расположением или без него.

Схему можно создать оператором CREATE SCHEMA и свойством схемы location. Таблицы в этой схеме располагаются в подкаталоге расположения схемы. Файлы данных для таблиц в этой схеме, использующих расположение по умолчанию, удаляются при удалении таблицы:

CREATE SCHEMA example.example_schema
WITH (location = 's3://my-bucket/a/path');

При необходимости расположение можно опустить. При создании таблиц в этой схеме для них нужно включать расположение. Файлы данных этих таблиц не удаляются при удалении таблицы:

CREATE SCHEMA example.example_schema;

Когда таблицы Delta Lake существуют в хранилище, но отсутствуют в metastore, Trino можно использовать для регистрации таблиц:

CALL example.system.register_table(schema_name => 'testdb', table_name => 'example_table', table_location => 's3://my-bucket/a/path')

Вместо этого схема таблицы считывается из журнала транзакций. Если схема изменяется внешней системой, Trino автоматически использует новую схему.

Warning

Использование CREATE TABLE с существующим содержимым таблицы запрещено; используйте вместо этого процедуру system.register_table.

Если указанное расположение еще не содержит таблицу Delta, коннектор автоматически записывает начальные записи журнала транзакций и регистрирует таблицу в metastore. В результате любой движок Databricks может записывать в таблицу:

CREATE TABLE example.default.new_table (id BIGINT, address VARCHAR);

Коннектор Delta Lake также поддерживает создание таблиц с помощью синтаксиса CREATE TABLE AS.

Эволюция схемы#

Коннектор Delta Lake поддерживает эволюцию схемы, включая безопасные операции добавления, удаления и переименования столбцов для невложенных структур.

Коннектор поддерживает следующие операторы ALTER TABLE.

Замена таблиц#

Коннектор поддерживает замену существующей таблицы как атомарную операцию. Атомарная замена таблицы создает новый снимок с новым определением таблицы как часть истории таблицы.

Чтобы заменить таблицу, используйте CREATE OR REPLACE TABLE или CREATE OR REPLACE TABLE AS.

В этом примере таблица example_table заменяется полностью новым определением и данными из исходной таблицы:

CREATE OR REPLACE TABLE example_table
WITH (partitioned_by = ARRAY['a'])
AS SELECT * FROM another_table;

ALTER TABLE EXECUTE#

Коннектор поддерживает следующие команды для использования с ALTER TABLE EXECUTE.

optimize#

Команда optimize используется для перезаписи содержимого указанной таблицы так, чтобы оно было объединено в меньшее число более крупных файлов. Если таблица секционирована, компактирование данных применяется отдельно к каждой секции, выбранной для оптимизации. Эта операция повышает производительность чтения.

Объединяются все файлы с размером меньше необязательного параметра file_size_threshold (значение порога по умолчанию — 100MB):

ALTER TABLE test_table EXECUTE optimize

Следующий оператор объединяет в таблице файлы размером меньше 128 мегабайт:

ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '128MB')

Можно использовать предложение WHERE со столбцами, по которым секционирована таблица, чтобы отфильтровать секции для оптимизации:

ALTER TABLE test_partitioned_table EXECUTE optimize
WHERE partition_key = 1

Можно использовать более сложное предложение WHERE, чтобы сузить область действия процедуры optimize. В следующем примере значения меток времени приводятся к датам, а сравнение используется для оптимизации только секций с данными за 2022 год или более поздними:

ALTER TABLE test_table EXECUTE optimize
WHERE CAST(timestamp_tz AS DATE) > DATE '2021-12-31'

Используйте предложение WHERE с метаданными столбцами, чтобы отфильтровать файлы для оптимизации.

ALTER TABLE test_table EXECUTE optimize
WHERE "$file_modified_time" > date_trunc('day', CURRENT_TIMESTAMP);
ALTER TABLE test_table EXECUTE optimize
WHERE "$path" <> 'skipping-file-path'
-- оптимизация файлов меньше 1MB
ALTER TABLE test_table EXECUTE optimize
WHERE "$file_size" <= 1024 * 1024

ALTER TABLE RENAME TO#

Коннектор поддерживает оператор ALTER TABLE RENAME TO только при выполнении одного из следующих условий:

  • Тип таблицы — external.

  • Таблица поддерживается metastore, который не выполняет операции объектного хранилища, например AWS Glue.

Свойства таблиц#

Доступны следующие свойства таблиц:

Свойства таблиц Delta Lake#

Имя свойства

Описание

location

URI расположения таблицы в файловой системе.

partitioned_by

Задает столбцы секционирования.

checkpoint_interval

Задает интервал checkpoint в числе записей таблицы.

change_data_feed_enabled

Включает хранение записей change data feed.

column_mapping_mode

Режим сопоставления столбцов. Возможные значения:

  • ID

  • NAME

  • NONE

По умолчанию — NONE.

deletion_vectors_enabled

Включает deletion vectors.

Следующий пример использует все доступные свойства таблиц:

CREATE TABLE example.default.example_partitioned_table
WITH (
  location = 's3://my-bucket/a/path',
  partitioned_by = ARRAY['regionkey'],
  checkpoint_interval = 5,
  change_data_feed_enabled = false,
  column_mapping_mode = 'name',
  deletion_vectors_enabled = false
)
AS SELECT name, comment, regionkey FROM tpch.tiny.nation;

Shallow cloned tables#

Коннектор поддерживает операции чтения и записи для shallow cloned tables. Trino не поддерживает создание shallow clone tables. Подробнее о shallow cloning см. в документации Delta Lake.

Shallow cloned tables позволяют тестировать запросы или экспериментировать с изменениями таблицы без дублирования данных.

Таблицы метаданных#

Коннектор предоставляет несколько таблиц метаданных для каждой таблицы Delta Lake. Эти таблицы метаданных содержат информацию о внутренней структуре таблицы Delta Lake. Каждую таблицу метаданных можно запросить, добавив имя таблицы метаданных к имени таблицы:

SELECT * FROM "test_table$history"
Таблица $history#

Таблица $history предоставляет журнал изменений метаданных, выполненных над таблицей Delta Lake.

Журнал изменений таблицы Delta Lake test_table можно получить следующим запросом:

SELECT * FROM "test_table$history"
 version |               timestamp               | user_id | user_name |  operation   |         operation_parameters          |                 cluster_id      | read_version |  isolation_level  | is_blind_append | operation_metrics
---------+---------------------------------------+---------+-----------+--------------+---------------------------------------+---------------------------------+--------------+-------------------+-----------------+-------------------
       2 | 2023-01-19 07:40:54.684 Europe/Vienna | trino   | trino     | WRITE        | {queryId=20230119_064054_00008_4vq5t} | trino-406-trino-coordinator     |            2 | WriteSerializable | true            | {}
       1 | 2023-01-19 07:40:41.373 Europe/Vienna | trino   | trino     | ADD COLUMNS  | {queryId=20230119_064041_00007_4vq5t} | trino-406-trino-coordinator     |            0 | WriteSerializable | true            | {}
       0 | 2023-01-19 07:40:10.497 Europe/Vienna | trino   | trino     | CREATE TABLE | {queryId=20230119_064010_00005_4vq5t} | trino-406-trino-coordinator     |            0 | WriteSerializable | true            | {}

Вывод запроса содержит следующие столбцы истории:

Столбцы истории#

Имя

Тип

Описание

version

BIGINT

Версия таблицы, соответствующая операции.

timestamp

TIMESTAMP(3) WITH TIME ZONE

Время, когда версия таблицы стала активной. Для таблиц с включенными in-Commit timestamps это поле возвращает значение inCommitTimestamp, иначе возвращает значение поля timestamp из commitInfo.

user_id

VARCHAR

Идентификатор пользователя, выполнившего операцию.

user_name

VARCHAR

Имя пользователя, выполнившего операцию.

operation

VARCHAR

Имя операции, выполненной над таблицей.

operation_parameters

map(VARCHAR, VARCHAR)

Параметры операции.

cluster_id

VARCHAR

ID кластера, выполнившего операцию.

read_version

BIGINT

Версия таблицы, которая была прочитана для выполнения операции.

isolation_level

VARCHAR

Уровень изоляции, использованный для выполнения операции.

is_blind_append

BOOLEAN

Добавляла ли операция данные.

operation_metrics

map(VARCHAR, VARCHAR)

Метрики операции.

Таблица $partitions#

Таблица $partitions предоставляет подробный обзор секций таблицы Delta Lake.

Информацию о секциях таблицы Delta Lake test_table можно получить следующим запросом:

SELECT * FROM "test_table$partitions"
           partition           | file_count | total_size |                     data
-------------------------------+------------+------------+----------------------------------------------+
{_bigint=1, _date=2021-01-12}  |          2 |        884 | {_decimal={min=1.0, max=2.0, null_count=0}}
{_bigint=1, _date=2021-01-13}  |          1 |        442 | {_decimal={min=1.0, max=1.0, null_count=0}}

Вывод запроса содержит следующие столбцы:

Столбцы секций#

Имя

Тип

Описание

partition

ROW(...)

Строка, содержащая сопоставление имен столбцов секционирования со значениями столбцов секционирования.

file_count

BIGINT

Число файлов, сопоставленных с секцией.

total_size

BIGINT

Размер всех файлов в секции.

data

ROW(... ROW (min ..., max ... , null_count BIGINT))

Диапазон секции и счетчики null.

Таблица $properties#

Таблица $properties предоставляет доступ к конфигурации таблицы Delta Lake, возможностям таблицы и свойствам таблицы. Строки таблицы являются парами ключ/значение.

Свойства таблицы Delta test_table можно получить следующим запросом:

SELECT * FROM "test_table$properties"
 key                        | value
----------------------------+-----------------+
delta.minReaderVersion      | 1
delta.minWriterVersion      | 4
delta.columnMapping.mode    | name
delta.feature.columnMapping | supported

Столбцы метаданных#

Помимо определенных столбцов, коннектор Delta Lake автоматически предоставляет метаданные в нескольких скрытых столбцах каждой таблицы. Эти столбцы можно использовать в SQL-операторах как любые другие, например выбирать напрямую или использовать в условных выражениях.

  • $path

    Полный путь в файловой системе к файлу для этой строки.

  • $file_modified_time

    Дата и время последнего изменения файла для этой строки.

  • $file_size

    Размер файла для этой строки.

Табличные функции#

Коннектор предоставляет следующие табличные функции:

table_changes#

Позволяет читать записи Change Data Feed (CDF), чтобы показывать изменения на уровне строк между двумя версиями таблицы Delta Lake. Когда свойство таблицы change_data_feed_enabled установлено в true для конкретной таблицы Delta Lake, коннектор записывает события изменений для всех изменений данных в таблице. Эти изменения можно прочитать так:

SELECT
  *
FROM
  TABLE(
    system.table_changes(
      schema_name => 'test_schema',
      table_name => 'tableName',
      since_version => 0
    )
  );

schema_name — тип VARCHAR, обязательно, имя схемы, для которой вызывается функция.

table_name — тип VARCHAR, обязательно, имя таблицы, для которой вызывается функция.

since_version — тип BIGINT, необязательно, версия, начиная с которой показываются изменения, исключительно.

Помимо возврата столбцов, присутствующих в таблице, функция возвращает следующие значения для каждого события изменения:

  • _change_type

    Указывает тип произошедшего изменения. Возможные значения: insert, delete, update_preimage и update_postimage.

  • _commit_version

    Показывает версию таблицы, в которой произошло изменение.

  • _commit_timestamp

    Представляет timestamp коммита, в котором произошло указанное изменение.

Обычный пример использования:

Создание таблицы:

CREATE TABLE test_schema.pages (page_url VARCHAR, domain VARCHAR, views INTEGER)
    WITH (change_data_feed_enabled = true);

Вставка данных:

INSERT INTO test_schema.pages
    VALUES
        ('url1', 'domain1', 1),
        ('url2', 'domain2', 2),
        ('url3', 'domain1', 3);
INSERT INTO test_schema.pages
    VALUES
        ('url4', 'domain1', 400),
        ('url5', 'domain2', 500),
        ('url6', 'domain3', 2);

Обновление данных:

UPDATE test_schema.pages
    SET domain = 'domain4'
    WHERE views = 2;

Выбор изменений:

SELECT
  *
FROM
  TABLE(
    system.table_changes(
      schema_name => 'test_schema',
      table_name => 'pages',
      since_version => 1
    )
  )
ORDER BY _commit_version ASC;

Предыдущая последовательность SQL-операторов возвращает следующий результат:

page_url    |     domain     |    views    |    _change_type     |    _commit_version    |    _commit_timestamp
url4        |     domain1    |    400      |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url5        |     domain2    |    500      |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url6        |     domain3    |    2        |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url2        |     domain2    |    2        |    update_preimage  |     3                 |    2023-03-10T22:23:24.000+0000
url2        |     domain4    |    2        |    update_postimage |     3                 |    2023-03-10T22:23:24.000+0000
url6        |     domain3    |    2        |    update_preimage  |     3                 |    2023-03-10T22:23:24.000+0000
url6        |     domain4    |    2        |    update_postimage |     3                 |    2023-03-10T22:23:24.000+0000

Вывод показывает, какие изменения произошли в какой версии. Например, в версии 3 были изменены две строки: первая изменилась с ('url2', 'domain2', 2) на ('url2', 'domain4', 2), а вторая — с ('url6', 'domain2', 2) на ('url6', 'domain4', 2).

Если since_version не указан, функция создает события изменений начиная с момента создания таблицы.

SELECT
  *
FROM
  TABLE(
    system.table_changes(
      schema_name => 'test_schema',
      table_name => 'pages'
    )
  )
ORDER BY _commit_version ASC;

Предыдущий SQL-оператор возвращает следующий результат:

page_url    |     domain     |    views    |    _change_type     |    _commit_version    |    _commit_timestamp
url1        |     domain1    |    1        |    insert           |     1                 |    2023-03-10T20:21:22.000+0000
url2        |     domain2    |    2        |    insert           |     1                 |    2023-03-10T20:21:22.000+0000
url3        |     domain1    |    3        |    insert           |     1                 |    2023-03-10T20:21:22.000+0000
url4        |     domain1    |    400      |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url5        |     domain2    |    500      |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url6        |     domain3    |    2        |    insert           |     2                 |    2023-03-10T21:22:23.000+0000
url2        |     domain2    |    2        |    update_preimage  |     3                 |    2023-03-10T22:23:24.000+0000
url2        |     domain4    |    2        |    update_postimage |     3                 |    2023-03-10T22:23:24.000+0000
url6        |     domain3    |    2        |    update_preimage  |     3                 |    2023-03-10T22:23:24.000+0000
url6        |     domain4    |    2        |    update_postimage |     3                 |    2023-03-10T22:23:24.000+0000

Можно увидеть изменения, произошедшие в версии 1, как три вставки. Они не отображаются в предыдущем операторе, когда значение since_version было установлено в 1.

Производительность#

Коннектор включает ряд улучшений производительности, подробно описанных в следующих разделах:

Статистика таблиц#

Используйте операторы ANALYZE в Trino, чтобы заполнить расширенную статистику таблиц Delta Lake по размеру данных и числу различных значений (NDV). Статистика минимального значения, максимального значения, числа значений и числа null вычисляется на лету из журнала транзакций таблицы Delta Lake. Затем оптимизатор на основе стоимости использует эту статистику для повышения производительности запросов.

Расширенная статистика включает более широкий набор оптимизаций, включая переупорядочивание соединений. Управляющее свойство каталога delta.table-statistics-enabled включено по умолчанию. Эквивалентное свойство сеанса каталогаstatistics_enabled.

Каждый оператор ANALYZE обновляет статистику таблицы инкрементально, поэтому учитываются только данные, изменившиеся с последнего ANALYZE. Статистика таблицы не обновляется автоматически операциями записи, такими как INSERT, UPDATE и DELETE. Нужно вручную снова выполнить ANALYZE, чтобы обновить статистику таблицы.

Чтобы собрать статистику для таблицы, выполните следующий оператор:

ANALYZE table_schema.table_name;

Чтобы пересчитать статистику таблицы с нуля, используйте дополнительный параметр mode:

ANALYZE table_schema.table_name WITH(mode = ‘full_refresh’);

Доступны два режима: full_refresh и incremental. По умолчанию процедура использует incremental.

Чтобы получить максимальную пользу от оптимизаций на основе стоимости, периодически выполняйте ANALYZE для каждой большой таблицы, к которой часто выполняются запросы.

Тонкая настройка#

Свойство files_modified_after полезно, если нужно выполнить оператор ANALYZE для таблицы, которая уже анализировалась ранее. Его можно использовать для ограничения объема данных, используемых для генерации статистики таблицы:

ANALYZE example_table WITH(files_modified_after = TIMESTAMP '2021-08-23
16:43:01.321 Z')

В результате в анализе используются только файлы новее указанной метки времени.

Также можно указать набор или поднабор столбцов для анализа с помощью свойства columns:

ANALYZE example_table WITH(columns = ARRAY['nationkey', 'regionkey'])

Чтобы выполнить ANALYZE с columns более одного раза, следующий ANALYZE должен выполняться с тем же набором или поднабором исходных столбцов.

Чтобы расширить набор columns, удалите статистику и заново проанализируйте таблицу.

Отключение и удаление расширенной статистики#

Расширенную статистику можно отключить свойством конфигурации каталога delta.extended-statistics.enabled, установленным в false. Также ее можно отключить для сеанса с помощью свойства сеанса каталога extended_statistics_enabled, установленного в false.

Если таблица изменена большим числом операций удаления и обновления, вызов ANALYZE не дает точной статистики. Чтобы исправить статистику, нужно удалить расширенную статистику и заново проанализировать таблицу.

Используйте процедуру system.drop_extended_stats в каталоге, чтобы удалить расширенную статистику для указанной таблицы в указанной схеме:

CALL example.system.drop_extended_stats('example_schema', 'example_table')

Использование памяти#

Коннектор Delta Lake интенсивно использует память, и объем требуемой памяти растет с размером журналов транзакций Delta Lake любых доступных таблиц. Это важно учитывать при выделении ресурсов координатору.

Необходимо снижать использование памяти, поддерживая низкое число активных файлов данных в таблице с помощью регулярного выполнения OPTIMIZE и VACUUM в Delta Lake.

Мониторинг памяти#

При использовании коннектора Delta Lake необходимо отслеживать использование памяти на координаторе. В частности, отслеживайте использование heap JVM стандартными инструментами как часть штатной эксплуатации кластера.

Хороший косвенный показатель использования памяти — загрузка кэшей Delta Lake. Она предоставляется коннектором через JMX bean plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess.

К нему можно обращаться любым стандартным ПО мониторинга с поддержкой JMX или использовать Коннектор JMX со следующим запросом:

SELECT * FROM jmx.current."*.plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess"

Пример результата:

datafilemetadatacachestats.hitrate      | 0.97
datafilemetadatacachestats.missrate     | 0.03
datafilemetadatacachestats.requestcount | 3232
metadatacachestats.hitrate              | 0.98
metadatacachestats.missrate             | 0.02
metadatacachestats.requestcount         | 6783
node                                    | trino-master
object_name                             | io.trino.plugin.deltalake.transactionlog:type=TransactionLogAccess,name=delta

В здоровой системе оба значения datafilemetadatacachestats.hitrate и metadatacachestats.hitrate близки к 1.0.

Перенаправление таблиц#

Trino позволяет прозрачно перенаправлять операции над существующей таблицей в подходящий каталог на основе формата таблицы и конфигурации каталога.

В контексте коннекторов, зависящих от службы metastore, например Коннектор Hive, Коннектор Iceberg и Коннектор Delta Lake, metastore (служба Hive metastore, AWS Glue Data Catalog) может использоваться для работы с таблицами разных форматов. Поэтому база данных metastore может содержать разные таблицы с разными форматами.

В качестве конкретного примера используем следующий простой сценарий, в котором задействовано перенаправление таблицы:

USE example.example_schema;

EXPLAIN SELECT * FROM example_table;
                               Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
     ...
     Output[columnNames = [...]]
     │   ...
     └─ TableScan[table = another_catalog:example_schema:example_table]
            ...

Вывод оператора EXPLAIN указывает фактический каталог, который обрабатывает запрос SELECT к таблице example_table.

Функциональность перенаправления таблиц также работает при использовании полностью квалифицированных имен таблиц:

EXPLAIN SELECT * FROM example.example_schema.example_table;
                               Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
     ...
     Output[columnNames = [...]]
     │   ...
     └─ TableScan[table = another_catalog:example_schema:example_table]
            ...

Trino поддерживает перенаправление таблиц для следующих операций:

Trino не поддерживает перенаправление представлений.

Коннектор поддерживает перенаправление из таблиц Delta Lake в таблицы Hive с помощью свойства конфигурации каталога delta.hive-catalog-name.

Свойства конфигурации настройки производительности#

В следующей таблице описаны свойства каталога для настройки производительности, специфичные для коннектора Delta Lake.

Warning

Свойства конфигурации настройки производительности считаются возможностями экспертного уровня. Изменение этих свойств относительно значений по умолчанию, скорее всего, приведет к нестабильности и снижению производительности. Настоятельно рекомендуется использовать их только для решения нетривиальных проблем производительности и сохранять резервную копию исходных значений, если вы их меняете.

Свойства конфигурации настройки производительности Delta Lake#

Имя свойства

Описание

По умолчанию

delta.domain-compaction-threshold

Минимальный размер предикатов запроса, выше которого Trino компактирует предикаты. Проталкивание большого списка предикатов в источник данных может ухудшить производительность. Для оптимизации в такой ситуации Trino может компактировать большие предикаты. При необходимости настройте порог, чтобы сохранить баланс между производительностью и проталкиванием предикатов.

1000

delta.max-outstanding-splits

Целевое число буферизованных split для каждого сканирования таблицы в запросе до того, как планировщик попытается приостановиться.

1000

delta.max-splits-per-second

Задает максимальное число split в секунду, используемых для доступа к базовому хранилищу. Уменьшите это число, если ваш лимит регулярно превышается, исходя из ограничений файловой системы. Значение установлено в абсолютный максимум, что по умолчанию приводит к максимальному распараллеливанию доступа к данным в Trino. Попытка установить его выше приведет к невозможности запуска Trino.

Integer.MAX_VALUE

delta.max-split-size

Задает наибольший data size для одного участка чтения, назначаемого рабочему узлу после обработки max-initial-splits. Также можно использовать соответствующее свойство сеанса каталога <catalog-name>.max_split_size.

128MB

delta.minimum-assigned-split-weight

Десятичное значение в диапазоне (0, 1], используемое как минимум для весов, назначаемых каждому split. Низкое значение может повысить производительность для таблиц с маленькими файлами. Более высокое значение может улучшить производительность запросов с сильно перекошенными агрегациями или соединениями.

0.05

delta.projection-pushdown-enabled

Читать только проецируемые поля из столбцов-строк при выполнении запросов SELECT.

true

delta.query-partition-filter-required

Установите true, чтобы требовать от запроса использования фильтра секции. Для временного использования в конкретном каталоге можно использовать свойство сеанса каталога query_partition_filter_required.

false

Кэш файловой системы#

Коннектор поддерживает настройку и использование кэширования файловой системы.

В следующей таблице описаны свойства кэша файловой системы, специфичные для коннектора Delta Lake.

Свойства конфигурации кэша файловой системы Delta Lake#

Имя свойства

Описание

По умолчанию

delta.fs.cache.disable-transaction-log-caching

Установите true, чтобы отключить кэширование каталога _delta_log для таблиц Delta. Это полезно в случаях, когда таблицы Delta удаляются и создаются заново, а файлы внутри каталога журнала транзакций перезаписываются и не могут безопасно кэшироваться. Действует только при fs.cache.enabled=true.

false