Коннектор Kafka#

Этот коннектор позволяет использовать топики Apache Kafka как таблицы в Trino. Каждое сообщение представляется как строка в Trino.

Топики могут быть активными. Строки появляются по мере поступления данных и исчезают при удалении сегментов. Это может приводить к неожиданному поведению, если обращаться к одной и той же таблице несколько раз в одном запросе, например при self join.

Коннектор параллельно читает и записывает данные сообщений из топиков Kafka на рабочих узлах, что дает существенный выигрыш в производительности. Размер наборов данных для такого распараллеливания настраивается и может быть адаптирован под конкретные потребности.

См. Учебное руководство по коннектору Kafka.

Требования#

Для подключения к Kafka требуется:

  • брокер Kafka версии 3.3 или выше (с включенным KRaft);

  • сетевой доступ от координатора и рабочих узлов Trino к узлам Kafka. Порт по умолчанию — 9092.

При использовании декодера Protobuf с поставщиком описаний таблиц Confluent необходимо выполнить дополнительные шаги:

  • Скопируйте JAR-файлы kafka-protobuf-provider и kafka-protobuf-types из Confluent для версии Confluent 8.1.1 в каталог плагина коннектора Kafka (<install directory>/plugin/kafka) на всех узлах кластера. Каталог плагина зависит от способа Установка.

  • Копируя и используя эти JAR-файлы, вы соглашаетесь с условиями Confluent Community License Agreement, на которых Confluent предоставляет их.

Эти шаги не требуются, если вы не используете Protobuf и поставщик описаний таблиц Confluent.

Конфигурация#

Чтобы настроить коннектор Kafka, создайте файл свойств каталога etc/catalog/example.properties со следующим содержимым, заменив свойства подходящими значениями.

В некоторых случаях, например при использовании специализированных методов аутентификации, для доступа к кластеру Kafka необходимо указать дополнительные свойства клиента Kafka. Для этого добавьте свойство kafka.config.resources, которое ссылается на файлы конфигурации Kafka. Обратите внимание, что конфигурации могут быть переопределены, если они явно заданы в kafka.properties:

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
kafka.config.resources=/etc/kafka-configuration.properties

Несколько кластеров Kafka#

Можно создать столько каталогов, сколько требуется. Поэтому, если у вас есть дополнительные кластеры Kafka, просто добавьте еще один файл свойств в etc/catalog с другим именем, убедившись, что оно заканчивается на .properties. Например, если назвать файл свойств sales.properties, Trino создаст каталог sales с использованием настроенного коннектора.

Уровни журналирования#

Журналирование потребителя Kafka может быть подробным и засорять журналы Trino. Чтобы снизить уровень журналирования, просто добавьте следующее в etc/log.properties:

org.apache.kafka=WARN

Свойства конфигурации#

Доступны следующие свойства конфигурации:

Имя свойства

Описание

kafka.default-schema

Имя схемы по умолчанию для таблиц.

kafka.nodes

Список узлов в кластере Kafka.

kafka.buffer-size

Размер буфера чтения Kafka.

kafka.hide-internal-columns

Управляет тем, входят ли внутренние столбцы в схему таблицы.

kafka.internal-column-prefix

Префикс внутренних столбцов; по умолчанию _.

kafka.messages-per-split

Число сообщений, обрабатываемых каждым split Trino; по умолчанию 100000.

kafka.protobuf-any-support-enabled

Включает поддержку кодирования типов Protobuf any в JSON, если установить свойство в true; по умолчанию false.

kafka.timestamp-upper-bound-force-push-down-enabled

Управляет тем, включено ли проталкивание верхней границы timestamp для топиков, использующих режим CreateTime.

kafka.security-protocol

Протокол безопасности для подключения к кластеру Kafka; по умолчанию PLAINTEXT.

kafka.ssl.keystore.location

Расположение файла keystore.

kafka.ssl.keystore.password

Пароль для файла keystore.

kafka.ssl.keystore.type

Формат файла keystore; по умолчанию JKS.

kafka.ssl.truststore.location

Расположение файла truststore.

kafka.ssl.truststore.password

Пароль для файла truststore.

kafka.ssl.truststore.type

Формат файла truststore; по умолчанию JKS.

kafka.ssl.key.password

Пароль для закрытого ключа в файле keystore.

kafka.ssl.endpoint-identification-algorithm

Алгоритм идентификации конечной точки, используемый клиентами для проверки имени хоста сервера; по умолчанию https.

kafka.config.resources

Разделенный запятыми список файлов конфигурации клиента Kafka. Эти файлы должны существовать на машинах, где работает Trino. Указывайте только если это действительно необходимо для доступа к Kafka. Пример: /etc/kafka-configuration.properties.

Кроме того, необходимо настроить схему таблицы и использование реестра схем с соответствующими свойствами.

kafka.default-schema#

Определяет схему, содержащую все таблицы, которые были заданы без квалифицирующего имени схемы.

Свойство необязательно; значение по умолчанию — default.

kafka.nodes#

Разделенный запятыми список пар hostname:port для узлов данных Kafka.

Свойство обязательно; значения по умолчанию нет, и должен быть задан хотя бы один узел.

Note

Trino все равно должен иметь возможность подключаться ко всем узлам кластера, даже если здесь указан только поднабор, поскольку файлы сегментов могут находиться только на конкретном узле.

kafka.buffer-size#

Размер внутреннего буфера данных для чтения данных из Kafka. Буфер данных должен вмещать хотя бы одно сообщение, а в идеале — много сообщений. Для каждого рабочего узла и узла данных выделяется один буфер данных.

Свойство необязательно; значение по умолчанию — 64kb.

kafka.timestamp-upper-bound-force-push-down-enabled#

Предикат верхней границы для столбца _timestamp проталкивается только для топиков, использующих режим LogAppendTime.

Для топиков, использующих режим CreateTime, проталкивание верхней границы нужно явно включить через свойство конфигурации kafka.timestamp-upper-bound-force-push-down-enabled или свойство сеанса timestamp_upper_bound_force_push_down_enabled.

Свойство необязательно; значение по умолчанию — false.

kafka.hide-internal-columns#

Помимо столбцов данных, заданных в файле описания таблицы, коннектор поддерживает ряд дополнительных столбцов для каждой таблицы. Если эти столбцы скрыты, их по-прежнему можно использовать в запросах, но они не отображаются в DESCRIBE <table-name> или SELECT *.

Свойство необязательно; значение по умолчанию — true.

kafka.security-protocol#

Протокол, используемый для взаимодействия с брокерами. Допустимые значения: PLAINTEXT, SSL.

Свойство необязательно; значение по умолчанию — PLAINTEXT.

kafka.ssl.keystore.location#

Расположение файла keystore, используемого для подключения к кластеру Kafka.

Свойство необязательно.

kafka.ssl.keystore.password#

Пароль для файла keystore, используемого для подключения к кластеру Kafka.

Свойство необязательно, но обязательно, если задано kafka.ssl.keystore.location.

kafka.ssl.keystore.type#

Формат файла keystore. Допустимые значения: JKS, PKCS12.

Свойство необязательно; значение по умолчанию — JKS.

kafka.ssl.truststore.location#

Расположение файла truststore, используемого для подключения к кластеру Kafka.

Свойство необязательно.

kafka.ssl.truststore.password#

Пароль для файла truststore, используемого для подключения к кластеру Kafka.

Свойство необязательно, но обязательно, если задано kafka.ssl.truststore.location.

kafka.ssl.truststore.type#

Формат файла truststore. Допустимые значения: JKS, PKCS12.

Свойство необязательно; значение по умолчанию — JKS.

kafka.ssl.key.password#

Пароль для закрытого ключа в файле keystore, используемого для подключения к кластеру Kafka.

Свойство необязательно. Оно требуется клиентам только если настроена двусторонняя аутентификация, то есть ssl.client.auth=required.

kafka.ssl.endpoint-identification-algorithm#

Алгоритм идентификации конечной точки, используемый клиентами для проверки имени хоста сервера при подключении к кластеру Kafka. Kafka по умолчанию использует https. Используйте disabled, чтобы отключить проверку имени хоста сервера.

Свойство необязательно; значение по умолчанию — https.

Внутренние столбцы#

Префикс внутренних столбцов настраивается свойством конфигурации kafka.internal-column-prefix; по умолчанию используется _. Другой префикс влияет на имена внутренних столбцов в следующих разделах. Например, значение internal_ меняет имя столбца идентификатора секции с _partition_id на internal_partition_id.

Для каждой заданной таблицы коннектор поддерживает следующие столбцы:

Имя столбца

Тип

Описание

_partition_id

BIGINT

Идентификатор секции Kafka, содержащей эту строку.

_partition_offset

BIGINT

Смещение в секции Kafka для этой строки.

_segment_start

BIGINT

Наименьшее смещение в сегменте (включительно), содержащем эту строку. Это смещение относится к конкретной секции.

_segment_end

BIGINT

Наибольшее смещение в сегменте (исключительно), содержащем эту строку. Смещение относится к конкретной секции. Это то же значение, что и _segment_start следующего сегмента, если он существует.

_segment_count

BIGINT

Текущий порядковый номер строки внутри сегмента. Для некомпактированного топика _segment_start + _segment_count равно _partition_offset.

_message_corrupt

BOOLEAN

Истина, если декодер не смог декодировать сообщение для этой строки. Если значение истинно, столбцы данных, сопоставленные из сообщения, следует считать недействительными.

_message

VARCHAR

Байты сообщения как строка в кодировке UTF-8. Полезно только для текстового топика.

_message_length

BIGINT

Число байтов в сообщении.

_headers

map(VARCHAR, array(VARBINARY))

Заголовки сообщения, где значения с одинаковым ключом сгруппированы в массив.

_key_corrupt

BOOLEAN

Истина, если декодер ключа не смог декодировать ключ для этой строки. Если значение истинно, столбцы данных, сопоставленные из ключа, следует считать недействительными.

_key

VARCHAR

Байты ключа как строка в кодировке UTF-8. Полезно только для текстовых ключей.

_key_length

BIGINT

Число байтов в ключе.

_timestamp

TIMESTAMP

Метка времени сообщения.

Для таблиц без файла определения таблицы столбцы _key_corrupt и _message_corrupt всегда имеют значение false.

Схема таблицы и использование реестра схем#

Схема таблицы для сообщений может быть передана коннектору через конфигурационный файл или реестр схем. Она также предоставляет механизм, с помощью которого коннектор обнаруживает таблицы.

Необходимо настроить поставщик с помощью свойства kafka.table-description-supplier, установив его в FILE или CONFLUENT. У каждого поставщика описаний таблиц есть отдельный набор свойств конфигурации.

Дополнительные сведения см. в следующих подразделах. Поставщик описаний таблиц FILE используется по умолчанию, а значение не чувствительно к регистру.

Файловый поставщик описаний таблиц#

Чтобы использовать файловый поставщик описаний таблиц, свойство kafka.table-description-supplier должно быть установлено в FILE, что и является значением по умолчанию.

Кроме того, необходимо задать kafka.table-names и kafka.table-description-dir, как описано в следующих разделах.

kafka.table-names#

Разделенный запятыми список всех таблиц, предоставляемых этим каталогом. Имя таблицы может быть неквалифицированным (простым именем) и помещается в схему по умолчанию (см. ниже), либо может быть квалифицировано именем схемы (<schema-name>.<table-name>).

Для каждой заданной здесь таблицы может существовать файл описания таблицы (см. ниже). Если файла описания таблицы нет, имя таблицы используется как имя топика в Kafka, а столбцы данных в таблицу не сопоставляются. Таблица все равно содержит все внутренние столбцы (см. ниже).

Свойство обязательно; значения по умолчанию нет, и должна быть задана хотя бы одна таблица.

kafka.table-description-dir#

Ссылается на папку внутри развертывания Trino, содержащую один или несколько JSON-файлов (должны заканчиваться на .json) с файлами описаний таблиц.

Свойство необязательно; значение по умолчанию — etc/kafka.

Файлы определения таблиц#

Kafka хранит топики только как байтовые сообщения и оставляет производителям и потребителям определение того, как сообщение следует интерпретировать. Для Trino эти данные должны быть сопоставлены со столбцами, чтобы их можно было запрашивать.

Note

Для текстовых топиков, содержащих JSON-данные, вполне можно не использовать файлы определения таблиц, а вместо этого применять Функции и операторы JSON Trino для разбора столбца _message, содержащего байты, сопоставленные в строку UTF-8. Это неудобно и усложняет написание SQL-запросов. Такой подход работает только при чтении данных.

Файл определения таблицы состоит из JSON-определения таблицы. Имя файла может быть произвольным, но должно заканчиваться на .json. Поместите файл в каталог, настроенный свойством kafka.table-description-dir. Файл определения таблицы должен быть доступен со всех узлов Trino.

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}

Поле

Обязательность

Тип

Описание

tableName

обязательно

string

Имя таблицы Trino, заданное этим файлом.

schemaName

необязательно

string

Схема, содержащая таблицу. Если опущено, используется имя схемы по умолчанию.

topicName

обязательно

string

Сопоставляемый топик Kafka.

key

необязательно

JSON object

Определения полей для столбцов данных, сопоставленных с ключом сообщения.

message

необязательно

JSON object

Определения полей для столбцов данных, сопоставленных с самим сообщением.

Ключ и сообщение в Kafka#

Начиная с Kafka 0.8, каждое сообщение в топике может иметь необязательный ключ. Файл определения таблицы содержит секции для ключа и сообщения, чтобы сопоставить данные со столбцами таблицы.

Каждое из полей key и message в определении таблицы является объектом JSON, который должен содержать два поля:

Поле

Обязательность

Тип

Описание

dataFormat

обязательно

string

Выбирает декодер для этой группы полей.

fields

обязательно

JSON array

Список определений полей. Каждое определение поля создает новый столбец в таблице Trino.

Каждое определение поля является объектом JSON:

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}

Поле

Обязательность

Тип

Описание

name

обязательно

string

Имя столбца в таблице Trino.

type

обязательно

string

Тип Trino для столбца.

dataFormat

необязательно

string

Выбирает декодер столбца для этого поля. По умолчанию используется декодер по умолчанию для формата данных строки и типа столбца.

dataSchema

необязательно

string

Путь или URL, где находится схема Avro. Используется только для декодера Avro.

mapping

необязательно

string

Информация сопоставления для столбца. Зависит от декодера, см. ниже.

formatHint

необязательно

string

Задает подсказку формата для конкретного столбца в декодере столбца.

hidden

необязательно

boolean

Скрывает столбец из DESCRIBE <table name> и SELECT *. По умолчанию — false.

comment

необязательно

string

Добавляет комментарий к столбцу, который показывается в DESCRIBE <table name>.

Ограничений на число описаний полей для ключа или сообщения нет.

Поставщик описаний таблиц Confluent#

Поставщик описаний таблиц Confluent использует Confluent Schema Registry для обнаружения определений таблиц. Он протестирован только для работы с Confluent Schema Registry.

Преимущества поставщика описаний таблиц Confluent по сравнению с файловым поставщиком:

  • Новые таблицы можно задавать без перезапуска кластера.

  • Обновления схем обнаруживаются автоматически.

  • Не нужно определять таблицы вручную.

  • Некоторые специфичные для Protobuf типы, такие как oneof и any, поддерживаются и сопоставляются с JSON.

При использовании декодера Protobuf с поставщиком описаний таблиц Confluent требуются дополнительные шаги. Подробнее см. Требования.

Чтобы использовать реестр схем, установите kafka.table-description-supplier в CONFLUENT. Также необходимо настроить дополнительные свойства из следующей таблицы:

Note

Вставки не поддерживаются, а единственный поддерживаемый формат данных — AVRO.

Свойства поставщика описаний таблиц Confluent#

Имя свойства

Описание

Значение по умолчанию

kafka.confluent-schema-registry-url

Разделенный запятыми список URL-адресов реестра схем Confluent. Например: http://schema-registry-1.example.org:8081,http://schema-registry-2.example.org:8081

kafka.confluent-schema-registry-client-cache-size

Максимальное число subject, которые могут храниться в локальном кэше. Кэш хранит схемы локально по subjectId и предоставляется клиентом Confluent CachingSchemaRegistry.

1000

kafka.empty-field-strategy

Avro допускает пустые поля struct, но в Trino это не разрешено. Есть три стратегии обработки пустых полей struct:

  • IGNORE — игнорировать struct без полей. Это распространяется на родительские элементы. Например, массив struct без полей игнорируется.

  • FAIL — завершить запрос ошибкой, если определен struct без полей.

  • MARK — добавить поле-маркер с именем $empty_field_marker, которое имеет тип boolean и значение null. Это может быть полезно, если struct представляет поле-маркер.

Это также можно изменить через свойство сеанса empty_field_strategy.

IGNORE

kafka.confluent-subjects-cache-refresh-interval

Интервал обновления списка subject и определения схемы для subject в кэше subject.

1s

Сопоставление subject Confluent с именем таблицы#

Стратегия именования subject определяет, как subject разрешается из имени таблицы.

Стратегия по умолчанию — TopicNameStrategy, где subject ключа задается как <topic-name>-key, а subject значения — как <topic-name>-value. Если используются другие стратегии, заранее определить имя subject невозможно, поэтому его нужно указать вручную в имени таблицы.

Чтобы вручную указать subject ключа и значения, добавьте их к имени топика, например: <topic name>&key-subject=<key subject>&value-subject=<value subject>. Оба параметра, key-subject и value-subject, необязательны. Если ни один не указан, для разрешения имени subject по имени топика используется стандартная TopicNameStrategy. Обратите внимание, что сопоставление должно выполняться без учета регистра, поскольку идентификаторы не могут содержать символы верхнего регистра.

Обработка специфичных для Protobuf типов в поставщике описаний таблиц Confluent#

При использовании поставщика описаний таблиц Confluent в дополнение к обычно поддерживаемым типам поддерживаются следующие специфичные для Protobuf типы:

oneof#

Схемы Protobuf, содержащие поля oneof, сопоставляются с полем JSON в Trino.

Например, дана следующая схема Protobuf:

syntax = "proto3";

message schema {
    oneof test_oneof_column {
        string string_column = 1;
        uint32 integer_column = 2;
        uint64 long_column = 3;
        double double_column = 4;
        float float_column = 5;
        bool boolean_column = 6;
    }
}

Соответствующая строка Trino содержит поле JSON test_oneof_column с объектом JSON, включающим один ключ. Значение ключа соответствует имени присутствующего типа oneof.

В приведенном выше примере, если сообщение Protobuf содержит test_oneof_column, где для string_column установлено значение Trino, то соответствующая строка Trino включает столбец test_oneof_column со значением JSON '{"string_column": "Trino"}'.

Вставки Kafka#

Коннектор Kafka поддерживает использование операторов INSERT для записи данных в топик Kafka. Данные столбцов таблицы сопоставляются с сообщениями Kafka, как задано в файле определения таблицы. Для кодирования ключа и сообщения поддерживаются пять форматов данных:

У каждого из этих форматов данных есть кодировщик, который сопоставляет значения столбцов с байтами, отправляемыми в топик Kafka.

Trino поддерживает для производителей Kafka доставку at-least-once. Это означает, что сообщения гарантированно отправляются в топики Kafka как минимум один раз. Если истекает время ожидания подтверждения от производителя или производитель получает ошибку, он может повторить отправку сообщения. Это может привести к отправке дублирующего сообщения в топик Kafka.

Коннектор Kafka не позволяет пользователю определить, какая секция будет использоваться как целевая для сообщения. Если сообщение включает ключ, производитель использует хеш-алгоритм для выбора целевой секции сообщения. Один и тот же ключ всегда назначается одной и той же секции.

Сопоставление типов#

Поскольку Trino и Kafka поддерживают типы, которых нет в другой системе, этот коннектор сопоставляет некоторые типы при чтении (декодировании) или записи (кодировании) данных. Сопоставление типов зависит от формата (Raw, Avro, JSON, CSV).

Кодирование строк#

Кодирование требуется для записи данных; оно определяет, как столбцы таблиц Trino сопоставляются с ключами Kafka и данными сообщений.

Коннектор Kafka содержит следующие кодировщики:

  • raw-кодировщик — столбцы таблицы сопоставляются с сообщением Kafka как необработанные байты.

  • CSV-кодировщик — сообщение Kafka форматируется как значения, разделенные запятыми.

  • JSON-кодировщик — столбцы таблицы сопоставляются с полями JSON.

  • Avro-кодировщик — столбцы таблицы сопоставляются с полями Avro на основе схемы Avro.

  • Protobuf-кодировщик — столбцы таблицы сопоставляются с полями Protobuf на основе схемы Protobuf.

Note

Чтобы кодировщик работал, должен быть задан файл определения таблицы.

Raw-кодировщик#

Raw-кодировщик форматирует столбцы таблицы как необработанные байты, используя информацию сопоставления, указанную в файле определения таблицы.

Поддерживаются следующие атрибуты полей:

  • dataFormat — задает ширину типа данных столбца.

  • type — тип данных Trino.

  • mapping — начальная и необязательная конечная позиция байтов для преобразования, заданная как start или start:end.

Атрибут dataFormat выбирает число преобразуемых байтов. Если он отсутствует, предполагается BYTE. Все значения знаковые.

Поддерживаемые значения:

  • BYTE — один байт.

  • SHORT — два байта (big-endian).

  • INT — четыре байта (big-endian).

  • LONG — восемь байтов (big-endian).

  • FLOAT — четыре байта (формат IEEE 754, big-endian).

  • DOUBLE — восемь байтов (формат IEEE 754, big-endian).

Атрибут type определяет тип данных Trino.

В зависимости от типа данных Trino поддерживаются разные значения dataFormat:

Тип данных Trino

Значения dataFormat

BIGINT

BYTE, SHORT, INT, LONG

INTEGER

BYTE, SHORT, INT

SMALLINT

BYTE, SHORT

TINYINT

BYTE

REAL

FLOAT

DOUBLE

FLOAT, DOUBLE

BOOLEAN

BYTE, SHORT, INT, LONG

VARCHAR / VARCHAR(x)

BYTE

Другие типы не поддерживаются.

Атрибут mapping задает диапазон байтов в ключе или сообщении, используемый для кодирования.

Note

Для типов VARCHAR должны быть заданы начальная и конечная позиции. Иначе невозможно определить, сколько байтов содержит сообщение. Информация сопоставления raw-формата статична и не может динамически изменяться под переменную ширину некоторых типов данных Trino.

Если задана только начальная позиция:

  • Для типов фиксированной ширины используется соответствующее число байтов для указанного dataFormat (см. выше).

Если заданы начальная и конечная позиции:

  • Для типов фиксированной ширины размер должен быть равен числу байтов, используемых указанным dataFormat.

  • Используются все байты между начальной позицией (включительно) и конечной позицией (исключительно).

Note

Чтобы кодирование работало, все сопоставления должны включать начальную позицию.

Кодирование числовых типов данных (BIGINT, INTEGER, SMALLINT, TINYINT, REAL, DOUBLE) просто. Все числовые типы используют big-endian. Типы с плавающей точкой используют формат IEEE 754.

Пример определения raw-поля в файле определения таблицы для сообщения Kafka:

{
  "tableName": "example_table_name",
  "schemaName": "example_schema_name",
  "topicName": "example_topic_name",
  "key": { "..." },
  "message": {
    "dataFormat": "raw",
    "fields": [
      {
        "name": "field1",
        "type": "BIGINT",
        "dataFormat": "LONG",
        "mapping": "0"
      },
      {
        "name": "field2",
        "type": "INTEGER",
        "dataFormat": "INT",
        "mapping": "8"
      },
      {
        "name": "field3",
        "type": "SMALLINT",
        "dataFormat": "LONG",
        "mapping": "12"
      },
      {
        "name": "field4",
        "type": "VARCHAR(6)",
        "dataFormat": "BYTE",
        "mapping": "20:26"
      }
    ]
  }
}

Столбцы следует задавать в том же порядке, в котором они сопоставлены. Между сопоставлениями столбцов не должно быть пропусков или пересечений. Ширина столбца, заданная сопоставлением, должна быть эквивалентна ширине dataFormat для всех типов, кроме типов переменной ширины.

Пример запроса вставки для приведенного выше определения таблицы:

INSERT INTO example_raw_table (field1, field2, field3, field4)
  VALUES (123456789, 123456, 1234, 'abcdef');

Note

Raw-кодировщик требует заранее известного размера поля, включая типы данных переменной ширины, такие как VARCHAR. Он также запрещает вставку значений, которые не соответствуют ширине, заданной в файле определения таблицы. Это нужно для корректности: иначе более длинные значения будут усечены, а более короткие будут прочитаны обратно некорректно из-за неопределенного символа заполнения.

CSV-кодировщик#

CSV-кодировщик форматирует значения каждой строки как строку comma-separated-values (CSV) в кодировке UTF-8. CSV-строка форматируется с запятой , в качестве разделителя столбцов.

Для каждого поля должны быть заданы атрибуты type и mapping:

  • type — тип данных Trino.

  • mapping — целочисленный индекс столбца в CSV-строке (первый столбец — 0, второй — 1 и так далее).

dataFormat и formatHint не поддерживаются и должны быть опущены.

CSV-кодировщик поддерживает следующие типы данных Trino:

  • BIGINT

  • INTEGER

  • SMALLINT

  • TINYINT

  • DOUBLE

  • REAL

  • BOOLEAN

  • VARCHAR / VARCHAR(x)

Другие типы не поддерживаются.

Значения столбцов преобразуются в строки перед форматированием как CSV-строка.

Ниже приведен пример определения CSV-полей в файле определения таблицы для сообщения Kafka:

{
  "tableName": "example_table_name",
  "schemaName": "example_schema_name",
  "topicName": "example_topic_name",
  "key": { "..." },
  "message": {
    "dataFormat": "csv",
    "fields": [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "0"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "1"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "2"
      }
    ]
  }
}

Пример запроса вставки для приведенного выше определения таблицы:

INSERT INTO example_csv_table (field1, field2, field3)
  VALUES (123456789, 'example text', TRUE);

JSON-кодировщик#

JSON-кодировщик сопоставляет столбцы таблицы с полями JSON, заданными в файле определения таблицы, согласно RFC 4627.

Для полей поддерживаются следующие атрибуты:

  • type — тип данных Trino для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из объекта JSON.

  • dataFormat — имя форматтера. Обязательно для временных типов.

  • formatHint — шаблон для форматирования временных данных. Используется только с форматтером custom-date-time.

JSON-кодировщик поддерживает следующие типы данных Trino:

  • BIGINT

  • INTEGER

  • SMALLINT

  • TINYINT

  • DOUBLE

  • REAL

  • BOOLEAN

  • VARCHAR

  • DATE

  • TIME

  • TIME WITH TIME ZONE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

Другие типы не поддерживаются.

Для временных данных доступны следующие dataFormats:

  • iso8601

  • rfc2822

  • custom-date-time — форматирует временные данные согласно шаблону Joda Time, заданному полем formatHint.

  • milliseconds-since-epoch

  • seconds-since-epoch

Все временные данные в Kafka поддерживают точность до миллисекунд.

Следующая таблица определяет, какие временные типы данных поддерживаются dataFormats:

Тип данных Trino

Правила декодирования

DATE

custom-date-time, iso8601

TIME

custom-date-time, iso8601, milliseconds-since-epoch, seconds-since-epoch

TIME WITH TIME ZONE

custom-date-time, iso8601

TIMESTAMP

custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch

TIMESTAMP WITH TIME ZONE

custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch

Ниже приведен пример определения JSON-полей в файле определения таблицы для сообщения Kafka:

{
  "tableName": "example_table_name",
  "schemaName": "example_schema_name",
  "topicName": "example_topic_name",
  "key": { "..." },
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "TIMESTAMP",
        "dataFormat": "custom-date-time",
        "formatHint": "yyyy-dd-MM HH:mm:ss.SSS",
        "mapping": "field3"
      }
    ]
  }
}

Ниже показан пример запроса вставки для предыдущего определения таблицы:

INSERT INTO example_json_table (field1, field2, field3)
  VALUES (123456789, 'example text', TIMESTAMP '2020-07-15 01:02:03.456');

Avro-кодировщик#

Avro-кодировщик сериализует строки в записи Avro, как задано схемой Avro. Trino не поддерживает кодирование Avro без схемы.

Note

Схема Avro кодируется вместе со значениями столбцов таблицы в каждом сообщении Kafka.

Чтобы использовать Avro-кодировщик, в файле определения таблицы должен быть задан dataSchema. Он указывает расположение файла схемы Avro для ключа или сообщения.

Файлы схем Avro можно получать по HTTP или HTTPS с удаленного сервера с таким синтаксисом:

"dataSchema": "http://example.org/schema/avro_data.avsc"

Локальные файлы должны быть доступны на всех узлах Trino и использовать абсолютный путь, например:

"dataSchema": "/usr/local/schema/avro_data.avsc"

Поддерживаются следующие атрибуты полей:

  • name — имя столбца в таблице Trino.

  • type — тип данных Trino для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из схемы Avro. Если поле, указанное в mapping, отсутствует в исходной схеме Avro, операция записи завершается ошибкой.

В следующей таблице перечислены поддерживаемые типы данных Trino, которые можно использовать в type для эквивалентного типа поля Avro.

Тип данных Trino

Тип данных Avro

BIGINT

INT, LONG

REAL

FLOAT

DOUBLE

FLOAT, DOUBLE

BOOLEAN

BOOLEAN

VARCHAR / VARCHAR(x)

STRING

Другие типы не поддерживаются.

Следующий пример показывает определение поля Avro в файле определения таблицы для сообщения Kafka:

{
  "tableName": "example_table_name",
  "schemaName": "example_schema_name",
  "topicName": "example_topic_name",
  "key": { "..." },
  "message":
  {
    "dataFormat": "avro",
    "dataSchema": "/avro_message_schema.avsc",
    "fields":
    [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "field3"
      }
    ]
  }
}

В следующем примере показано определение схемы Avro для предыдущего определения таблицы:

{
  "type" : "record",
  "name" : "example_avro_message",
  "namespace" : "io.trino.plugin.kafka",
  "fields" :
  [
    {
      "name":"field1",
      "type":["null", "long"],
      "default": null
    },
    {
      "name": "field2",
      "type":["null", "string"],
      "default": null
    },
    {
      "name":"field3",
      "type":["null", "boolean"],
      "default": null
    }
  ],
  "doc:" : "A basic avro schema"
}

Пример запроса вставки для предыдущего определения таблицы:

INSERT INTO example_avro_table (field1, field2, field3)

VALUES (123456789, ‘example text’, FALSE);

Protobuf-кодировщик#

Protobuf-кодировщик сериализует строки в Protobuf DynamicMessages, как задано схемой Protobuf.

Note

Схема Protobuf кодируется вместе со значениями столбцов таблицы в каждом сообщении Kafka.

Чтобы использовать Protobuf-кодировщик, в файле определения таблицы должен быть задан dataSchema. Он указывает расположение файла proto для ключа или сообщения.

Файлы схем Protobuf можно получать по HTTP или HTTPS с удаленного сервера с таким синтаксисом:

"dataSchema": "http://example.org/schema/schema.proto"

Локальные файлы должны быть доступны на всех узлах Trino и использовать абсолютный путь, например:

"dataSchema": "/usr/local/schema/schema.proto"

Поддерживаются следующие атрибуты полей:

  • name — имя столбца в таблице Trino.

  • type — тип Trino для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из схемы Protobuf. Если поле, указанное в mapping, отсутствует в исходной схеме Protobuf, операция записи завершается ошибкой.

В следующей таблице перечислены поддерживаемые типы данных Trino, которые можно использовать в type для эквивалентного типа поля Protobuf.

Тип данных Trino

Тип данных Protobuf

BOOLEAN

bool

INTEGER

int32, uint32, sint32, fixed32, sfixed32

BIGINT

int64, uint64, sint64, fixed64, sfixed64

DOUBLE

double

REAL

float

VARCHAR / VARCHAR(x)

string

VARBINARY

bytes

ROW

Message

ARRAY

Тип Protobuf с полем repeated

MAP

Map

TIMESTAMP

Timestamp, предопределенный в timestamp.proto

Следующий пример показывает определение поля Protobuf в файле определения таблицы для сообщения Kafka:

{
  "tableName": "example_table_name",
  "schemaName": "example_schema_name",
  "topicName": "example_topic_name",
  "key": { "..." },
  "message":
  {
    "dataFormat": "protobuf",
    "dataSchema": "/message_schema.proto",
    "fields":
    [
      {
        "name": "field1",
        "type": "BIGINT",
        "mapping": "field1"
      },
      {
        "name": "field2",
        "type": "VARCHAR",
        "mapping": "field2"
      },
      {
        "name": "field3",
        "type": "BOOLEAN",
        "mapping": "field3"
      }
    ]
  }
}

В следующем примере показано определение схемы Protobuf для предыдущего определения таблицы:

syntax = "proto3";

message schema {
  uint64 field1 = 1 ;
  string field2 = 2;
  bool field3 = 3;
}

Пример запроса вставки для предыдущего определения таблицы:

INSERT INTO example_protobuf_table (field1, field2, field3)
  VALUES (123456789, 'example text', FALSE);

Декодирование строк#

Для ключа и сообщения используется декодер, который сопоставляет данные сообщения и ключа со столбцами таблицы.

Коннектор Kafka содержит следующие декодеры:

  • raw — сообщение Kafka не интерпретируется; диапазоны необработанных байтов сообщения сопоставляются со столбцами таблицы.

  • csv — сообщение Kafka интерпретируется как сообщение, разделенное запятыми, а поля сопоставляются со столбцами таблицы.

  • json — сообщение Kafka разбирается как JSON, а поля JSON сопоставляются со столбцами таблицы.

  • avro — сообщение Kafka разбирается на основе схемы Avro, а поля Avro сопоставляются со столбцами таблицы.

  • protobuf — сообщение Kafka разбирается на основе схемы Protobuf, а поля Protobuf сопоставляются со столбцами таблицы.

Note

Если файла определения таблицы для таблицы нет, используется декодер dummy, который не предоставляет никаких столбцов.

Raw-декодер#

Raw-декодер поддерживает чтение необработанных байтовых значений из сообщения или ключа Kafka и преобразование их в столбцы Trino.

Для полей поддерживаются следующие атрибуты:

  • dataFormat — выбирает ширину преобразуемого типа данных.

  • type — тип данных Trino. Список поддерживаемых типов данных см. в таблице далее в этом документе.

  • mapping<start>[:<end>] — начальная и конечная позиция байтов для преобразования (необязательно).

Атрибут dataFormat выбирает число преобразуемых байтов. Если он отсутствует, предполагается BYTE. Все значения знаковые.

Поддерживаются следующие значения:

  • BYTE — один байт.

  • SHORT — два байта (big-endian).

  • INT — четыре байта (big-endian).

  • LONG — восемь байтов (big-endian).

  • FLOAT — четыре байта (формат IEEE 754).

  • DOUBLE — восемь байтов (формат IEEE 754).

Атрибут type определяет тип данных Trino, на который сопоставляется значение.

В зависимости от типа Trino, назначенного столбцу, можно использовать разные значения dataFormat:

Тип данных Trino

Допустимые значения dataFormat

BIGINT

BYTE, SHORT, INT, LONG

INTEGER

BYTE, SHORT, INT

SMALLINT

BYTE, SHORT

TINYINT

BYTE

DOUBLE

DOUBLE, FLOAT

BOOLEAN

BYTE, SHORT, INT, LONG

VARCHAR / VARCHAR(x)

BYTE

Другие типы не поддерживаются.

Атрибут mapping задает диапазон байтов в ключе или сообщении, используемый для декодирования. Он может быть одним или двумя числами, разделенными двоеточием (<start>[:<end>]).

Если задана только начальная позиция:

  • Для типов фиксированной ширины столбец использует соответствующее число байтов для указанного dataFormat (см. выше).

  • При декодировании значения VARCHAR используются все байты от начальной позиции до конца сообщения.

Если заданы начальная и конечная позиции:

  • Для типов фиксированной ширины размер должен быть равен числу байтов, используемых указанным dataFormat.

  • Для VARCHAR используются все байты между начальной позицией (включительно) и конечной позицией (исключительно).

Если атрибут mapping не указан, это эквивалентно начальной позиции 0 и неуказанной конечной позиции.

Схема декодирования числовых типов данных (BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE) проста. Последовательность байтов считывается из входного сообщения и декодируется согласно одному из вариантов:

  • кодировка big-endian (для целочисленных типов);

  • формат IEEE 754 (для DOUBLE).

Длина декодированной последовательности байтов подразумевается значением dataFormat.

Для типа данных VARCHAR последовательность байтов интерпретируется согласно кодировке UTF-8.

CSV-декодер#

CSV-декодер преобразует байты, представляющие сообщение или ключ, в строку с использованием кодировки UTF-8, а затем интерпретирует результат как CSV-строку (comma-separated value).

Для полей должны быть заданы атрибуты type и mapping:

  • type — тип данных Trino. Список поддерживаемых типов данных см. в следующей таблице.

  • mapping — индекс поля в CSV-записи.

Атрибуты dataFormat и formatHint не поддерживаются и должны быть опущены.

В таблице ниже перечислены поддерживаемые типы Trino, которые можно использовать в type, и схема декодирования:

Тип данных Trino

Правила декодирования

BIGINT, INTEGER, SMALLINT, TINYINT

Декодируется с помощью Java Long.parseLong()

DOUBLE

Декодируется с помощью Java Double.parseDouble()

BOOLEAN

Последовательность символов “true” сопоставляется с true; остальные последовательности символов сопоставляются с false

VARCHAR, VARCHAR(x)

Используется как есть

Другие типы не поддерживаются.

JSON-декодер#

JSON-декодер преобразует байты, представляющие сообщение или ключ, в JSON согласно RFC 4627. Обратите внимание, что сообщение или ключ ДОЛЖНЫ преобразовываться в объект JSON, а не в массив или простой тип.

Для полей поддерживаются следующие атрибуты:

  • type — тип данных Trino для столбца.

  • dataFormat — декодер поля, используемый для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из объекта JSON.

  • formatHint — только для custom-date-time.

JSON-декодер поддерживает несколько декодеров полей: _default используется для стандартных столбцов таблицы, а также доступны декодеры для типов на основе даты и времени.

В следующей таблице перечислены типы данных Trino, которые можно использовать в type, и соответствующие декодеры полей, задаваемые через атрибут dataFormat.

Тип данных Trino

Допустимые значения dataFormat

BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE, BOOLEAN, VARCHAR, VARCHAR(x)

Декодер поля по умолчанию (атрибут dataFormat опущен)

DATE

custom-date-time, iso8601

TIME

custom-date-time, iso8601, milliseconds-since-epoch, seconds-since-epoch

TIME WITH TIME ZONE

custom-date-time, iso8601

TIMESTAMP

custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch

TIMESTAMP WITH TIME ZONE

custom-date-time, iso8601, rfc2822, milliseconds-since-epoch seconds-since-epoch

Другие типы не поддерживаются.

Декодер поля по умолчанию#

Это стандартный декодер поля, поддерживающий все физические типы данных Trino. Значение поля преобразуется по правилам преобразования JSON в логическое, целочисленное, вещественное или строковое значение. Для столбцов, не основанных на дате или времени, следует использовать этот декодер.

Декодеры даты и времени#

Чтобы преобразовать значения из объектов JSON в столбцы Trino DATE, TIME, TIME WITH TIME ZONE, TIMESTAMP или TIMESTAMP WITH TIME ZONE, необходимо выбрать специальные декодеры с помощью атрибута dataFormat в определении поля.

  • iso8601 — текстовый, разбирает текстовое поле как метку времени ISO 8601.

  • rfc2822 — текстовый, разбирает текстовое поле как метку времени RFC 2822.

  • custom-date-time — текстовый, разбирает текстовое поле согласно шаблону формата Joda, указанному через атрибут formatHint. Шаблон формата должен соответствовать https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html.

  • milliseconds-since-epoch — числовой, интерпретирует текст или число как количество миллисекунд с начала эпохи.

  • seconds-since-epoch — числовой, интерпретирует текст или число как количество миллисекунд с начала эпохи.

Для типов данных TIMESTAMP WITH TIME ZONE и TIME WITH TIME ZONE, если информация о часовом поясе присутствует в декодированном значении, она используется как значение Trino. В противном случае часовой пояс результата устанавливается в UTC.

Avro-декодер#

Avro-декодер преобразует байты, представляющие сообщение или ключ в формате Avro, на основе схемы. Сообщение должно содержать встроенную схему Avro. Trino не поддерживает декодирование Avro без схемы.

Для ключа/сообщения при использовании декодера avro должен быть задан dataSchema. Он должен указывать расположение допустимого файла схемы Avro для сообщения, которое нужно декодировать. Это расположение может быть удаленным веб-сервером, например dataSchema: 'http://example.org/schema/avro_data.avsc', или локальной файловой системой, например dataSchema: '/usr/local/schema/avro_data.avsc'. Декодер завершится ошибкой, если это расположение недоступно с узла координатора Trino.

Для полей поддерживаются следующие атрибуты:

  • name — имя столбца в таблице Trino.

  • type — тип данных Trino для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из схемы Avro. Если поле, указанное в mapping, отсутствует в исходной схеме Avro, операция чтения возвращает NULL.

В следующей таблице перечислены поддерживаемые типы Trino, которые можно использовать в type для эквивалентных типов полей Avro:

Тип данных Trino

Допустимый тип данных Avro

BIGINT

INT, LONG

DOUBLE

DOUBLE, FLOAT

BOOLEAN

BOOLEAN

VARCHAR / VARCHAR(x)

STRING

VARBINARY

FIXED, BYTES

ARRAY

ARRAY

MAP

MAP

Другие типы не поддерживаются.

Эволюция схем Avro#

Avro-декодер поддерживает эволюцию схем с обратной совместимостью. При обратной совместимости новая схема может использоваться для чтения данных Avro, созданных со старой схемой. Любое изменение в схеме Avro также должно быть отражено в файле определения топика Trino. Новые добавленные или переименованные поля должны иметь значение по умолчанию в файле схемы Avro.

Поведение эволюции схем:

  • Столбец добавлен в новой схеме: данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.

  • Столбец удален в новой схеме: данные, созданные со старой схемой, больше не выводят данные из удаленного столбца.

  • Столбец переименован в новой схеме: это эквивалентно удалению столбца и добавлению нового, и данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.

  • Тип столбца изменен в новой схеме: если приведение типа поддерживается Avro, выполняется преобразование. Для несовместимых типов возникает ошибка.

Protobuf-декодер#

Protobuf-декодер преобразует байты, представляющие сообщение или ключ в формате Protobuf, на основе схемы.

Для ключа/сообщения при использовании декодера protobuf должен быть задан dataSchema. Он указывает расположение допустимого файла proto для сообщения, которое нужно декодировать. Это расположение может быть удаленным веб-сервером, dataSchema: 'http://example.org/schema/schema.proto', или локальным файлом, dataSchema: '/usr/local/schema/schema.proto'. Декодер завершится ошибкой, если расположение недоступно с координатора.

Для полей поддерживаются следующие атрибуты:

  • name — имя столбца в таблице Trino.

  • type — тип данных Trino для столбца.

  • mapping — разделенный косыми чертами список имен полей для выбора поля из схемы Protobuf. Если поле, указанное в mapping, отсутствует в исходном файле proto, операция чтения возвращает NULL.

В следующей таблице перечислены поддерживаемые типы Trino, которые можно использовать в type для эквивалентных типов полей Protobuf:

Тип данных Trino

Допустимый тип данных Protobuf

BOOLEAN

bool

INTEGER

int32, uint32, sint32, fixed32, sfixed32

BIGINT

int64, uint64, sint64, fixed64, sfixed64

DOUBLE

double

REAL

float

VARCHAR / VARCHAR(x)

string

VARBINARY

bytes

ROW

Message

ARRAY

Тип Protobuf с полем repeated

MAP

Map

TIMESTAMP

Timestamp, предопределенный в timestamp.proto

JSON

oneof (только поставщик таблиц Confluent), Any

any#

Типы сообщений с полем Any содержат произвольное сериализованное сообщение в виде байтов и URL типа для разрешения типа этого сообщения со схемой file://, http:// или https://. Коннектор считывает содержимое URL, чтобы создать дескриптор типа для сообщения Any и преобразовать сообщение в JSON. Это поведение включается установкой kafka.protobuf-any-support-enabled в true.

Дескрипторы для каждого уникального URL кэшируются из соображений производительности, и любые изменения типа, возвращаемого URL, требуют перезапуска Trino.

Например, дана следующая схема Protobuf, определяющая MyMessage с тремя столбцами:

syntax = "proto3";

message MyMessage {
  string stringColumn = 1;
  uint32 integerColumn = 2;
  uint64 longColumn = 3;
}

И отдельная схема, использующая тип Any, который является упакованным сообщением указанного выше типа и допустимым URL:

syntax = "proto3";

import "google/protobuf/any.proto";

message schema {
    google.protobuf.Any any_message = 1;
}

Соответствующий столбец Trino называется any_message, имеет тип JSON и содержит JSON-сериализованное представление сообщения Protobuf:

{
  "@type":"file:///path/to/schemas/MyMessage",
  "longColumn":"493857959588286460",
  "numberColumn":"ONE",
  "stringColumn":"Trino"
}
Эволюция схем Protobuf#

Protobuf-декодер поддерживает эволюцию схем с обратной совместимостью. При обратной совместимости новая схема может использоваться для чтения данных Protobuf, созданных со старой схемой. Любое изменение в схеме Protobuf должно также быть отражено в файле определения топика.

Поведение эволюции схем:

  • Столбец добавлен в новой схеме: данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.

  • Столбец удален в новой схеме: данные, созданные со старой схемой, больше не выводят данные из удаленного столбца.

  • Столбец переименован в новой схеме: это эквивалентно удалению столбца и добавлению нового, и данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.

  • Тип столбца изменен в новой схеме: если приведение типа поддерживается Protobuf, выполняется преобразование. Для несовместимых типов возникает ошибка.

Ограничения Protobuf#
  • Protobuf Timestamp имеет наносекундную точность, но Trino поддерживает декодирование и кодирование с микросекундной точностью.

Поддержка SQL#

Коннектор предоставляет доступ на чтение и запись к данным и метаданным в таблицах Trino, заполняемых топиками Kafka. Дополнительные сведения см. в Декодирование строк.

Помимо глобально доступных операторов и операций чтения, коннектор поддерживает следующие возможности: