Коннекторы#
Коннекторы являются источником всех данных для запросов в Trino. Даже если ваши данные источник не имеет базовых таблиц, поддерживающих его, если вы адаптируете свой источник данных к API, ожидаемому Trino, вы можете писать запросы к этим данным.
Фабрика коннекторов (ConnectorFactory)#
Экземпляры вашего коннектора создает фабрика ConnectorFactory,
которая создается, когда Trino вызывает getConnectorFactory() у плагина.
Фабрика коннекторов — это простой интерфейс, отвечающий за предоставление
имени коннектора и создание экземпляра объекта Connector. Базовая реализация
коннектора, которая поддерживает только чтение, но не запись данных, должна
вернуть экземпляры следующих сервисов:
Конфигурация#
Метод create() фабрики коннектора получает карту config, содержащую все
свойства из файла свойств каталога. Ее можно использовать для настройки
коннектора, но поскольку все значения являются строками, им может потребоваться
дополнительная обработка, если они представляют другие типы данных. Метод также
не проверяет, известны ли все предоставленные свойства. Это может привести к
тому, что коннектор будет вести себя иначе, чем ожидалось, если он игнорирует
свойство из-за ошибки пользователя при вводе названия свойства.
Чтобы сделать конфигурацию более надежной, определите класс конфигурации. Этот класс описывает все доступные свойства, их типы и дополнительные правила проверки.
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.units.Duration;
import io.airlift.units.MaxDuration;
import io.airlift.units.MinDuration;
import javax.validation.constraints.NotNull;
public class ExampleConfig
{
private String secret;
private Duration timeout = Duration.succinctDuration(10, TimeUnit.SECONDS);
public String getSecret()
{
return secret;
}
@Config("secret")
@ConfigDescription("Secret required to access the data source")
@ConfigSecuritySensitive
public ExampleConfig setSecret(String secret)
{
this.secret = secret;
return this;
}
@NotNull
@MaxDuration("10m")
@MinDuration("1ms")
public Duration getTimeout()
{
return timeout;
}
@Config("timeout")
public ExampleConfig setTimeout(Duration timeout)
{
this.timeout = timeout;
return this;
}
}
В предыдущем примере определяются два свойства конфигурации, что делает коннектор более надежным благодаря следующим механизмам:
определение всех поддерживаемых свойств, что позволяет обнаружить орфографические ошибки в конфигурации при запуске сервера
определение значения тайм-аута по умолчанию, чтобы предотвратить зависание соединений на неопределенный срок
предотвращение недопустимых значений таймаута, например 0 мс, которые могли бы приводить к сбою всех запросов
анализ значений таймаута в разных единицах измерения, обнаружение недопустимых значений
предотвращение регистрации секретного значения в виде обычного текста
Класс конфигурации должен быть связан с модулем Guice:
import com.google.inject.Binder;
import com.google.inject.Module;
import static io.airlift.configuration.ConfigBinder.configBinder;
public class ExampleModule
implements Module
{
public ExampleModule()
{
}
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(ExampleConfig.class);
}
}
Затем модуль необходимо инициализировать в фабрике коннекторов при создании нового экземпляра коннектора:
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
requireNonNull(config, "config is null");
Bootstrap app = new Bootstrap("io.trino.bootstrap.catalog." + catalogName, new ExampleModule());
Injector injector = app
.doNotInitializeLogging()
.setRequiredConfigurationProperties(config)
.initialize();
return injector.getInstance(ExampleConnector.class);
}
Note
Переменные среды в файле свойств каталога
(например, secret=${ENV:SECRET}) разрешаются только при использовании
класса io.airlift.bootstrap.Bootstrap для инициализации модуля.
См. Секреты для получения дополнительной информации.
Если вам в конечном итоге потребуется определить несколько каталогов, использующих один и тот же коннектор, просто чтобы изменить одно свойство, рассмотрите возможность добавления поддержки свойств схемы и/или таблицы. Это позволит задавать конфигурацию более детально. Если коннектор не поддерживает управление схемой, предикаты запроса для выбранных столбцов можно использовать как способ передачи необходимой конфигурации во время выполнения.
Например, при создании коннектора для чтения коммитов из репозитория Git: URL-адрес репозитория может быть свойством конфигурации. Но это привело бы к тому, что каталог мог бы возвращать данные только из одного репозитория. Альтернативно это может быть столбец, где для каждого запроса выбора потребуется предикат для него:
SELECT *
FROM git.default.commits
WHERE url = 'https://github.com/trinodb/trino.git'
Метаданные коннектора#
Интерфейс метаданных коннектора позволяет Trino получать списки схем, таблицы, столбцы и другие метаданные о конкретном источнике данных.
Базовый коннектор только для чтения должен реализовывать следующие методы:
listSchemaNameslistTablesstreamTableColumnsgetTableHandlegetTableMetadatagetColumnHandlesgetColumnMetadata
Если вам интересно увидеть стратегии реализации большего количества методов, посмотрите Пример HTTP-коннектора и коннектор Cassandra. Если ваш базовый источник данных поддерживает схемы, таблицы и столбцы, этот интерфейс должен быть простым в реализации. Если вы пытаетесь адаптировать что-то, что не является реляционной базой данных, как пример HTTP-коннектора, вы можете понадобиться творческий подход к тому, как сопоставить источник данных со схемой Trino, концепции таблиц и столбцов.
Интерфейс метаданных коннектора также позволяет реализовать другие функции коннектора, такие как:
Управление схемами: создание, изменение и удаление схем, таблиц, столбцов таблиц, представлений и материализованных представлений.
Поддержка комментариев и свойств для таблиц и столбцов.
Авторизация для схем, таблиц и представлений.
Выполнение Табличные функции.
Предоставление статистики таблиц, которую использует оптимизатор на основе стоимости (CBO), а также сбор статистики во время записи и при анализе выбранных таблиц.
Изменение данных:
вставка, обновление и удаление строк в таблицах,
обновление материализованных представлений (materialized views),
усечение (truncating) таблиц целиком,
создание таблиц из результатов запроса.
Управление ролями и привилегиями.
Pushdown:
проекции (Projections)
выборка (Sampling)
агрегации (Aggregations)
соединения (Joins)
вызов табличных функций (Table function invocation)
Обратите внимание, что модификация данных также требует реализации Поставщик приемника страниц коннектора.
Когда Trino получает запрос SELECT, он анализирует его в промежуточное
представление (IR). Затем, во время оптимизации, он проверяет, могут ли
коннекторы обрабатывать операции, связанные с предложениями SQL, вызывая один
из следующих методов сервиса ConnectorMetadata:
applyLimitapplyTopNapplyFilterapplyProjectionapplySampleapplyAggregationapplyJoinapplyTableFunctionapplyTableScanRedirect
Коннекторы могут указать, что они не поддерживают определенный pushdown или
что действие не дало эффекта, вернув Optional.empty(). Коннекторы должны
ожидать, что эти методы будут вызываться несколько раз во время оптимизации
одного запроса.
Warning
Критически важно, чтобы коннекторы возвращали Optional.empty(), если вызов
этого метода не дает эффекта для данного вызова, даже когда коннектор в целом
поддерживает конкретный pushdown. Иначе оптимизатор может войти в бесконечный
цикл.
В остальных случаях эти методы возвращают объект результата, содержащий новый
дескриптор таблицы. Новый дескриптор таблицы представляет виртуальную таблицу,
полученную применением операции (фильтра, проекции, ограничения и т. д.) к
таблице, которую создает узел сканирования таблицы. Когда запрос фактически
запускается, ConnectorRecordSetProvider или ConnectorPageSourceProvider
может использовать оптимизации, переданные в ConnectorTableHandle.
Возвращенный дескриптор таблицы затем передается другим сервисам, которые
реализует коннектор, например ConnectorRecordSetProvider или
ConnectorPageSourceProvider.
Pushdown LIMIT и top-N#
При выполнении запроса SELECT с предложениями LIMIT или ORDER BY план
запроса может содержать операции Sort или Limit.
Когда план содержит операции Sort и Limit, движок пытается передать
ограничение в коннектор, вызывая метод applyTopN сервиса метаданных
коннектора. Если операции Sort нет, а есть только Limit, вызывается метод
applyLimit, и коннектор может вернуть результаты в произвольном порядке.
Если коннектор может получить пользу от информации, переданной этим методам, но
не может гарантировать, что сможет произвести меньше строк, чем указанное
ограничение, он должен вернуть непустой результат с новым дескриптором
производной таблицы и флагом limitGuaranteed (в LimitApplicationResult) или
topNGuaranteed (в TopNApplicationResult), установленным в false.
Если коннектор может гарантировать, что произведет меньше строк, чем указанное
ограничение, он должен вернуть непустой результат с флагом гарантии для
ограничения или topN, установленным в true.
Note
applyTopN — единственный метод, который получает элементы сортировки из
операции Sort.
В запросе раздел ORDER BY может включать любой столбец с любым порядком.
Однако источник данных коннектора может поддерживать только ограниченные
комбинации. Авторам плагинов нужно решить, должен ли коннектор игнорировать
pushdown, вернуть все данные и позволить движку отсортировать их, или выбросить
исключение, чтобы сообщить пользователю, что конкретный порядок не
поддерживается, если получение всех данных было бы слишком дорогим или
занимало бы слишком много времени. При выбрасывании исключения используйте
класс TrinoException с кодом ошибки INVALID_ORDER_BY и сообщением,
подсказывающим действие, чтобы пользователи понимали, как написать допустимый
запрос.
Pushdown предикатов#
При выполнении запроса с предложением WHERE план запроса может содержать узел
плана ScanFilterProject с ограничением-предикатом.
Ограничение-предикат — это описание ограничения, наложенного на результаты
стадии или фрагмента так, как оно выражено в предложении WHERE. Например,
WHERE x > 5 AND y = 3 преобразуется в ограничение, где поле summary
означает, что домен столбца x должен быть больше 5, а домен столбца y
должен быть равен 3.
Когда план запроса содержит операцию ScanFilterProject, Trino пытается
оптимизировать запрос, передавая ограничение-предикат в коннектор через вызов
метода applyFilter сервиса метаданных коннектора. Этот метод получает
дескриптор таблицы со всеми оптимизациями, примененными на данный момент, и
возвращает либо Optional.empty(), либо ответ с новым дескриптором таблицы,
полученным из старого.
Оптимизатор запросов может несколько раз вызвать applyFilter для одного
запроса, пока ищет оптимальный план. Коннекторы должны возвращать
Optional.empty() из applyFilter, если они не могут применить ограничение
для данного вызова, даже если в целом поддерживают pushdown
ScanFilterProject. Коннекторы также должны возвращать Optional.empty(),
если ограничение уже было применено.
Ограничение содержит следующие элементы:
TupleDomain, определяющий сопоставление между столбцами и их доменами.Domainпредставляет собой список возможных значений или список диапазонов, а также содержит информацию о допустимостиnull.Выражение для pushdown вызовов функций.
Карта назначений от переменных в выражении к столбцам.
(необязательно) Предикат, который проверяет карту столбцов и их значений; его нельзя сохранять после возврата из вызова
applyFilter.(необязательно) Набор столбцов, от которых зависит предикат; должен присутствовать, если присутствует предикат.
Если доступны и предикат, и summary, предикат гарантированно строже фильтрует
значения и при использовании может существенно повысить производительность
запроса.
Однако предикат нельзя сохранить в дескрипторе таблицы и использовать позже,
поскольку его нельзя удерживать после возврата из вызова applyFilter. Он
используется для фильтрации целых разделов и не передается через pushdown.
Вместо него можно передать summary, сохранив его в дескрипторе таблицы.
Это пересечение между предикатом и summary связано с историческими причинами:
pushdown простых сравнений сначала был реализован через summary, а более
сложные фильтры, такие как LIKE, которым требовались более выразительные
предикаты, были добавлены позже.
Если ограничение можно передать через pushdown только частично, например когда
коннектор для базы данных, не поддерживающей сопоставление диапазонов,
используется в запросе с WHERE x = 2 AND y > 5, ограничение столбца y
должно быть возвращено в ConstraintApplicationResult из applyFilter. В этом
случае условие y > 5 применяется в Trino, а не передается через pushdown.
Ниже приведен простой пример, который учитывает только TupleDomain:
@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session,
ConnectorTableHandle tableHandle,
Constraint constraint)
{
ExampleTableHandle handle = (ExampleTableHandle) tableHandle;
TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
if (oldDomain.equals(newDomain)) {
// Ничего не изменилось, возвращаем пустой Optional
return Optional.empty();
}
handle = new ExampleTableHandle(newDomain);
return Optional.of(new ConstraintApplicationResult<>(handle, TupleDomain.all(), false));
}
TupleDomain из ограничения пересекается с TupleDomain, уже примененным к
TableHandle, чтобы сформировать newDomain. Если фильтрация не изменилась,
возвращается результат Optional.empty(), чтобы уведомить планировщик, что
этот путь оптимизации достиг конца.
В этом примере коннектор передает через pushdown TupleDomain со всеми типами
данных Trino, которые поддерживаются в источнике данных с такой же семантикой.
В результате фильтры в Trino не нужны, а ConstraintApplicationResult
устанавливает remainingFilter в TupleDomain.all().
Эта реализация pushdown очень похожа на многие коннекторы Trino, включая
MongoMetadata, BigQueryMetadata, KafkaMetadata.
Следующий, более сложный пример показывает типы данных Trino, которые не доступны напрямую в нижележащем источнике данных и должны быть сопоставлены:
@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session,
ConnectorTableHandle table,
Constraint constraint)
{
JdbcTableHandle handle = (JdbcTableHandle) table;
TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
TupleDomain<ColumnHandle> remainingFilter;
if (newDomain.isNone()) {
newConstraintExpressions = ImmutableList.of();
remainingFilter = TupleDomain.all();
remainingExpression = Optional.of(Constant.TRUE);
}
else {
// Нужно решить, какие столбцы передавать через pushdown.
// Так как это базовый класс для многих JDBC-коннекторов,
// каждый из которых имеет свои сопоставления типов Trino
// и семантику сравнения, он должен быть гибким.
Map<ColumnHandle, Domain> domains = newDomain.getDomains().orElseThrow();
List<JdbcColumnHandle> columnHandles = domains.keySet().stream()
.map(JdbcColumnHandle.class::cast)
.collect(toImmutableList());
// Получаем информацию о том, как передавать через pushdown каждый
// столбец на основе его типа данных JDBC
List<ColumnMapping> columnMappings = jdbcClient.toColumnMappings(
session,
columnHandles.stream()
.map(JdbcColumnHandle::getJdbcTypeHandle)
.collect(toImmutableList()));
// Вычисляем домены, которые можно безопасно передать через pushdown
// (поддерживаемые), и те, которые нужно фильтровать в Trino
// (неподдерживаемые)
Map<ColumnHandle, Domain> supported = new HashMap<>();
Map<ColumnHandle, Domain> unsupported = new HashMap<>();
for (int i = 0; i < columnHandles.size(); i++) {
JdbcColumnHandle column = columnHandles.get(i);
DomainPushdownResult pushdownResult =
columnMappings.get(i).getPredicatePushdownController().apply(
session,
domains.get(column));
supported.put(column, pushdownResult.getPushedDown());
unsupported.put(column, pushdownResult.getRemainingFilter());
}
newDomain = TupleDomain.withColumnDomains(supported);
remainingFilter = TupleDomain.withColumnDomains(unsupported);
}
// Возвращаем пустой Optional, если в фильтрации ничего не изменилось
if (oldDomain.equals(newDomain)) {
return Optional.empty();
}
handle = new JdbcTableHandle(
handle.getRelationHandle(),
newDomain,
...);
return Optional.of(
new ConstraintApplicationResult<>(
handle,
remainingFilter));
}
Этот пример иллюстрирует реализацию базового класса для многих JDBC-коннекторов с учетом особых требований нескольких JDBC-совместимых источников данных. Он гарантирует, что если ограничение передается через pushdown, оно работает в нижележащем источнике данных точно так же и дает те же результаты, что и в Trino. Например, в базах данных, где сравнения строк не зависят от регистра, pushdown не работает, поскольку операции сравнения строк в Trino зависят от регистра.
Интерфейс PredicatePushdownController определяет, можно ли передать домен
столбца через pushdown в JDBC-совместимых источниках данных. В предыдущем
примере он вызывается из реализации JdbcClient, специфичной для этой базы
данных. В источниках данных, не совместимых с JDBC, pushdown на основе типов
реализуется напрямую, без использования интерфейса
PredicatePushdownController.
Следующий пример добавляет pushdown выражений, включаемый флагом сеанса:
@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
ConnectorSession session,
ConnectorTableHandle table,
Constraint constraint)
{
JdbcTableHandle handle = (JdbcTableHandle) table;
TupleDomain<ColumnHandle> oldDomain = handle.getConstraint();
TupleDomain<ColumnHandle> newDomain = oldDomain.intersect(constraint.getSummary());
List<String> newConstraintExpressions;
TupleDomain<ColumnHandle> remainingFilter;
Optional<ConnectorExpression> remainingExpression;
if (newDomain.isNone()) {
newConstraintExpressions = ImmutableList.of();
remainingFilter = TupleDomain.all();
remainingExpression = Optional.of(Constant.TRUE);
}
else {
// Нужно решить, какие столбцы передавать через pushdown.
// Так как это базовый класс для многих JDBC-коннекторов,
// каждый из которых имеет свои сопоставления типов Trino
// и семантику сравнения, он должен быть гибким.
Map<ColumnHandle, Domain> domains = newDomain.getDomains().orElseThrow();
List<JdbcColumnHandle> columnHandles = domains.keySet().stream()
.map(JdbcColumnHandle.class::cast)
.collect(toImmutableList());
// Получаем информацию о том, как передавать через pushdown каждый
// столбец на основе его типа данных JDBC
List<ColumnMapping> columnMappings = jdbcClient.toColumnMappings(
session,
columnHandles.stream()
.map(JdbcColumnHandle::getJdbcTypeHandle)
.collect(toImmutableList()));
// Вычисляем домены, которые можно безопасно передать через pushdown
// (поддерживаемые), и те, которые нужно фильтровать в Trino
// (неподдерживаемые)
Map<ColumnHandle, Domain> supported = new HashMap<>();
Map<ColumnHandle, Domain> unsupported = new HashMap<>();
for (int i = 0; i < columnHandles.size(); i++) {
JdbcColumnHandle column = columnHandles.get(i);
DomainPushdownResult pushdownResult =
columnMappings.get(i).getPredicatePushdownController().apply(
session,
domains.get(column));
supported.put(column, pushdownResult.getPushedDown());
unsupported.put(column, pushdownResult.getRemainingFilter());
}
newDomain = TupleDomain.withColumnDomains(supported);
remainingFilter = TupleDomain.withColumnDomains(unsupported);
// Нужно ли обрабатывать pushdown выражений?
if (isComplexExpressionPushdown(session)) {
List<String> newExpressions = new ArrayList<>();
List<ConnectorExpression> remainingExpressions = new ArrayList<>();
// Каждое выражение можно разбить на список конъюнктов,
// соединенных через AND. Каждый конъюнкт обрабатывается отдельно.
for (ConnectorExpression expression : extractConjuncts(constraint.getExpression())) {
// Пытаемся преобразовать конъюнкт во что-то, что понимает
// нижележащий источник данных JDBC
Optional<String> converted = jdbcClient.convertPredicate(
session,
expression,
constraint.getAssignments());
if (converted.isPresent()) {
newExpressions.add(converted.get());
}
else {
remainingExpressions.add(expression);
}
}
// Вычисляем, какие части выражения можно передать через pushdown,
// а какие нужно вычислить в движке Trino
newConstraintExpressions = ImmutableSet.<String>builder()
.addAll(handle.getConstraintExpressions())
.addAll(newExpressions)
.build().asList();
remainingExpression = Optional.of(and(remainingExpressions));
}
else {
newConstraintExpressions = ImmutableList.of();
remainingExpression = Optional.empty();
}
}
// Возвращаем пустой Optional, если в фильтрации ничего не изменилось
if (oldDomain.equals(newDomain) &&
handle.getConstraintExpressions().equals(newConstraintExpressions)) {
return Optional.empty();
}
handle = new JdbcTableHandle(
handle.getRelationHandle(),
newDomain,
newConstraintExpressions,
...);
return Optional.of(
remainingExpression.isPresent()
? new ConstraintApplicationResult<>(
handle,
remainingFilter,
remainingExpression.get())
: new ConstraintApplicationResult<>(
handle,
remainingFilter));
}
ConnectorExpression разбивается аналогично TupleDomain. Каждое выражение
можно разбить на независимые конъюнкты. Конъюнкты — это меньшие выражения,
которые при объединении оператором AND эквивалентны исходному выражению.
Каждый конъюнкт можно обработать отдельно. Для большей гибкости каждый из них
преобразуется по правилам, специфичным для коннектора, как определено
реализацией JdbcClient. Непреобразованные конъюнкты возвращаются как
remainingExpression и вычисляются движком Trino.
Менеджер сплитов коннектора#
Менеджер сплитов разбивает данные таблицы на отдельные фрагменты, которые Trino распределяет между рабочими узлами для обработки. Например, коннектор Hive перечисляет файлы для каждого раздела Hive и создает один или несколько сплитов на файл. Для источников данных без секционированных данных хорошая стратегия — просто вернуть один сплит для всей таблицы. Именно эту стратегию использует пример HTTP-коннектора.
Поставщик наборов записей коннектора#
Получив сплит, дескриптор таблицы и список столбцов, поставщик набора записей отвечает за доставку данных в движок выполнения Trino.
Дескрипторы таблицы и столбцов представляют виртуальную таблицу. Их создает сервис метаданных коннектора, который Trino вызывает во время планирования и оптимизации запроса. Такая виртуальная таблица не обязана напрямую соответствовать одной коллекции в источнике данных коннектора. Если коннектор поддерживает pushdown, может существовать несколько виртуальных таблиц, производных от других, которые представляют разные виды нижележащих данных.
Поставщик создает RecordSet, который, в свою очередь, создает
RecordCursor, используемый Trino для чтения значений столбцов каждой строки.
Предоставленный набор записей должен включать только запрошенные столбцы в
порядке, соответствующем списку дескрипторов столбцов, переданному методу
ConnectorRecordSetProvider.getRecordSet(). Набор записей должен вернуть все
строки, содержащиеся в “виртуальной таблице”, представленной TableHandle,
связанным с операцией TableScan.
Для простых коннекторов, где производительность не критична, поставщик набора
записей может вернуть экземпляр InMemoryRecordSet. Набор записей в памяти
можно построить из списков значений для каждой строки, что может быть проще,
чем реализовывать RecordCursor.
Реализация RecordCursor должна отслеживать текущую запись. Она возвращает
значения столбцов по числовой позиции в типе данных, соответствующем
определению столбца в таблице. Когда движок заканчивает чтение текущей записи,
он вызывает у курсора advanceNextPosition.
Сопоставление типов#
Встроенные типы данных SQL используют разные Java-типы в качестве типов-носителей.
Тип SQL |
Тип Java |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Метод RecordCursor.getType(int field) возвращает SQL-тип поля, а значение
поля возвращается одним из следующих методов, соответствующих типу-носителю:
getBoolean(int field)getLong(int field)getDouble(int field)getSlice(int field)getObject(int field)
Значения типа real кодируются в long с использованием битового
представления IEEE 754 одинарного формата для чисел с плавающей точкой с
сохранением NaN. Это можно сделать с помощью статического метода
java.lang.Float.floatToRawIntBits.
Значения типов timestamp(p) with time zone и time(p) with time zone обычной
точности можно преобразовать в long с помощью статических методов класса
io.trino.spi.type.DateTimeEncoding, например pack() или
packDateTimeWithZone().
Строки в кодировке UTF-8 можно преобразовать в Slice с помощью статического
метода Slices.utf8Slice().
Note
Класс Slice предоставляется пакетом io.airlift:slice.
Объекты Int128 можно создавать с помощью метода Int128.valueOf().
Следующий пример создает блок для столбца array(varchar):
private Block encodeArray(List<String> names)
{
BlockBuilder builder = VARCHAR.createBlockBuilder(null, names.size());
blockBuilder.buildEntry(elementBuilder -> names.forEach(name -> {
if (name == null) {
elementBuilder.appendNull();
}
else {
VARCHAR.writeString(elementBuilder, name);
}
}));
return builder.build();
}
Следующий пример создает объект SqlMap для столбца map(varchar, varchar):
private SqlMap encodeMap(Map<String, ?> map)
{
MapType mapType = typeManager.getType(TypeSignature.mapType(
VARCHAR.getTypeSignature(),
VARCHAR.getTypeSignature()));
MapBlockBuilder values = mapType.createBlockBuilder(null, map != null ? map.size() : 0);
if (map == null) {
values.appendNull();
return values.build().getObject(0, Block.class);
}
values.buildEntry((keyBuilder, valueBuilder) -> map.foreach((key, value) -> {
VARCHAR.writeString(keyBuilder, key);
if (value == null) {
valueBuilder.appendNull();
}
else {
VARCHAR.writeString(valueBuilder, value.toString());
}
}));
return values.build().getObject(0, SqlMap.class);
}
Поставщик источника страниц коннектора#
Получив сплит, дескриптор таблицы и список столбцов, поставщик источника
страниц отвечает за доставку данных в движок выполнения Trino. Он создает
ConnectorPageSource, который, в свою очередь, создает объекты Page,
используемые Trino для чтения значений столбцов.
Если он не реализован, используется стандартный RecordPageSourceProvider.
Получив поставщик набора записей, он возвращает экземпляр RecordPageSource,
который строит объекты Page из записей в наборе записей.
Коннектору следует реализовать поставщик источника страниц вместо поставщика набора записей, когда страницы можно создавать напрямую. Преобразование отдельных записей из поставщика набора записей в страницы добавляет накладные расходы во время выполнения запроса.
Поставщик приемника страниц коннектора#
Получив дескриптор таблицы для вставки, поставщик приемника страниц отвечает за
потребление данных из движка выполнения Trino. Он создает ConnectorPageSink,
который, в свою очередь, принимает объекты Page, содержащие значения
столбцов.
Пример, показывающий, как итерироваться по странице для доступа к отдельным значениям:
@Override
public CompletableFuture<?> appendPage(Page page)
{
for (int channel = 0; channel < page.getChannelCount(); channel++) {
Block block = page.getBlock(channel);
for (int position = 0; position < page.getPositionCount(); position++) {
if (block.isNull(position)) {
// или обработайте это иначе
continue;
}
// channel должен соответствовать номеру столбца в таблице
// используйте его для определения ожидаемого типа столбца
String value = VARCHAR.getSlice(block, position).toStringUtf8();
// TODO выполните нужное действие со значением
}
}
return NOT_BLOCKED;
}