Коннектор Kafka#

Этот коннектор позволяет использовать топики Apache Kafka как таблицы в Trino. Каждое сообщение представляется как строка в Trino.
Топики могут быть активными. Строки появляются по мере поступления данных и исчезают при удалении сегментов. Это может приводить к неожиданному поведению, если обращаться к одной и той же таблице несколько раз в одном запросе, например при self join.
Коннектор параллельно читает и записывает данные сообщений из топиков Kafka на рабочих узлах, что дает существенный выигрыш в производительности. Размер наборов данных для такого распараллеливания настраивается и может быть адаптирован под конкретные потребности.
См. Учебное руководство по коннектору Kafka.
Требования#
Для подключения к Kafka требуется:
брокер Kafka версии 3.3 или выше (с включенным KRaft);
сетевой доступ от координатора и рабочих узлов Trino к узлам Kafka. Порт по умолчанию — 9092.
При использовании декодера Protobuf с поставщиком описаний таблиц Confluent необходимо выполнить дополнительные шаги:
Скопируйте JAR-файлы
kafka-protobuf-providerиkafka-protobuf-typesиз Confluent для версии Confluent 8.1.1 в каталог плагина коннектора Kafka (<install directory>/plugin/kafka) на всех узлах кластера. Каталог плагина зависит от способа Установка.Копируя и используя эти JAR-файлы, вы соглашаетесь с условиями Confluent Community License Agreement, на которых Confluent предоставляет их.
Эти шаги не требуются, если вы не используете Protobuf и поставщик описаний таблиц Confluent.
Конфигурация#
Чтобы настроить коннектор Kafka, создайте файл свойств каталога
etc/catalog/example.properties со следующим содержимым, заменив свойства
подходящими значениями.
В некоторых случаях, например при использовании специализированных методов
аутентификации, для доступа к кластеру Kafka необходимо указать дополнительные
свойства клиента Kafka. Для этого добавьте свойство kafka.config.resources,
которое ссылается на файлы конфигурации Kafka. Обратите внимание, что
конфигурации могут быть переопределены, если они явно заданы в
kafka.properties:
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
kafka.config.resources=/etc/kafka-configuration.properties
Несколько кластеров Kafka#
Можно создать столько каталогов, сколько требуется. Поэтому, если у вас есть
дополнительные кластеры Kafka, просто добавьте еще один файл свойств в
etc/catalog с другим именем, убедившись, что оно заканчивается на
.properties. Например, если назвать файл свойств sales.properties, Trino
создаст каталог sales с использованием настроенного коннектора.
Уровни журналирования#
Журналирование потребителя Kafka может быть подробным и засорять журналы
Trino. Чтобы снизить уровень журналирования, просто
добавьте следующее в etc/log.properties:
org.apache.kafka=WARN
Свойства конфигурации#
Доступны следующие свойства конфигурации:
Имя свойства |
Описание |
|---|---|
|
Имя схемы по умолчанию для таблиц. |
|
Список узлов в кластере Kafka. |
|
Размер буфера чтения Kafka. |
|
Управляет тем, входят ли внутренние столбцы в схему таблицы. |
|
Префикс внутренних столбцов; по умолчанию |
|
Число сообщений, обрабатываемых каждым split Trino; по умолчанию |
|
Включает поддержку кодирования типов Protobuf |
|
Управляет тем, включено ли проталкивание верхней границы timestamp для топиков, использующих режим |
|
Протокол безопасности для подключения к кластеру Kafka; по умолчанию |
|
Расположение файла keystore. |
|
Пароль для файла keystore. |
|
Формат файла keystore; по умолчанию |
|
Расположение файла truststore. |
|
Пароль для файла truststore. |
|
Формат файла truststore; по умолчанию |
|
Пароль для закрытого ключа в файле keystore. |
|
Алгоритм идентификации конечной точки, используемый клиентами для проверки имени хоста сервера; по умолчанию |
|
Разделенный запятыми список файлов конфигурации клиента Kafka. Эти файлы должны существовать на машинах, где работает Trino. Указывайте только если это действительно необходимо для доступа к Kafka. Пример: |
Кроме того, необходимо настроить схему таблицы и использование реестра схем с соответствующими свойствами.
kafka.default-schema#
Определяет схему, содержащую все таблицы, которые были заданы без квалифицирующего имени схемы.
Свойство необязательно; значение по умолчанию — default.
kafka.nodes#
Разделенный запятыми список пар hostname:port для узлов данных Kafka.
Свойство обязательно; значения по умолчанию нет, и должен быть задан хотя бы один узел.
Note
Trino все равно должен иметь возможность подключаться ко всем узлам кластера, даже если здесь указан только поднабор, поскольку файлы сегментов могут находиться только на конкретном узле.
kafka.buffer-size#
Размер внутреннего буфера данных для чтения данных из Kafka. Буфер данных должен вмещать хотя бы одно сообщение, а в идеале — много сообщений. Для каждого рабочего узла и узла данных выделяется один буфер данных.
Свойство необязательно; значение по умолчанию — 64kb.
kafka.timestamp-upper-bound-force-push-down-enabled#
Предикат верхней границы для столбца _timestamp проталкивается только для
топиков, использующих режим LogAppendTime.
Для топиков, использующих режим CreateTime, проталкивание верхней границы
нужно явно включить через свойство конфигурации
kafka.timestamp-upper-bound-force-push-down-enabled или свойство сеанса
timestamp_upper_bound_force_push_down_enabled.
Свойство необязательно; значение по умолчанию — false.
kafka.hide-internal-columns#
Помимо столбцов данных, заданных в файле описания таблицы, коннектор
поддерживает ряд дополнительных столбцов для каждой таблицы. Если эти столбцы
скрыты, их по-прежнему можно использовать в запросах, но они не отображаются в
DESCRIBE <table-name> или SELECT *.
Свойство необязательно; значение по умолчанию — true.
kafka.security-protocol#
Протокол, используемый для взаимодействия с брокерами. Допустимые значения:
PLAINTEXT, SSL.
Свойство необязательно; значение по умолчанию — PLAINTEXT.
kafka.ssl.keystore.location#
Расположение файла keystore, используемого для подключения к кластеру Kafka.
Свойство необязательно.
kafka.ssl.keystore.password#
Пароль для файла keystore, используемого для подключения к кластеру Kafka.
Свойство необязательно, но обязательно, если задано
kafka.ssl.keystore.location.
kafka.ssl.keystore.type#
Формат файла keystore. Допустимые значения: JKS, PKCS12.
Свойство необязательно; значение по умолчанию — JKS.
kafka.ssl.truststore.location#
Расположение файла truststore, используемого для подключения к кластеру Kafka.
Свойство необязательно.
kafka.ssl.truststore.password#
Пароль для файла truststore, используемого для подключения к кластеру Kafka.
Свойство необязательно, но обязательно, если задано
kafka.ssl.truststore.location.
kafka.ssl.truststore.type#
Формат файла truststore. Допустимые значения: JKS, PKCS12.
Свойство необязательно; значение по умолчанию — JKS.
kafka.ssl.key.password#
Пароль для закрытого ключа в файле keystore, используемого для подключения к кластеру Kafka.
Свойство необязательно. Оно требуется клиентам только если настроена
двусторонняя аутентификация, то есть ssl.client.auth=required.
kafka.ssl.endpoint-identification-algorithm#
Алгоритм идентификации конечной точки, используемый клиентами для проверки
имени хоста сервера при подключении к кластеру Kafka. Kafka по умолчанию
использует https. Используйте disabled, чтобы отключить проверку имени
хоста сервера.
Свойство необязательно; значение по умолчанию — https.
Внутренние столбцы#
Префикс внутренних столбцов настраивается свойством конфигурации
kafka.internal-column-prefix; по умолчанию используется _. Другой префикс
влияет на имена внутренних столбцов в следующих разделах. Например, значение
internal_ меняет имя столбца идентификатора секции с _partition_id на
internal_partition_id.
Для каждой заданной таблицы коннектор поддерживает следующие столбцы:
Имя столбца |
Тип |
Описание |
|---|---|---|
|
BIGINT |
Идентификатор секции Kafka, содержащей эту строку. |
|
BIGINT |
Смещение в секции Kafka для этой строки. |
|
BIGINT |
Наименьшее смещение в сегменте (включительно), содержащем эту строку. Это смещение относится к конкретной секции. |
|
BIGINT |
Наибольшее смещение в сегменте (исключительно), содержащем эту строку. Смещение относится к конкретной секции. Это то же значение, что и |
|
BIGINT |
Текущий порядковый номер строки внутри сегмента. Для некомпактированного топика |
|
BOOLEAN |
Истина, если декодер не смог декодировать сообщение для этой строки. Если значение истинно, столбцы данных, сопоставленные из сообщения, следует считать недействительными. |
|
VARCHAR |
Байты сообщения как строка в кодировке UTF-8. Полезно только для текстового топика. |
|
BIGINT |
Число байтов в сообщении. |
|
map(VARCHAR, array(VARBINARY)) |
Заголовки сообщения, где значения с одинаковым ключом сгруппированы в массив. |
|
BOOLEAN |
Истина, если декодер ключа не смог декодировать ключ для этой строки. Если значение истинно, столбцы данных, сопоставленные из ключа, следует считать недействительными. |
|
VARCHAR |
Байты ключа как строка в кодировке UTF-8. Полезно только для текстовых ключей. |
|
BIGINT |
Число байтов в ключе. |
|
TIMESTAMP |
Метка времени сообщения. |
Для таблиц без файла определения таблицы столбцы _key_corrupt и
_message_corrupt всегда имеют значение false.
Схема таблицы и использование реестра схем#
Схема таблицы для сообщений может быть передана коннектору через конфигурационный файл или реестр схем. Она также предоставляет механизм, с помощью которого коннектор обнаруживает таблицы.
Необходимо настроить поставщик с помощью свойства
kafka.table-description-supplier, установив его в FILE или CONFLUENT. У
каждого поставщика описаний таблиц есть отдельный набор свойств конфигурации.
Дополнительные сведения см. в следующих подразделах. Поставщик описаний таблиц
FILE используется по умолчанию, а значение не чувствительно к регистру.
Файловый поставщик описаний таблиц#
Чтобы использовать файловый поставщик описаний таблиц, свойство
kafka.table-description-supplier должно быть установлено в FILE, что и
является значением по умолчанию.
Кроме того, необходимо задать kafka.table-names и
kafka.table-description-dir, как описано в следующих разделах.
kafka.table-names#
Разделенный запятыми список всех таблиц, предоставляемых этим каталогом. Имя
таблицы может быть неквалифицированным (простым именем) и помещается в схему по
умолчанию (см. ниже), либо может быть квалифицировано именем схемы
(<schema-name>.<table-name>).
Для каждой заданной здесь таблицы может существовать файл описания таблицы (см. ниже). Если файла описания таблицы нет, имя таблицы используется как имя топика в Kafka, а столбцы данных в таблицу не сопоставляются. Таблица все равно содержит все внутренние столбцы (см. ниже).
Свойство обязательно; значения по умолчанию нет, и должна быть задана хотя бы одна таблица.
kafka.table-description-dir#
Ссылается на папку внутри развертывания Trino, содержащую один или несколько
JSON-файлов (должны заканчиваться на .json) с файлами описаний таблиц.
Свойство необязательно; значение по умолчанию — etc/kafka.
Файлы определения таблиц#
Kafka хранит топики только как байтовые сообщения и оставляет производителям и потребителям определение того, как сообщение следует интерпретировать. Для Trino эти данные должны быть сопоставлены со столбцами, чтобы их можно было запрашивать.
Note
Для текстовых топиков, содержащих JSON-данные, вполне можно не использовать
файлы определения таблиц, а вместо этого применять Функции и операторы JSON Trino
для разбора столбца _message, содержащего байты, сопоставленные в строку
UTF-8. Это неудобно и усложняет написание SQL-запросов. Такой подход работает
только при чтении данных.
Файл определения таблицы состоит из JSON-определения таблицы. Имя файла может
быть произвольным, но должно заканчиваться на .json. Поместите файл в
каталог, настроенный свойством kafka.table-description-dir. Файл определения
таблицы должен быть доступен со всех узлов Trino.
{
"tableName": ...,
"schemaName": ...,
"topicName": ...,
"key": {
"dataFormat": ...,
"fields": [
...
]
},
"message": {
"dataFormat": ...,
"fields": [
...
]
}
}
Поле |
Обязательность |
Тип |
Описание |
|---|---|---|---|
|
обязательно |
string |
Имя таблицы Trino, заданное этим файлом. |
|
необязательно |
string |
Схема, содержащая таблицу. Если опущено, используется имя схемы по умолчанию. |
|
обязательно |
string |
Сопоставляемый топик Kafka. |
|
необязательно |
JSON object |
Определения полей для столбцов данных, сопоставленных с ключом сообщения. |
|
необязательно |
JSON object |
Определения полей для столбцов данных, сопоставленных с самим сообщением. |
Ключ и сообщение в Kafka#
Начиная с Kafka 0.8, каждое сообщение в топике может иметь необязательный ключ. Файл определения таблицы содержит секции для ключа и сообщения, чтобы сопоставить данные со столбцами таблицы.
Каждое из полей key и message в определении таблицы является объектом JSON,
который должен содержать два поля:
Поле |
Обязательность |
Тип |
Описание |
|---|---|---|---|
|
обязательно |
string |
Выбирает декодер для этой группы полей. |
|
обязательно |
JSON array |
Список определений полей. Каждое определение поля создает новый столбец в таблице Trino. |
Каждое определение поля является объектом JSON:
{
"name": ...,
"type": ...,
"dataFormat": ...,
"mapping": ...,
"formatHint": ...,
"hidden": ...,
"comment": ...
}
Поле |
Обязательность |
Тип |
Описание |
|---|---|---|---|
|
обязательно |
string |
Имя столбца в таблице Trino. |
|
обязательно |
string |
Тип Trino для столбца. |
|
необязательно |
string |
Выбирает декодер столбца для этого поля. По умолчанию используется декодер по умолчанию для формата данных строки и типа столбца. |
|
необязательно |
string |
Путь или URL, где находится схема Avro. Используется только для декодера Avro. |
|
необязательно |
string |
Информация сопоставления для столбца. Зависит от декодера, см. ниже. |
|
необязательно |
string |
Задает подсказку формата для конкретного столбца в декодере столбца. |
|
необязательно |
boolean |
Скрывает столбец из |
|
необязательно |
string |
Добавляет комментарий к столбцу, который показывается в |
Ограничений на число описаний полей для ключа или сообщения нет.
Поставщик описаний таблиц Confluent#
Поставщик описаний таблиц Confluent использует Confluent Schema Registry для обнаружения определений таблиц. Он протестирован только для работы с Confluent Schema Registry.
Преимущества поставщика описаний таблиц Confluent по сравнению с файловым поставщиком:
Новые таблицы можно задавать без перезапуска кластера.
Обновления схем обнаруживаются автоматически.
Не нужно определять таблицы вручную.
Некоторые специфичные для Protobuf типы, такие как
oneofиany, поддерживаются и сопоставляются с JSON.
При использовании декодера Protobuf с поставщиком описаний таблиц Confluent требуются дополнительные шаги. Подробнее см. Требования.
Чтобы использовать реестр схем, установите kafka.table-description-supplier
в CONFLUENT. Также необходимо настроить дополнительные свойства из следующей
таблицы:
Note
Вставки не поддерживаются, а единственный поддерживаемый формат данных — AVRO.
Имя свойства |
Описание |
Значение по умолчанию |
|---|---|---|
|
Разделенный запятыми список URL-адресов реестра схем Confluent. Например:
|
|
|
Максимальное число subject, которые могут храниться в локальном кэше. Кэш
хранит схемы локально по subjectId и предоставляется клиентом Confluent
|
1000 |
|
Avro допускает пустые поля struct, но в Trino это не разрешено. Есть три стратегии обработки пустых полей struct:
Это также можно изменить через свойство сеанса |
|
|
Интервал обновления списка subject и определения схемы для subject в кэше subject. |
|
Сопоставление subject Confluent с именем таблицы#
Стратегия именования subject определяет, как subject разрешается из имени таблицы.
Стратегия по умолчанию — TopicNameStrategy, где subject ключа задается как
<topic-name>-key, а subject значения — как <topic-name>-value. Если
используются другие стратегии, заранее определить имя subject невозможно,
поэтому его нужно указать вручную в имени таблицы.
Чтобы вручную указать subject ключа и значения, добавьте их к имени топика,
например: <topic name>&key-subject=<key subject>&value-subject=<value subject>. Оба параметра, key-subject и value-subject, необязательны. Если
ни один не указан, для разрешения имени subject по имени топика используется
стандартная TopicNameStrategy. Обратите внимание, что сопоставление должно
выполняться без учета регистра, поскольку идентификаторы не могут содержать
символы верхнего регистра.
Обработка специфичных для Protobuf типов в поставщике описаний таблиц Confluent#
При использовании поставщика описаний таблиц Confluent в дополнение к обычно поддерживаемым типам поддерживаются следующие специфичные для Protobuf типы:
oneof#
Схемы Protobuf, содержащие поля oneof, сопоставляются с полем JSON в
Trino.
Например, дана следующая схема Protobuf:
syntax = "proto3";
message schema {
oneof test_oneof_column {
string string_column = 1;
uint32 integer_column = 2;
uint64 long_column = 3;
double double_column = 4;
float float_column = 5;
bool boolean_column = 6;
}
}
Соответствующая строка Trino содержит поле JSON test_oneof_column с
объектом JSON, включающим один ключ. Значение ключа соответствует имени
присутствующего типа oneof.
В приведенном выше примере, если сообщение Protobuf содержит
test_oneof_column, где для string_column установлено значение Trino, то
соответствующая строка Trino включает столбец test_oneof_column со значением
JSON '{"string_column": "Trino"}'.
Вставки Kafka#
Коннектор Kafka поддерживает использование операторов INSERT для записи данных в топик Kafka. Данные столбцов таблицы сопоставляются с сообщениями Kafka, как задано в файле определения таблицы. Для кодирования ключа и сообщения поддерживаются пять форматов данных:
У каждого из этих форматов данных есть кодировщик, который сопоставляет значения столбцов с байтами, отправляемыми в топик Kafka.
Trino поддерживает для производителей Kafka доставку at-least-once. Это означает, что сообщения гарантированно отправляются в топики Kafka как минимум один раз. Если истекает время ожидания подтверждения от производителя или производитель получает ошибку, он может повторить отправку сообщения. Это может привести к отправке дублирующего сообщения в топик Kafka.
Коннектор Kafka не позволяет пользователю определить, какая секция будет использоваться как целевая для сообщения. Если сообщение включает ключ, производитель использует хеш-алгоритм для выбора целевой секции сообщения. Один и тот же ключ всегда назначается одной и той же секции.
Сопоставление типов#
Поскольку Trino и Kafka поддерживают типы, которых нет в другой системе, этот коннектор сопоставляет некоторые типы при чтении (декодировании) или записи (кодировании) данных. Сопоставление типов зависит от формата (Raw, Avro, JSON, CSV).
Кодирование строк#
Кодирование требуется для записи данных; оно определяет, как столбцы таблиц Trino сопоставляются с ключами Kafka и данными сообщений.
Коннектор Kafka содержит следующие кодировщики:
raw-кодировщик — столбцы таблицы сопоставляются с сообщением Kafka как необработанные байты.
CSV-кодировщик — сообщение Kafka форматируется как значения, разделенные запятыми.
JSON-кодировщик — столбцы таблицы сопоставляются с полями JSON.
Avro-кодировщик — столбцы таблицы сопоставляются с полями Avro на основе схемы Avro.
Protobuf-кодировщик — столбцы таблицы сопоставляются с полями Protobuf на основе схемы Protobuf.
Note
Чтобы кодировщик работал, должен быть задан файл определения таблицы.
Raw-кодировщик#
Raw-кодировщик форматирует столбцы таблицы как необработанные байты, используя информацию сопоставления, указанную в файле определения таблицы.
Поддерживаются следующие атрибуты полей:
dataFormat— задает ширину типа данных столбца.type— тип данных Trino.mapping— начальная и необязательная конечная позиция байтов для преобразования, заданная какstartилиstart:end.
Атрибут dataFormat выбирает число преобразуемых байтов. Если он отсутствует,
предполагается BYTE. Все значения знаковые.
Поддерживаемые значения:
BYTE— один байт.SHORT— два байта (big-endian).INT— четыре байта (big-endian).LONG— восемь байтов (big-endian).FLOAT— четыре байта (формат IEEE 754, big-endian).DOUBLE— восемь байтов (формат IEEE 754, big-endian).
Атрибут type определяет тип данных Trino.
В зависимости от типа данных Trino поддерживаются разные значения
dataFormat:
Тип данных Trino |
Значения |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Другие типы не поддерживаются.
Атрибут mapping задает диапазон байтов в ключе или сообщении, используемый
для кодирования.
Note
Для типов VARCHAR должны быть заданы начальная и конечная позиции. Иначе
невозможно определить, сколько байтов содержит сообщение. Информация
сопоставления raw-формата статична и не может динамически изменяться под
переменную ширину некоторых типов данных Trino.
Если задана только начальная позиция:
Для типов фиксированной ширины используется соответствующее число байтов для указанного
dataFormat(см. выше).
Если заданы начальная и конечная позиции:
Для типов фиксированной ширины размер должен быть равен числу байтов, используемых указанным
dataFormat.Используются все байты между начальной позицией (включительно) и конечной позицией (исключительно).
Note
Чтобы кодирование работало, все сопоставления должны включать начальную позицию.
Кодирование числовых типов данных (BIGINT, INTEGER, SMALLINT, TINYINT,
REAL, DOUBLE) просто. Все числовые типы используют big-endian. Типы с
плавающей точкой используют формат IEEE 754.
Пример определения raw-поля в файле определения таблицы для сообщения Kafka:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "raw",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"dataFormat": "LONG",
"mapping": "0"
},
{
"name": "field2",
"type": "INTEGER",
"dataFormat": "INT",
"mapping": "8"
},
{
"name": "field3",
"type": "SMALLINT",
"dataFormat": "LONG",
"mapping": "12"
},
{
"name": "field4",
"type": "VARCHAR(6)",
"dataFormat": "BYTE",
"mapping": "20:26"
}
]
}
}
Столбцы следует задавать в том же порядке, в котором они сопоставлены. Между
сопоставлениями столбцов не должно быть пропусков или пересечений. Ширина
столбца, заданная сопоставлением, должна быть эквивалентна ширине
dataFormat для всех типов, кроме типов переменной ширины.
Пример запроса вставки для приведенного выше определения таблицы:
INSERT INTO example_raw_table (field1, field2, field3, field4)
VALUES (123456789, 123456, 1234, 'abcdef');
Note
Raw-кодировщик требует заранее известного размера поля, включая типы данных
переменной ширины, такие как VARCHAR. Он также запрещает вставку значений,
которые не соответствуют ширине, заданной в файле определения таблицы. Это
нужно для корректности: иначе более длинные значения будут усечены, а более
короткие будут прочитаны обратно некорректно из-за неопределенного символа
заполнения.
CSV-кодировщик#
CSV-кодировщик форматирует значения каждой строки как строку
comma-separated-values (CSV) в кодировке UTF-8. CSV-строка форматируется с
запятой , в качестве разделителя столбцов.
Для каждого поля должны быть заданы атрибуты type и mapping:
type— тип данных Trino.mapping— целочисленный индекс столбца в CSV-строке (первый столбец — 0, второй — 1 и так далее).
dataFormat и formatHint не поддерживаются и должны быть опущены.
CSV-кодировщик поддерживает следующие типы данных Trino:
BIGINTINTEGERSMALLINTTINYINTDOUBLEREALBOOLEANVARCHAR/VARCHAR(x)
Другие типы не поддерживаются.
Значения столбцов преобразуются в строки перед форматированием как CSV-строка.
Ниже приведен пример определения CSV-полей в файле определения таблицы для сообщения Kafka:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "csv",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"mapping": "0"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "1"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "2"
}
]
}
}
Пример запроса вставки для приведенного выше определения таблицы:
INSERT INTO example_csv_table (field1, field2, field3)
VALUES (123456789, 'example text', TRUE);
JSON-кодировщик#
JSON-кодировщик сопоставляет столбцы таблицы с полями JSON, заданными в файле определения таблицы, согласно RFC 4627.
Для полей поддерживаются следующие атрибуты:
type— тип данных Trino для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из объекта JSON.dataFormat— имя форматтера. Обязательно для временных типов.formatHint— шаблон для форматирования временных данных. Используется только с форматтеромcustom-date-time.
JSON-кодировщик поддерживает следующие типы данных Trino:
BIGINTINTEGERSMALLINTTINYINTDOUBLEREALBOOLEANVARCHARDATETIMETIME WITH TIME ZONETIMESTAMPTIMESTAMP WITH TIME ZONE
Другие типы не поддерживаются.
Для временных данных доступны следующие dataFormats:
iso8601rfc2822custom-date-time— форматирует временные данные согласно шаблону Joda Time, заданному полемformatHint.milliseconds-since-epochseconds-since-epoch
Все временные данные в Kafka поддерживают точность до миллисекунд.
Следующая таблица определяет, какие временные типы данных поддерживаются
dataFormats:
Тип данных Trino |
Правила декодирования |
|---|---|
|
|
|
|
|
|
|
|
|
|
Ниже приведен пример определения JSON-полей в файле определения таблицы для сообщения Kafka:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message": {
"dataFormat": "json",
"fields": [
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "TIMESTAMP",
"dataFormat": "custom-date-time",
"formatHint": "yyyy-dd-MM HH:mm:ss.SSS",
"mapping": "field3"
}
]
}
}
Ниже показан пример запроса вставки для предыдущего определения таблицы:
INSERT INTO example_json_table (field1, field2, field3)
VALUES (123456789, 'example text', TIMESTAMP '2020-07-15 01:02:03.456');
Avro-кодировщик#
Avro-кодировщик сериализует строки в записи Avro, как задано схемой Avro. Trino не поддерживает кодирование Avro без схемы.
Note
Схема Avro кодируется вместе со значениями столбцов таблицы в каждом сообщении Kafka.
Чтобы использовать Avro-кодировщик, в файле определения таблицы должен быть
задан dataSchema. Он указывает расположение файла схемы Avro для ключа или
сообщения.
Файлы схем Avro можно получать по HTTP или HTTPS с удаленного сервера с таким синтаксисом:
"dataSchema": "http://example.org/schema/avro_data.avsc"
Локальные файлы должны быть доступны на всех узлах Trino и использовать абсолютный путь, например:
"dataSchema": "/usr/local/schema/avro_data.avsc"
Поддерживаются следующие атрибуты полей:
name— имя столбца в таблице Trino.type— тип данных Trino для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из схемы Avro. Если поле, указанное вmapping, отсутствует в исходной схеме Avro, операция записи завершается ошибкой.
В следующей таблице перечислены поддерживаемые типы данных Trino, которые можно
использовать в type для эквивалентного типа поля Avro.
Тип данных Trino |
Тип данных Avro |
|---|---|
|
|
|
|
|
|
|
|
|
|
Другие типы не поддерживаются.
Следующий пример показывает определение поля Avro в файле определения таблицы для сообщения Kafka:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message":
{
"dataFormat": "avro",
"dataSchema": "/avro_message_schema.avsc",
"fields":
[
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "field3"
}
]
}
}
В следующем примере показано определение схемы Avro для предыдущего определения таблицы:
{
"type" : "record",
"name" : "example_avro_message",
"namespace" : "io.trino.plugin.kafka",
"fields" :
[
{
"name":"field1",
"type":["null", "long"],
"default": null
},
{
"name": "field2",
"type":["null", "string"],
"default": null
},
{
"name":"field3",
"type":["null", "boolean"],
"default": null
}
],
"doc:" : "A basic avro schema"
}
Пример запроса вставки для предыдущего определения таблицы:
- INSERT INTO example_avro_table (field1, field2, field3)
VALUES (123456789, ‘example text’, FALSE);
Protobuf-кодировщик#
Protobuf-кодировщик сериализует строки в Protobuf DynamicMessages, как задано схемой Protobuf.
Note
Схема Protobuf кодируется вместе со значениями столбцов таблицы в каждом сообщении Kafka.
Чтобы использовать Protobuf-кодировщик, в файле определения таблицы должен быть
задан dataSchema. Он указывает расположение файла proto для ключа или
сообщения.
Файлы схем Protobuf можно получать по HTTP или HTTPS с удаленного сервера с таким синтаксисом:
"dataSchema": "http://example.org/schema/schema.proto"
Локальные файлы должны быть доступны на всех узлах Trino и использовать абсолютный путь, например:
"dataSchema": "/usr/local/schema/schema.proto"
Поддерживаются следующие атрибуты полей:
name— имя столбца в таблице Trino.type— тип Trino для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из схемы Protobuf. Если поле, указанное вmapping, отсутствует в исходной схеме Protobuf, операция записи завершается ошибкой.
В следующей таблице перечислены поддерживаемые типы данных Trino, которые можно
использовать в type для эквивалентного типа поля Protobuf.
Тип данных Trino |
Тип данных Protobuf |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Тип Protobuf с полем |
|
|
|
|
Следующий пример показывает определение поля Protobuf в файле определения таблицы для сообщения Kafka:
{
"tableName": "example_table_name",
"schemaName": "example_schema_name",
"topicName": "example_topic_name",
"key": { "..." },
"message":
{
"dataFormat": "protobuf",
"dataSchema": "/message_schema.proto",
"fields":
[
{
"name": "field1",
"type": "BIGINT",
"mapping": "field1"
},
{
"name": "field2",
"type": "VARCHAR",
"mapping": "field2"
},
{
"name": "field3",
"type": "BOOLEAN",
"mapping": "field3"
}
]
}
}
В следующем примере показано определение схемы Protobuf для предыдущего определения таблицы:
syntax = "proto3";
message schema {
uint64 field1 = 1 ;
string field2 = 2;
bool field3 = 3;
}
Пример запроса вставки для предыдущего определения таблицы:
INSERT INTO example_protobuf_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);
Декодирование строк#
Для ключа и сообщения используется декодер, который сопоставляет данные сообщения и ключа со столбцами таблицы.
Коннектор Kafka содержит следующие декодеры:
raw— сообщение Kafka не интерпретируется; диапазоны необработанных байтов сообщения сопоставляются со столбцами таблицы.csv— сообщение Kafka интерпретируется как сообщение, разделенное запятыми, а поля сопоставляются со столбцами таблицы.json— сообщение Kafka разбирается как JSON, а поля JSON сопоставляются со столбцами таблицы.avro— сообщение Kafka разбирается на основе схемы Avro, а поля Avro сопоставляются со столбцами таблицы.protobuf— сообщение Kafka разбирается на основе схемы Protobuf, а поля Protobuf сопоставляются со столбцами таблицы.
Note
Если файла определения таблицы для таблицы нет, используется декодер dummy,
который не предоставляет никаких столбцов.
Raw-декодер#
Raw-декодер поддерживает чтение необработанных байтовых значений из сообщения или ключа Kafka и преобразование их в столбцы Trino.
Для полей поддерживаются следующие атрибуты:
dataFormat— выбирает ширину преобразуемого типа данных.type— тип данных Trino. Список поддерживаемых типов данных см. в таблице далее в этом документе.mapping—<start>[:<end>]— начальная и конечная позиция байтов для преобразования (необязательно).
Атрибут dataFormat выбирает число преобразуемых байтов. Если он отсутствует,
предполагается BYTE. Все значения знаковые.
Поддерживаются следующие значения:
BYTE— один байт.SHORT— два байта (big-endian).INT— четыре байта (big-endian).LONG— восемь байтов (big-endian).FLOAT— четыре байта (формат IEEE 754).DOUBLE— восемь байтов (формат IEEE 754).
Атрибут type определяет тип данных Trino, на который сопоставляется значение.
В зависимости от типа Trino, назначенного столбцу, можно использовать разные
значения dataFormat:
Тип данных Trino |
Допустимые значения |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Другие типы не поддерживаются.
Атрибут mapping задает диапазон байтов в ключе или сообщении, используемый
для декодирования. Он может быть одним или двумя числами, разделенными
двоеточием (<start>[:<end>]).
Если задана только начальная позиция:
Для типов фиксированной ширины столбец использует соответствующее число байтов для указанного
dataFormat(см. выше).При декодировании значения
VARCHARиспользуются все байты от начальной позиции до конца сообщения.
Если заданы начальная и конечная позиции:
Для типов фиксированной ширины размер должен быть равен числу байтов, используемых указанным
dataFormat.Для
VARCHARиспользуются все байты между начальной позицией (включительно) и конечной позицией (исключительно).
Если атрибут mapping не указан, это эквивалентно начальной позиции 0 и
неуказанной конечной позиции.
Схема декодирования числовых типов данных (BIGINT, INTEGER, SMALLINT,
TINYINT, DOUBLE) проста. Последовательность байтов считывается из входного
сообщения и декодируется согласно одному из вариантов:
кодировка big-endian (для целочисленных типов);
формат IEEE 754 (для
DOUBLE).
Длина декодированной последовательности байтов подразумевается значением
dataFormat.
Для типа данных VARCHAR последовательность байтов интерпретируется согласно
кодировке UTF-8.
CSV-декодер#
CSV-декодер преобразует байты, представляющие сообщение или ключ, в строку с использованием кодировки UTF-8, а затем интерпретирует результат как CSV-строку (comma-separated value).
Для полей должны быть заданы атрибуты type и mapping:
type— тип данных Trino. Список поддерживаемых типов данных см. в следующей таблице.mapping— индекс поля в CSV-записи.
Атрибуты dataFormat и formatHint не поддерживаются и должны быть опущены.
В таблице ниже перечислены поддерживаемые типы Trino, которые можно
использовать в type, и схема декодирования:
Тип данных Trino |
Правила декодирования |
|---|---|
|
Декодируется с помощью Java |
|
Декодируется с помощью Java |
|
Последовательность символов “true” сопоставляется с |
|
Используется как есть |
Другие типы не поддерживаются.
JSON-декодер#
JSON-декодер преобразует байты, представляющие сообщение или ключ, в JSON согласно RFC 4627. Обратите внимание, что сообщение или ключ ДОЛЖНЫ преобразовываться в объект JSON, а не в массив или простой тип.
Для полей поддерживаются следующие атрибуты:
type— тип данных Trino для столбца.dataFormat— декодер поля, используемый для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из объекта JSON.formatHint— только дляcustom-date-time.
JSON-декодер поддерживает несколько декодеров полей: _default используется
для стандартных столбцов таблицы, а также доступны декодеры для типов на основе
даты и времени.
В следующей таблице перечислены типы данных Trino, которые можно использовать в
type, и соответствующие декодеры полей, задаваемые через атрибут
dataFormat.
Тип данных Trino |
Допустимые значения |
|---|---|
|
Декодер поля по умолчанию (атрибут |
|
|
|
|
|
|
|
|
|
|
Другие типы не поддерживаются.
Декодер поля по умолчанию#
Это стандартный декодер поля, поддерживающий все физические типы данных Trino. Значение поля преобразуется по правилам преобразования JSON в логическое, целочисленное, вещественное или строковое значение. Для столбцов, не основанных на дате или времени, следует использовать этот декодер.
Декодеры даты и времени#
Чтобы преобразовать значения из объектов JSON в столбцы Trino DATE, TIME,
TIME WITH TIME ZONE, TIMESTAMP или TIMESTAMP WITH TIME ZONE, необходимо
выбрать специальные декодеры с помощью атрибута dataFormat в определении
поля.
iso8601— текстовый, разбирает текстовое поле как метку времени ISO 8601.rfc2822— текстовый, разбирает текстовое поле как метку времени RFC 2822.custom-date-time— текстовый, разбирает текстовое поле согласно шаблону формата Joda, указанному через атрибутformatHint. Шаблон формата должен соответствовать https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html.milliseconds-since-epoch— числовой, интерпретирует текст или число как количество миллисекунд с начала эпохи.seconds-since-epoch— числовой, интерпретирует текст или число как количество миллисекунд с начала эпохи.
Для типов данных TIMESTAMP WITH TIME ZONE и TIME WITH TIME ZONE, если
информация о часовом поясе присутствует в декодированном значении, она
используется как значение Trino. В противном случае часовой пояс результата
устанавливается в UTC.
Avro-декодер#
Avro-декодер преобразует байты, представляющие сообщение или ключ в формате Avro, на основе схемы. Сообщение должно содержать встроенную схему Avro. Trino не поддерживает декодирование Avro без схемы.
Для ключа/сообщения при использовании декодера avro должен быть задан
dataSchema. Он должен указывать расположение допустимого файла схемы Avro для
сообщения, которое нужно декодировать. Это расположение может быть удаленным
веб-сервером, например dataSchema: 'http://example.org/schema/avro_data.avsc',
или локальной файловой системой, например
dataSchema: '/usr/local/schema/avro_data.avsc'. Декодер завершится ошибкой,
если это расположение недоступно с узла координатора Trino.
Для полей поддерживаются следующие атрибуты:
name— имя столбца в таблице Trino.type— тип данных Trino для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из схемы Avro. Если поле, указанное вmapping, отсутствует в исходной схеме Avro, операция чтения возвращаетNULL.
В следующей таблице перечислены поддерживаемые типы Trino, которые можно
использовать в type для эквивалентных типов полей Avro:
Тип данных Trino |
Допустимый тип данных Avro |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Другие типы не поддерживаются.
Эволюция схем Avro#
Avro-декодер поддерживает эволюцию схем с обратной совместимостью. При обратной совместимости новая схема может использоваться для чтения данных Avro, созданных со старой схемой. Любое изменение в схеме Avro также должно быть отражено в файле определения топика Trino. Новые добавленные или переименованные поля должны иметь значение по умолчанию в файле схемы Avro.
Поведение эволюции схем:
Столбец добавлен в новой схеме: данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.
Столбец удален в новой схеме: данные, созданные со старой схемой, больше не выводят данные из удаленного столбца.
Столбец переименован в новой схеме: это эквивалентно удалению столбца и добавлению нового, и данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.
Тип столбца изменен в новой схеме: если приведение типа поддерживается Avro, выполняется преобразование. Для несовместимых типов возникает ошибка.
Protobuf-декодер#
Protobuf-декодер преобразует байты, представляющие сообщение или ключ в формате Protobuf, на основе схемы.
Для ключа/сообщения при использовании декодера protobuf должен быть задан
dataSchema. Он указывает расположение допустимого файла proto для
сообщения, которое нужно декодировать. Это расположение может быть удаленным
веб-сервером, dataSchema: 'http://example.org/schema/schema.proto', или
локальным файлом, dataSchema: '/usr/local/schema/schema.proto'. Декодер
завершится ошибкой, если расположение недоступно с координатора.
Для полей поддерживаются следующие атрибуты:
name— имя столбца в таблице Trino.type— тип данных Trino для столбца.mapping— разделенный косыми чертами список имен полей для выбора поля из схемы Protobuf. Если поле, указанное вmapping, отсутствует в исходном файлеproto, операция чтения возвращает NULL.
В следующей таблице перечислены поддерживаемые типы Trino, которые можно
использовать в type для эквивалентных типов полей Protobuf:
Тип данных Trino |
Допустимый тип данных Protobuf |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Тип Protobuf с полем |
|
|
|
|
|
|
any#
Типы сообщений с полем Any
содержат произвольное сериализованное сообщение в виде байтов и URL типа для
разрешения типа этого сообщения со схемой file://, http:// или https://.
Коннектор считывает содержимое URL, чтобы создать дескриптор типа для
сообщения Any и преобразовать сообщение в JSON. Это поведение включается
установкой kafka.protobuf-any-support-enabled в true.
Дескрипторы для каждого уникального URL кэшируются из соображений производительности, и любые изменения типа, возвращаемого URL, требуют перезапуска Trino.
Например, дана следующая схема Protobuf, определяющая MyMessage с тремя
столбцами:
syntax = "proto3";
message MyMessage {
string stringColumn = 1;
uint32 integerColumn = 2;
uint64 longColumn = 3;
}
И отдельная схема, использующая тип Any, который является упакованным
сообщением указанного выше типа и допустимым URL:
syntax = "proto3";
import "google/protobuf/any.proto";
message schema {
google.protobuf.Any any_message = 1;
}
Соответствующий столбец Trino называется any_message, имеет тип JSON и
содержит JSON-сериализованное представление сообщения Protobuf:
{
"@type":"file:///path/to/schemas/MyMessage",
"longColumn":"493857959588286460",
"numberColumn":"ONE",
"stringColumn":"Trino"
}
Эволюция схем Protobuf#
Protobuf-декодер поддерживает эволюцию схем с обратной совместимостью. При обратной совместимости новая схема может использоваться для чтения данных Protobuf, созданных со старой схемой. Любое изменение в схеме Protobuf должно также быть отражено в файле определения топика.
Поведение эволюции схем:
Столбец добавлен в новой схеме: данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.
Столбец удален в новой схеме: данные, созданные со старой схемой, больше не выводят данные из удаленного столбца.
Столбец переименован в новой схеме: это эквивалентно удалению столбца и добавлению нового, и данные, созданные со старой схемой, дают значение по умолчанию, когда таблица использует новую схему.
Тип столбца изменен в новой схеме: если приведение типа поддерживается Protobuf, выполняется преобразование. Для несовместимых типов возникает ошибка.
Ограничения Protobuf#
Protobuf Timestamp имеет наносекундную точность, но Trino поддерживает декодирование и кодирование с микросекундной точностью.
Поддержка SQL#
Коннектор предоставляет доступ на чтение и запись к данным и метаданным в таблицах Trino, заполняемых топиками Kafka. Дополнительные сведения см. в Декодирование строк.
Помимо глобально доступных операторов и операций чтения, коннектор поддерживает следующие возможности:
INSERT, кодируемый в указанный формат данных. См. также Вставки Kafka.