Поддержка MERGE#
Движок Trino предоставляет API для поддержки SQL MERGE на уровне строк.
Чтобы реализовать MERGE, коннектор должен предоставить следующее:
Реализацию
ConnectorMergeSink, которая обычно построена поверхConnectorPageSink.Методы в
ConnectorMetadata, чтобы получить дескриптор столбца “rowId”, получить парадигму изменения строк, а также начать и завершить операциюMERGE.
Механизмы движка Trino, используемые для реализации SQL MERGE, также применяются для
поддержки SQL DELETE и UPDATE. Это означает, что коннектору достаточно
реализовать поддержку SQL MERGE, и коннектор получит все операции
языка модификации данных (DML).
Стандартный SQL MERGE#
Разные движки запросов поддерживают различные определения SQL MERGE.
Trino поддерживает строгую спецификацию SQL ISO/IEC 9075, опубликованную
в 2016 году. В качестве простого примера, пусть таблицы target_table и
source_table определены так:
CREATE TABLE accounts (
customer VARCHAR,
purchases DECIMAL,
address VARCHAR);
INSERT INTO accounts (customer, purchases, address) VALUES ...;
CREATE TABLE monthly_accounts_update (
customer VARCHAR,
purchases DECIMAL,
address VARCHAR);
INSERT INTO monthly_accounts_update (customer, purchases, address) VALUES ...;
Вот возможная операция MERGE из monthly_accounts_update в
accounts:
MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
WHEN MATCHED AND s.address = 'Berkeley' THEN
DELETE
WHEN MATCHED AND s.customer = 'Joe Shmoe' THEN
UPDATE SET purchases = purchases + 100.0
WHEN MATCHED THEN
UPDATE
SET purchases = s.purchases + t.purchases, address = s.address
WHEN NOT MATCHED THEN
INSERT (customer, purchases, address)
VALUES (s.customer, s.purchases, s.address);
SQL MERGE пытается сопоставить каждый WHEN в порядке их объявления.
Когда совпадение найдено, выполняется соответствующий DELETE, INSERT или UPDATE,
а последующие WHEN игнорируются.
SQL MERGE поддерживает две операции над целевой таблицей и источником,
когда строка из исходной таблицы или запроса совпадает со строкой в целевой таблице:
UPDATE, при котором столбцы в целевой строке обновляются.DELETE, при котором целевая строка удаляется.
В случае NOT MATCHED SQL MERGE поддерживает только операции
INSERT. Вставляемые значения произвольны, но обычно берутся
из несовпавшей строки исходной таблицы или запроса.
RowChangeParadigm#
Разные коннекторы используют разные способы представления обновлений строк,
что определяется нижележащими системами хранения. Движок Trino классифицирует
эти разные подходы как элементы перечисления RowChangeParadigm,
возвращаемого методом ConnectorMetadata.getRowChangeParadigm(...).
Значения перечисления RowChangeParadigm:
CHANGE_ONLY_UPDATED_COLUMNS, предназначено для коннекторов, которые могут обновлять отдельные столбцы строк, идентифицированных поrowId. Соответствующий класс обработчика merge:ChangeOnlyUpdatedColumnsMergeProcessor.DELETE_ROW_AND_INSERT_ROW, предназначено для коннекторов, которые представляют изменение строки как пару из удаления строки и вставки строки. Соответствующий класс обработчика merge:DeleteAndInsertMergeProcessor.
Обзор обработки MERGE#
Оператор MERGE обрабатывается созданием RIGHT JOIN между
целевой таблицей и источником по критерию MERGE. Источником может быть
таблица или произвольный запрос. Для каждой строки в исходной таблице или запросе
MERGE формирует объект ROW, содержащий:
значения столбцов данных из случаев
UPDATEилиINSERT. Для случаевDELETEненулевыми остаются только столбцы партиционирования, определяющие партиционирование и бакетирование.булевый столбец со значением
trueдля строк источника, совпавших с некоторой строкой цели, иfalseв противном случае.целое число, обозначающее, является ли операция случая merge
UPDATE,DELETEилиINSERT, либо это строка источника, для которой не подошел ни один случай. Если строка источника не совпадает ни с одним случаем merge, все значения столбцов данных, кроме определяющих распределение, равны null, а номер операции равен -1.
SearchedCaseExpression создается из результата RIGHT JOIN
для представления предложений WHEN в MERGE. В предыдущем примере
MERGE выполняется так, как если бы SearchedCaseExpression был записан следующим образом:
SELECT
CASE
WHEN present AND s.address = 'Berkeley' THEN
-- Null values for delete; present=true; operation DELETE=2, case_number=0
row(null, null, null, true, 2, 0)
WHEN present AND s.customer = 'Joe Shmoe' THEN
-- Update column values; present=true; operation UPDATE=3, case_number=1
row(t.customer, t.purchases + 100.0, t.address, true, 3, 1)
WHEN present THEN
-- Update column values; present=true; operation UPDATE=3, case_number=2
row(t.customer, s.purchases + t.purchases, s.address, true, 3, 2)
WHEN (present IS NULL) THEN
-- Insert column values; present=false; operation INSERT=1, case_number=3
row(s.customer, s.purchases, s.address, false, 1, 3)
ELSE
-- Null values for no case matched; present=false; operation=-1,
-- case_number=-1
row(null, null, null, false, -1, -1)
END
FROM (SELECT *, true AS present FROM target_table) t
RIGHT JOIN source_table s ON s.customer = t.customer;
Движок Trino выполняет RIGHT JOIN и выражение CASE,
гарантирует, что ни одна строка целевой таблицы не сопоставится более чем с одной строкой
исходного выражения, и в итоге создает последовательность страниц для маршрутизации на узел,
который выполняет метод ConnectorMergeSink.storeMergedRows(...).
Как и в DELETE и UPDATE, строки целевой таблицы для MERGE идентифицируются
специфическим для коннектора дескриптором столбца rowId. Для MERGE дескриптор rowId
возвращается методом ConnectorMetadata.getMergeRowIdColumnHandle(...).
Перераспределение MERGE#
Реализация MERGE в Trino позволяет UPDATE изменять
значения столбцов, определяющих партиционирование и/или бакетирование, поэтому
она должна “перераспределять” строки из операции MERGE на рабочие
узлы, отвечающие за запись строк с итоговыми столбцами партиционирования и/или
бакетирования.
Поскольку процесс MERGE в общем случае требует перераспределения
объединенных строк между узлами Trino, порядок строк на сохраняемых страницах
неопределен. Коннекторы, такие как Hive, которые зависят от возрастающего
порядка rowId для удаляемых строк, должны сортировать удаляемые строки перед
их сохранением.
Чтобы гарантировать, что все вставляемые строки для заданной партиции окажутся на
одном узле, к ключам партиции/бакета страницы применяется хеш перераспределения.
В результате хеширования все строки для конкретной партиции/бакета попадают в одну
группу, независимо от того, были ли они строками MATCHED или NOT MATCHED.
Для коннекторов, у которых RowChangeParadigm равен DELETE_ROW_AND_INSERT_ROW,
вставляемые строки распределяются по layout, который возвращает
ConnectorMetadata.getInsertLayout(). Для некоторых коннекторов этот же
layout используется и для обновляемых строк. Другим коннекторам нужен специальный
layout для обновляемых строк, который возвращает ConnectorMetadata.getUpdateLayout().
Поддержка MERGE в коннекторе#
Чтобы начать обработку MERGE, движок Trino вызывает:
ConnectorMetadata.getMergeRowIdColumnHandle(...), чтобы получить дескриптор столбцаrowId.ConnectorMetadata.getRowChangeParadigm(...), чтобы получить парадигму, поддерживаемую коннектором для изменения существующих строк таблицы.ConnectorMetadata.beginMerge(...), чтобы получитьConnectorMergeTableHandleдля операции merge. Этот объектConnectorMergeTableHandleсодержит всю информацию, необходимую коннектору для задания операцииMERGE.ConnectorMetadata.getInsertLayout(...), из результата которого извлекается список столбцов партиции или таблицы, влияющих на перераспределение записи.ConnectorMetadata.getUpdateLayout(...). Если этот layout не пустой, он используется для распределения обновленных строк, получившихся в результате операцииMERGE.
На узлах, являющихся целями хеширования, движок Trino вызывает
ConnectorPageSinkProvider.createMergeSink(...) для создания
ConnectorMergeSink.
Чтобы записать каждую страницу объединенных строк, движок Trino вызывает
ConnectorMergeSink.storeMergedRows(Page). Метод storeMergedRows(Page)
итерируется по строкам страницы, выполняя обновления и удаления
в случаях MATCHED, а также вставки в случаях NOT MATCHED.
При использовании RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW движок
преобразует операции UPDATE в пару операций DELETE и INSERT
до вызова storeMergedRows(Page).
Для завершения операции MERGE движок Trino вызывает
ConnectorMetadata.finishMerge(...), передавая table handle
и коллекцию JSON-объектов, закодированных как экземпляры Slice. Эти
объекты содержат специфичную для коннектора информацию о том, что было изменено
операцией MERGE. Обычно такой JSON-объект содержит информацию о записанных файлах,
а также статистику таблиц и партиций, полученную при выполнении MERGE.
Коннектор выполняет соответствующие действия, если они требуются.
Реализация RowChangeProcessor для MERGE#
В реализации MERGE каждой RowChangeParadigm
соответствует внутренний класс движка Trino, реализующий интерфейс
RowChangeProcessor. У RowChangeProcessor есть один важный метод:
Page transformPage(Page). Формат выходной страницы зависит
от RowChangeParadigm.
Коннектор не имеет доступа к экземпляру RowChangeProcessor – он
используется внутри движка Trino для преобразования строк страницы merge в строки,
подлежащие сохранению, на основе выбранной коннектором RowChangeParadigm.
Страница, передаваемая в transformPage(), состоит из:
Столбцов перераспределения записи, если они есть.
Для партиционированных или бакетированных таблиц, столбца с длинным хеш-значением.
Столбца
rowIdдля строки из целевой таблицы при совпадении, либо null при отсутствии совпадения.merge case
RowBlock.Блока с целым числом номера case.
Байтового блока
is_distinctсо значением 0, если не distinct.
merge case RowBlock имеет следующий layout:
Блоки для каждого столбца таблицы, включая столбцы партиции, в порядке столбцов таблицы.
Блок со значением boolean “present”, которое равно true, если строка источника совпала со строкой цели, и false в противном случае.
Блок с номером операции случая
MERGE, закодированным какINSERT= 1,DELETE= 2,UPDATE= 3, а если ни один случайMERGEне подошел, то -1.Блок с номером случая
MERGE(начиная с 0) для предложенияWHEN, которое совпало для строки, либо -1, если не совпало ни одно предложение.
Страница, возвращаемая из transformPage, состоит из:
Всех столбцов таблицы в порядке столбцов таблицы.
Блока операции случая merge типа tinyint.
Блока номера случая merge типа integer.
Блок rowId остается неизменным относительно входной страницы.
Байтового блока со значением 1, если строка является вставкой, полученной из операции обновления, и 0 в противном случае. Этот блок используется для корректного подсчета числа измененных строк в коннекторах, представляющих обновления и удаления вместе со вставками.
transformPage
должен гарантировать, что на возвращаемой им странице нет строк с номером операции -1.
Обнаружение дублирующихся совпадений строк целевой таблицы#
Спецификация SQL MERGE требует, чтобы в каждом случае MERGE
одна строка целевой таблицы соответствовала не более чем одной строке источника после
применения выражения условия случая MERGE. Первый шаг
к обнаружению таких ошибок выполняется путем маркировки каждой строки в целевой
таблице уникальным идентификатором с использованием узла AssignUniqueId над
сканом целевой таблицы. Спроецированные результаты RIGHT JOIN
содержат эти уникальные идентификаторы для совпавших строк целевой таблицы, а также
номер предложения WHEN. Узел MarkDistinct добавляет столбец
is_distinct, который равен true, если нет другой строки с тем же
уникальным идентификатором и номером предложения WHEN, и false в противном случае. Если
у какой-либо строки is_distinct равен false, выбрасывается
исключение MERGE_TARGET_ROW_MULTIPLE_MATCHES, и операция
MERGE завершается с ошибкой.
API ConnectorMergeTableHandle#
Интерфейс ConnectorMergeTableHandle определяет один метод,
getTableHandle(), чтобы получить ConnectorTableHandle,
изначально переданный в ConnectorMetadata.beginMerge().
API ConnectorPageSinkProvider#
Для поддержки SQL MERGE ConnectorPageSinkProvider должен реализовать
метод, создающий ConnectorMergeSink:
createMergeSink:ConnectorMergeSink createMergeSink( ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle)
API ConnectorMergeSink#
Для поддержки MERGE коннектор должен определить
реализацию ConnectorMergeSink, обычно построенную поверх
ConnectorPageSink коннектора.
ConnectorMergeSink создается вызовом
ConnectorPageSinkProvider.createMergeSink().
Единственные важные методы:
storeMergedRows:void storeMergedRows(Page page)
Движок Trino вызывает метод
storeMergedRows(Page)экземпляраConnectorMergeSink, возвращенногоConnectorPageSinkProvider.createMergeSink(), передавая страницу, сгенерированную методомRowChangeProcessor.transformPage(). Эта страница состоит из всех столбцов таблицы в порядке столбцов таблицы, затем столбца операцииTINYINT, затем столбца номера случая mergeINTEGER, затем столбца rowId.Задача
storeMergedRows()- итерироваться по строкам страницы и обрабатывать их в зависимости от значения столбца операции:INSERT,DELETE,UPDATE, либо игнорировать строку. Выбрав подходящую парадигму, коннектор может запросить преобразование операции UPDATE в операцииDELETEиINSERT.finish:CompletableFuture<Collection<Slice>> finish()
Движок Trino вызывает
finish(), когда все данные были обработаны конкретным экземпляромConnectorMergeSink. Коннектор возвращает future, содержащий коллекциюSlice, представляющую специфичную для коннектора информацию об обработанных строках. Обычно сюда входит число строк, а также может входить информация, например о созданных или измененных файлах или партициях.
API ConnectorMetadata для MERGE#
Коннектор, реализующий MERGE, должен реализовать следующие методы
ConnectorMetadata.
getRowChangeParadigm():RowChangeParadigm getRowChangeParadigm( ConnectorSession session, ConnectorTableHandle tableHandle)
Этот метод вызывается, когда движок начинает обработку оператора
MERGE. Коннектор должен вернуть экземпляр перечисленияRowChangeParadigm. Если коннектор не поддерживаетMERGE, он должен выбросить исключениеNOT_SUPPORTED, чтобы указать, что SQLMERGEне поддерживается коннектором. Обратите внимание, что реализация по умолчанию уже выбрасывает это исключение, если метод не реализован.getMergeRowIdColumnHandle():ColumnHandle getMergeRowIdColumnHandle( ConnectorSession session, ConnectorTableHandle tableHandle)
Этот метод вызывается на ранних этапах планирования запроса для операторов
MERGE. Возвращаемый ColumnHandle предоставляетrowId, используемый коннектором для идентификации объединяемых строк, а также любые другие поля строки, которые нужны коннектору для завершения операцииMERGE.getInsertLayout():Optional<ConnectorTableLayout> getInsertLayout( ConnectorSession session, ConnectorTableHandle tableHandle)
Этот метод вызывается во время планирования запроса для получения layout таблицы, который будет использоваться для строк, вставляемых операцией
MERGE. Для некоторых коннекторов этот layout также используется для удаляемых строк.getUpdateLayout():Optional<ConnectorTableLayout> getUpdateLayout( ConnectorSession session, ConnectorTableHandle tableHandle)
Этот метод вызывается во время планирования запроса для получения layout таблицы, который будет использоваться для строк, удаляемых операцией
MERGE. Если optional возвращаемое значение присутствует, движок Trino использует этот layout для обновляемых строк. Иначе используется результатConnectorMetadata.getInsertLayoutдля распределения обновляемых строк.beginMerge():ConnectorMergeTableHandle beginMerge( ConnectorSession session, ConnectorTableHandle tableHandle)
На последнем шаге построения плана выполнения
MERGEвызывается метод коннектораbeginMerge(), куда передаютсяsessionиtableHandle.beginMerge()выполняет в коннекторе всю оркестрацию, необходимую для начала обработкиMERGE. Эта оркестрация отличается от коннектора к коннектору. Например, в случае коннектора Hive, работающего с транзакционными таблицами,beginMerge()проверяет, что таблица является транзакционной, и начинает транзакцию Hive Metastore.beginMerge()возвращаетConnectorMergeTableHandleс дополнительной информацией, которая нужна коннектору, когда этот дескриптор передается обратно вfinishMerge()и в механизмы генерации split. Для большинства коннекторов возвращаемый table handle содержит как минимум флаг, помечающий этот дескриптор таблицы как дескриптор для операцииMERGE.finishMerge():void finishMerge( ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments)
Во время обработки
MERGEдвижок Trino накапливает коллекцииSlice, возвращаемыеConnectorMergeSink.finish(). Движок вызываетfinishMerge(), передавая table handle и эту коллекцию фрагментовSlice. В ответ коннектор выполняет необходимые действия для завершения операцииMERGE. Эти действия могут включать фиксацию базовой транзакции (если она есть) или освобождение любых других ресурсов.