Поддержка MERGE#

Движок Trino предоставляет API для поддержки SQL MERGE на уровне строк. Чтобы реализовать MERGE, коннектор должен предоставить следующее:

  • Реализацию ConnectorMergeSink, которая обычно построена поверх ConnectorPageSink.

  • Методы в ConnectorMetadata, чтобы получить дескриптор столбца “rowId”, получить парадигму изменения строк, а также начать и завершить операцию MERGE.

Механизмы движка Trino, используемые для реализации SQL MERGE, также применяются для поддержки SQL DELETE и UPDATE. Это означает, что коннектору достаточно реализовать поддержку SQL MERGE, и коннектор получит все операции языка модификации данных (DML).

Стандартный SQL MERGE#

Разные движки запросов поддерживают различные определения SQL MERGE. Trino поддерживает строгую спецификацию SQL ISO/IEC 9075, опубликованную в 2016 году. В качестве простого примера, пусть таблицы target_table и source_table определены так:

CREATE TABLE accounts (
    customer VARCHAR,
    purchases DECIMAL,
    address VARCHAR);
INSERT INTO accounts (customer, purchases, address) VALUES ...;
CREATE TABLE monthly_accounts_update (
    customer VARCHAR,
    purchases DECIMAL,
    address VARCHAR);
INSERT INTO monthly_accounts_update (customer, purchases, address) VALUES ...;

Вот возможная операция MERGE из monthly_accounts_update в accounts:

MERGE INTO accounts t USING monthly_accounts_update s
    ON (t.customer = s.customer)
    WHEN MATCHED AND s.address = 'Berkeley' THEN
        DELETE
    WHEN MATCHED AND s.customer = 'Joe Shmoe' THEN
        UPDATE SET purchases = purchases + 100.0
    WHEN MATCHED THEN
        UPDATE
            SET purchases = s.purchases + t.purchases, address = s.address
    WHEN NOT MATCHED THEN
        INSERT (customer, purchases, address)
            VALUES (s.customer, s.purchases, s.address);

SQL MERGE пытается сопоставить каждый WHEN в порядке их объявления. Когда совпадение найдено, выполняется соответствующий DELETE, INSERT или UPDATE, а последующие WHEN игнорируются.

SQL MERGE поддерживает две операции над целевой таблицей и источником, когда строка из исходной таблицы или запроса совпадает со строкой в целевой таблице:

  • UPDATE, при котором столбцы в целевой строке обновляются.

  • DELETE, при котором целевая строка удаляется.

В случае NOT MATCHED SQL MERGE поддерживает только операции INSERT. Вставляемые значения произвольны, но обычно берутся из несовпавшей строки исходной таблицы или запроса.

RowChangeParadigm#

Разные коннекторы используют разные способы представления обновлений строк, что определяется нижележащими системами хранения. Движок Trino классифицирует эти разные подходы как элементы перечисления RowChangeParadigm, возвращаемого методом ConnectorMetadata.getRowChangeParadigm(...).

Значения перечисления RowChangeParadigm:

  • CHANGE_ONLY_UPDATED_COLUMNS, предназначено для коннекторов, которые могут обновлять отдельные столбцы строк, идентифицированных по rowId. Соответствующий класс обработчика merge: ChangeOnlyUpdatedColumnsMergeProcessor.

  • DELETE_ROW_AND_INSERT_ROW, предназначено для коннекторов, которые представляют изменение строки как пару из удаления строки и вставки строки. Соответствующий класс обработчика merge: DeleteAndInsertMergeProcessor.

Обзор обработки MERGE#

Оператор MERGE обрабатывается созданием RIGHT JOIN между целевой таблицей и источником по критерию MERGE. Источником может быть таблица или произвольный запрос. Для каждой строки в исходной таблице или запросе MERGE формирует объект ROW, содержащий:

  • значения столбцов данных из случаев UPDATE или INSERT. Для случаев DELETE ненулевыми остаются только столбцы партиционирования, определяющие партиционирование и бакетирование.

  • булевый столбец со значением true для строк источника, совпавших с некоторой строкой цели, и false в противном случае.

  • целое число, обозначающее, является ли операция случая merge UPDATE, DELETE или INSERT, либо это строка источника, для которой не подошел ни один случай. Если строка источника не совпадает ни с одним случаем merge, все значения столбцов данных, кроме определяющих распределение, равны null, а номер операции равен -1.

SearchedCaseExpression создается из результата RIGHT JOIN для представления предложений WHEN в MERGE. В предыдущем примере MERGE выполняется так, как если бы SearchedCaseExpression был записан следующим образом:

SELECT
 CASE
   WHEN present AND s.address = 'Berkeley' THEN
       -- Null values for delete; present=true; operation DELETE=2, case_number=0
       row(null, null, null, true, 2, 0)
   WHEN present AND s.customer = 'Joe Shmoe' THEN
       -- Update column values; present=true; operation UPDATE=3, case_number=1
       row(t.customer, t.purchases + 100.0, t.address, true, 3, 1)
   WHEN present THEN
       -- Update column values; present=true; operation UPDATE=3, case_number=2
       row(t.customer, s.purchases + t.purchases, s.address, true, 3, 2)
   WHEN (present IS NULL) THEN
       -- Insert column values; present=false; operation INSERT=1, case_number=3
       row(s.customer, s.purchases, s.address, false, 1, 3)
   ELSE
       -- Null values for no case matched; present=false; operation=-1,
       --     case_number=-1
       row(null, null, null, false, -1, -1)
 END
 FROM (SELECT *, true AS present FROM target_table) t
   RIGHT JOIN source_table s ON s.customer = t.customer;

Движок Trino выполняет RIGHT JOIN и выражение CASE, гарантирует, что ни одна строка целевой таблицы не сопоставится более чем с одной строкой исходного выражения, и в итоге создает последовательность страниц для маршрутизации на узел, который выполняет метод ConnectorMergeSink.storeMergedRows(...).

Как и в DELETE и UPDATE, строки целевой таблицы для MERGE идентифицируются специфическим для коннектора дескриптором столбца rowId. Для MERGE дескриптор rowId возвращается методом ConnectorMetadata.getMergeRowIdColumnHandle(...).

Перераспределение MERGE#

Реализация MERGE в Trino позволяет UPDATE изменять значения столбцов, определяющих партиционирование и/или бакетирование, поэтому она должна “перераспределять” строки из операции MERGE на рабочие узлы, отвечающие за запись строк с итоговыми столбцами партиционирования и/или бакетирования.

Поскольку процесс MERGE в общем случае требует перераспределения объединенных строк между узлами Trino, порядок строк на сохраняемых страницах неопределен. Коннекторы, такие как Hive, которые зависят от возрастающего порядка rowId для удаляемых строк, должны сортировать удаляемые строки перед их сохранением.

Чтобы гарантировать, что все вставляемые строки для заданной партиции окажутся на одном узле, к ключам партиции/бакета страницы применяется хеш перераспределения. В результате хеширования все строки для конкретной партиции/бакета попадают в одну группу, независимо от того, были ли они строками MATCHED или NOT MATCHED.

Для коннекторов, у которых RowChangeParadigm равен DELETE_ROW_AND_INSERT_ROW, вставляемые строки распределяются по layout, который возвращает ConnectorMetadata.getInsertLayout(). Для некоторых коннекторов этот же layout используется и для обновляемых строк. Другим коннекторам нужен специальный layout для обновляемых строк, который возвращает ConnectorMetadata.getUpdateLayout().

Поддержка MERGE в коннекторе#

Чтобы начать обработку MERGE, движок Trino вызывает:

  • ConnectorMetadata.getMergeRowIdColumnHandle(...), чтобы получить дескриптор столбца rowId.

  • ConnectorMetadata.getRowChangeParadigm(...), чтобы получить парадигму, поддерживаемую коннектором для изменения существующих строк таблицы.

  • ConnectorMetadata.beginMerge(...), чтобы получить ConnectorMergeTableHandle для операции merge. Этот объект ConnectorMergeTableHandle содержит всю информацию, необходимую коннектору для задания операции MERGE.

  • ConnectorMetadata.getInsertLayout(...), из результата которого извлекается список столбцов партиции или таблицы, влияющих на перераспределение записи.

  • ConnectorMetadata.getUpdateLayout(...). Если этот layout не пустой, он используется для распределения обновленных строк, получившихся в результате операции MERGE.

На узлах, являющихся целями хеширования, движок Trino вызывает ConnectorPageSinkProvider.createMergeSink(...) для создания ConnectorMergeSink.

Чтобы записать каждую страницу объединенных строк, движок Trino вызывает ConnectorMergeSink.storeMergedRows(Page). Метод storeMergedRows(Page) итерируется по строкам страницы, выполняя обновления и удаления в случаях MATCHED, а также вставки в случаях NOT MATCHED.

При использовании RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW движок преобразует операции UPDATE в пару операций DELETE и INSERT до вызова storeMergedRows(Page).

Для завершения операции MERGE движок Trino вызывает ConnectorMetadata.finishMerge(...), передавая table handle и коллекцию JSON-объектов, закодированных как экземпляры Slice. Эти объекты содержат специфичную для коннектора информацию о том, что было изменено операцией MERGE. Обычно такой JSON-объект содержит информацию о записанных файлах, а также статистику таблиц и партиций, полученную при выполнении MERGE. Коннектор выполняет соответствующие действия, если они требуются.

Реализация RowChangeProcessor для MERGE#

В реализации MERGE каждой RowChangeParadigm соответствует внутренний класс движка Trino, реализующий интерфейс RowChangeProcessor. У RowChangeProcessor есть один важный метод: Page transformPage(Page). Формат выходной страницы зависит от RowChangeParadigm.

Коннектор не имеет доступа к экземпляру RowChangeProcessor – он используется внутри движка Trino для преобразования строк страницы merge в строки, подлежащие сохранению, на основе выбранной коннектором RowChangeParadigm.

Страница, передаваемая в transformPage(), состоит из:

  • Столбцов перераспределения записи, если они есть.

  • Для партиционированных или бакетированных таблиц, столбца с длинным хеш-значением.

  • Столбца rowId для строки из целевой таблицы при совпадении, либо null при отсутствии совпадения.

  • merge case RowBlock.

  • Блока с целым числом номера case.

  • Байтового блока is_distinct со значением 0, если не distinct.

merge case RowBlock имеет следующий layout:

  • Блоки для каждого столбца таблицы, включая столбцы партиции, в порядке столбцов таблицы.

  • Блок со значением boolean “present”, которое равно true, если строка источника совпала со строкой цели, и false в противном случае.

  • Блок с номером операции случая MERGE, закодированным как INSERT = 1, DELETE = 2, UPDATE = 3, а если ни один случай MERGE не подошел, то -1.

  • Блок с номером случая MERGE (начиная с 0) для предложения WHEN, которое совпало для строки, либо -1, если не совпало ни одно предложение.

Страница, возвращаемая из transformPage, состоит из:

  • Всех столбцов таблицы в порядке столбцов таблицы.

  • Блока операции случая merge типа tinyint.

  • Блока номера случая merge типа integer.

  • Блок rowId остается неизменным относительно входной страницы.

  • Байтового блока со значением 1, если строка является вставкой, полученной из операции обновления, и 0 в противном случае. Этот блок используется для корректного подсчета числа измененных строк в коннекторах, представляющих обновления и удаления вместе со вставками.

transformPage должен гарантировать, что на возвращаемой им странице нет строк с номером операции -1.

Обнаружение дублирующихся совпадений строк целевой таблицы#

Спецификация SQL MERGE требует, чтобы в каждом случае MERGE одна строка целевой таблицы соответствовала не более чем одной строке источника после применения выражения условия случая MERGE. Первый шаг к обнаружению таких ошибок выполняется путем маркировки каждой строки в целевой таблице уникальным идентификатором с использованием узла AssignUniqueId над сканом целевой таблицы. Спроецированные результаты RIGHT JOIN содержат эти уникальные идентификаторы для совпавших строк целевой таблицы, а также номер предложения WHEN. Узел MarkDistinct добавляет столбец is_distinct, который равен true, если нет другой строки с тем же уникальным идентификатором и номером предложения WHEN, и false в противном случае. Если у какой-либо строки is_distinct равен false, выбрасывается исключение MERGE_TARGET_ROW_MULTIPLE_MATCHES, и операция MERGE завершается с ошибкой.

API ConnectorMergeTableHandle#

Интерфейс ConnectorMergeTableHandle определяет один метод, getTableHandle(), чтобы получить ConnectorTableHandle, изначально переданный в ConnectorMetadata.beginMerge().

API ConnectorPageSinkProvider#

Для поддержки SQL MERGE ConnectorPageSinkProvider должен реализовать метод, создающий ConnectorMergeSink:

  • createMergeSink:

    ConnectorMergeSink createMergeSink(
        ConnectorTransactionHandle transactionHandle,
        ConnectorSession session,
        ConnectorMergeTableHandle mergeHandle)
    

API ConnectorMergeSink#

Для поддержки MERGE коннектор должен определить реализацию ConnectorMergeSink, обычно построенную поверх ConnectorPageSink коннектора.

ConnectorMergeSink создается вызовом ConnectorPageSinkProvider.createMergeSink().

Единственные важные методы:

  • storeMergedRows:

    void storeMergedRows(Page page)
    

    Движок Trino вызывает метод storeMergedRows(Page) экземпляра ConnectorMergeSink, возвращенного ConnectorPageSinkProvider.createMergeSink(), передавая страницу, сгенерированную методом RowChangeProcessor.transformPage(). Эта страница состоит из всех столбцов таблицы в порядке столбцов таблицы, затем столбца операции TINYINT, затем столбца номера случая merge INTEGER, затем столбца rowId.

    Задача storeMergedRows() - итерироваться по строкам страницы и обрабатывать их в зависимости от значения столбца операции: INSERT, DELETE, UPDATE, либо игнорировать строку. Выбрав подходящую парадигму, коннектор может запросить преобразование операции UPDATE в операции DELETE и INSERT.

  • finish:

    CompletableFuture<Collection<Slice>> finish()
    

    Движок Trino вызывает finish(), когда все данные были обработаны конкретным экземпляром ConnectorMergeSink. Коннектор возвращает future, содержащий коллекцию Slice, представляющую специфичную для коннектора информацию об обработанных строках. Обычно сюда входит число строк, а также может входить информация, например о созданных или измененных файлах или партициях.

API ConnectorMetadata для MERGE#

Коннектор, реализующий MERGE, должен реализовать следующие методы ConnectorMetadata.

  • getRowChangeParadigm():

    RowChangeParadigm getRowChangeParadigm(
        ConnectorSession session,
        ConnectorTableHandle tableHandle)
    

    Этот метод вызывается, когда движок начинает обработку оператора MERGE. Коннектор должен вернуть экземпляр перечисления RowChangeParadigm. Если коннектор не поддерживает MERGE, он должен выбросить исключение NOT_SUPPORTED, чтобы указать, что SQL MERGE не поддерживается коннектором. Обратите внимание, что реализация по умолчанию уже выбрасывает это исключение, если метод не реализован.

  • getMergeRowIdColumnHandle():

    ColumnHandle getMergeRowIdColumnHandle(
        ConnectorSession session,
        ConnectorTableHandle tableHandle)
    

    Этот метод вызывается на ранних этапах планирования запроса для операторов MERGE. Возвращаемый ColumnHandle предоставляет rowId, используемый коннектором для идентификации объединяемых строк, а также любые другие поля строки, которые нужны коннектору для завершения операции MERGE.

  • getInsertLayout():

    Optional<ConnectorTableLayout> getInsertLayout(
        ConnectorSession session,
        ConnectorTableHandle tableHandle)
    

    Этот метод вызывается во время планирования запроса для получения layout таблицы, который будет использоваться для строк, вставляемых операцией MERGE. Для некоторых коннекторов этот layout также используется для удаляемых строк.

  • getUpdateLayout():

    Optional<ConnectorTableLayout> getUpdateLayout(
        ConnectorSession session,
        ConnectorTableHandle tableHandle)
    

    Этот метод вызывается во время планирования запроса для получения layout таблицы, который будет использоваться для строк, удаляемых операцией MERGE. Если optional возвращаемое значение присутствует, движок Trino использует этот layout для обновляемых строк. Иначе используется результат ConnectorMetadata.getInsertLayout для распределения обновляемых строк.

  • beginMerge():

    ConnectorMergeTableHandle beginMerge(
         ConnectorSession session,
         ConnectorTableHandle tableHandle)
    

    На последнем шаге построения плана выполнения MERGE вызывается метод коннектора beginMerge(), куда передаются session и tableHandle.

    beginMerge() выполняет в коннекторе всю оркестрацию, необходимую для начала обработки MERGE. Эта оркестрация отличается от коннектора к коннектору. Например, в случае коннектора Hive, работающего с транзакционными таблицами, beginMerge() проверяет, что таблица является транзакционной, и начинает транзакцию Hive Metastore.

    beginMerge() возвращает ConnectorMergeTableHandle с дополнительной информацией, которая нужна коннектору, когда этот дескриптор передается обратно в finishMerge() и в механизмы генерации split. Для большинства коннекторов возвращаемый table handle содержит как минимум флаг, помечающий этот дескриптор таблицы как дескриптор для операции MERGE.

  • finishMerge():

    void finishMerge(
        ConnectorSession session,
        ConnectorMergeTableHandle tableHandle,
        Collection<Slice> fragments)
    

    Во время обработки MERGE движок Trino накапливает коллекции Slice, возвращаемые ConnectorMergeSink.finish(). Движок вызывает finishMerge(), передавая table handle и эту коллекцию фрагментов Slice. В ответ коннектор выполняет необходимые действия для завершения операции MERGE. Эти действия могут включать фиксацию базовой транзакции (если она есть) или освобождение любых других ресурсов.