Fault-tolerant execution#
По умолчанию, если узлу Trino не хватает ресурсов для выполнения задачи или он по иной причине выходит из строя во время выполнения запроса, запрос завершается с ошибкой и должен быть запущен повторно вручную. Чем дольше выполняется запрос, тем выше вероятность подобных сбоев.
Fault-tolerant execution — это механизм в Trino, который позволяет кластеру смягчать последствия сбоев за счёт повторных попыток выполнения запросов или их отдельных задач в случае ошибки. При включённой fault-tolerant execution промежуточные exchange-данные сохраняются (spooled) и могут быть повторно использованы другим worker в случае отказа узла или другой ошибки во время выполнения запроса.
Note
Fault tolerance не применяется к некорректным запросам или другим ошибкам пользователя. Например, Trino не тратит ресурсы на повторные попытки выполнения запроса, который завершился с ошибкой из-за невозможности распарсить SQL.
Пошаговое руководство по настройке кластера Trino с поддержкой fault-tolerant execution для повышения устойчивости обработки запросов см. в Повышение устойчивости обработки запросов.
Configuration#
По умолчанию fault-tolerant execution отключена. Чтобы включить эту
возможность, установите свойство конфигурации retry-policy в значение
QUERY или TASK в зависимости от желаемой
retry policy.
retry-policy=QUERY
Warning
Установка retry-policy может привести к сбоям выполнения запросов с
коннекторами, которые явно не поддерживают fault-tolerant execution, что
приводит к сообщению об ошибке “This connector does not support query retries”.
Поддержка fault-tolerant execution для SQL-запросов зависит от конкретного коннектора. Подробности приведены в документации каждого коннектора. Следующие коннекторы поддерживают fault-tolerant execution:
Следующие свойства конфигурации управляют поведением fault-tolerant execution в кластере Trino:
Property name |
Description |
Default value |
|---|---|---|
|
Определяет, что именно повторяется при сбое: либо |
|
|
Список политик повторных попыток, разрешённых для настройки в кластере. Это свойство используется для предотвращения настройки пользователем политики, которая не предназначена для использования в данном кластере. |
|
|
Размер данных буфера в памяти coordinator, используемого fault-tolerant execution для хранения выходных данных stages запроса. Если этот буфер заполняется во время выполнения запроса, запрос завершится с ошибкой “Exchange manager must be configured for the failure recovery capabilities to be fully functional”, если не настроен exchange manager. |
|
|
Включает шифрование данных spooling, см. Encryption
для подробностей. Устанавливать это свойство в |
|
Дополнительные связанные свойства можно найти в Справочник свойств, в частности в Resource management properties и Exchange properties.
Retry policy#
Свойство конфигурации retry-policy или session property retry_policy
определяет, будет ли Trino повторно выполнять весь запрос целиком или
отдельные задачи запроса в случае сбоя.
QUERY#
Политика повторных попыток QUERY указывает Trino автоматически
повторять выполнение запроса при возникновении ошибки на worker-узле.
Политика QUERY рекомендуется, когда основная нагрузка кластера Trino
состоит из множества небольших запросов.
По умолчанию Trino не реализует fault tolerance для запросов, размер
результирующего набора данных которых превышает 32MB, например для
SELECT запросов, возвращающих очень большой объём данных
пользователю. Этот лимит можно увеличить, изменив свойство конфигурации
exchange.deduplication-buffer-size на значение больше стандартного 32MB,
но это приведёт к увеличению использования памяти на coordinator.
Чтобы включить fault-tolerant execution для запросов с большим объёмом результатов, настоятельно рекомендуется настроить exchange manager, который использует внешнее хранилище для spooled-данных и тем самым позволяет хранить данные, превышающие размер буфера в памяти.
TASK#
Политика повторных попыток TASK указывает Trino повторять выполнение
отдельных tasks запроса при возникновении сбоя.
Для использования политики TASK необходимо настроить
exchange manager. Эта политика рекомендуется
при выполнении крупных batch-запросов, поскольку кластер может более
эффективно повторно выполнять небольшие задачи внутри запроса, чем
перезапускать весь запрос целиком.
Когда кластер настроен с политикой TASK, некоторые связанные свойства
конфигурации получают значения по умолчанию, соответствующие best practices
для fault-tolerant кластера. Однако это автоматическое изменение не
затрагивает кластеры, где эти свойства заданы вручную. Если у вас настроено
любое из следующих свойств в файле config.properties на кластере с
политикой TASK, настоятельно рекомендуется установить свойство
task.low-memory-killer.policy
query management property в
значение total-reservation-on-blocked-nodes, иначе может потребоваться
вручную завершать запросы при нехватке памяти в кластере.
Note
Политика TASK лучше всего подходит для крупных batch-запросов, однако
она может увеличивать задержки для коротких запросов, выполняемых в
большом количестве. В качестве best practice рекомендуется использовать
отдельный кластер с политикой TASK для batch-запросов, отдельно от
кластера, обрабатывающего короткие запросы.
Encryption#
Trino шифрует данные перед их сохранением (spooling) во внешнее хранилище. Это предотвращает доступ к данным запроса для всех, кроме кластера Trino, который их записал, включая администраторов системы хранения. Для каждого запроса случайным образом генерируется новый ключ шифрования, который удаляется после завершения запроса.
Advanced configuration#
Вы можете дополнительно настроить fault-tolerant execution с помощью следующих свойств конфигурации. Значения по умолчанию подходят для большинства развёртываний, однако их можно изменить для целей тестирования или устранения неполадок.
Retry limits#
Следующие свойства конфигурации определяют пороги, при которых запросы/задачи перестают повторяться в случае повторяющихся сбоев:
Property name |
Description |
Default value |
Retry policy |
|---|---|---|---|
|
Максимальное количество попыток повторного выполнения запроса, после которого запрос считается завершившимся с ошибкой. |
|
Только |
|
Максимальное количество попыток повторного выполнения одной задачи, после которого запрос считается завершившимся с ошибкой. |
|
Только |
|
Минимальное время, которое неудавшийся запрос
или задача должны ждать перед повторной попыткой выполнения.
Может быть переопределено через |
|
|
|
Максимальное время, которое неудавшийся запрос
или задача должны ждать перед повторной попыткой. Время ожидания
увеличивается при каждой последующей ошибке. Может быть переопределено
через |
|
|
|
Коэффициент, на который увеличивается задержка перед повторной попыткой
при каждом сбое запроса или задачи. Может быть переопределён через
|
|
|
Task sizing#
При использовании политики TASK важно управлять объёмом данных,
обрабатываемых в каждой задаче. Если задачи слишком малы, управление
координацией задач может потреблять больше времени и ресурсов, чем
непосредственное выполнение задач. Если задачи слишком велики, одна
задача может требовать больше ресурсов, чем доступно на одном узле,
что может помешать завершению запроса.
Trino поддерживает ограниченное автоматическое управление размером задач.
Если возникают проблемы при выполнении задач с fault-tolerant execution,
вы можете настроить следующие свойства конфигурации для ручного управления
размером задач. Эти свойства применимы только для политики TASK.
Property name |
Description |
Default value |
|---|---|---|
|
Стандартный split размер данных, обрабатываемый задачами, которые читают данные из исходных таблиц. Значение интерпретируется с учётом веса split. Если вес split, возвращаемый каталогом, указывает, что они легче или тяжелее “стандартного” split, количество split, обрабатываемых одной задачей, корректируется соответствующим образом. Может быть переопределено для текущей сессии через
|
|
|
Максимальное количество splits, обрабатываемых одной задачей. Это значение не учитывает вес split и служит защитой от ситуаций, когда каталог сообщает некорректный вес split. Может быть переопределено для текущей сессии через
|
|
|
Количество задач, создаваемых для любой стадии произвольного распределения (non-writer), после которого увеличивается размер задачи. |
|
|
Коэффициент роста для адаптивного изменения размера задач (non-writer) с произвольным распределением при fault-tolerant execution. Нижняя граница — 1.0. При каждом увеличении размера задачи новый целевой размер равен предыдущему, умноженному на этот коэффициент. |
|
|
Начальный/минимальный целевой размер данных для входных данных задач (non-writer) с произвольным распределением. |
|
|
Максимальный целевой размер данных входных данных для каждой задачи (non-writer) с произвольным распределением. |
|
|
Количество задач, создаваемых для любой стадии записи (writer) с произвольным распределением, после которого увеличивается размер задачи. |
|
|
Коэффициент роста для адаптивного изменения размера задач (writer) с произвольным распределением при fault-tolerant execution. Нижняя граница — 1.0. При каждом увеличении размера задачи новый целевой размер равен предыдущему, умноженному на этот коэффициент. |
|
|
Начальный/минимальный целевой размер данных входных данных для задач записи (writer) с произвольным распределением. |
|
|
Максимальный целевой размер данных входных данных для задач записи (writer) с произвольным распределением. |
|
|
Целевой размер данных входных данных для задач (non-writer) с hash-распределением при fault-tolerant execution. |
|
|
Целевой размер данных входных данных для задач записи (writer) с hash-распределением при fault-tolerant execution. |
|
|
Мягкое верхнее ограничение на количество задач записи (writer) на стадии с hash-распределением при fault-tolerant execution. |
|
Node allocation#
При использовании политики TASK узлы назначаются задачам на основе
доступной памяти и оценочного потребления памяти. Если задача завершается
с ошибкой из-за превышения доступной памяти на узле, она перезапускается
с запросом на выделение целого узла для её выполнения.
Начальная оценка требований к памяти задачи является статической и
настраивается с помощью свойства конфигурации
fault-tolerant-execution-task-memory. Это свойство применяется только
для политики TASK.
Property name |
Description |
Default value |
|---|---|---|
|
Начальная оценка размера данных памяти задачи,
используемая для bin-packing при назначении узлов задачам. Может быть
переопределена для текущей сессии через
|
|
Other tuning#
Следующее дополнительное свойство конфигурации может использоваться для управления fault-tolerant execution:
Property name |
Description |
Default value |
Retry policy |
|---|---|---|---|
|
Максимальный размер данных памяти, используемой для хранения дескрипторов задач для fault-tolerant запросов на coordinator. Дополнительная память необходима для возможности повторного планирования задач в случае сбоя. |
(размер heap JVM * 0.15) |
Только |
|
Максимальное количество партиций, используемых для распределённых
join и aggregation, аналогично свойству
|
|
Только |
|
Минимальное количество партиций, используемых для распределённых
join и aggregation, аналогично свойству
|
|
Только |
|
Минимальное количество партиций, используемых для распределённых
join и aggregation в write-запросах, аналогично свойству
|
|
Только |
|
Позволяет до указанного количества задач ожидать назначения узла для одного запроса, прежде чем приостановить планирование других задач этого запроса. |
|
Только |
Exchange manager#
Exchange spooling отвечает за хранение и управление spooled-данными для fault-tolerant execution. Вы можете настроить exchange manager на основе файловой системы, который сохраняет spooled-данные в указанном месте, например AWS S3 и S3-совместимые системы, Azure Blob Storage, Google Cloud Storage, Alluxio или HDFS.
Configuration#
Чтобы настроить exchange manager, создайте новый файл конфигурации
etc/exchange-manager.properties на coordinator и всех worker-узлах.
В этом файле установите свойство exchange-manager.name в значение
filesystem или hdfs, а также задайте дополнительные свойства
в зависимости от используемой системы хранения.
Вы также можете указать расположение файла конфигурации exchange manager
в config.properties с помощью свойства exchange-manager.config-file.
Если это свойство задано, Trino загружает конфигурацию exchange manager
из указанного пути вместо стандартного
etc/exchange-manager.properties.
В следующей таблице перечислены доступные свойства конфигурации для
exchange-manager.properties, их значения по умолчанию и файловые
системы, для которых они применимы:
Property name |
Description |
Default value |
Supported filesystem |
|---|---|---|---|
|
Список URI (через запятую), которые exchange manager использует для хранения spooling-данных. |
Any |
|
|
Максимальный размер хранения страницы, записываемой в sink, включая саму страницу и её размер. |
|
Any |
|
Минимальный размер пула буферов для exchange sink. Чем больше размер пула, тем выше параллелизм записи и потребление памяти. |
|
Any |
|
Количество буферов на партицию в пуле буферов. Чем больше размер пула, тем выше параллелизм записи и потребление памяти. |
|
Any |
|
Максимальный размер данных файлов, записываемых exchange sink. |
|
Any |
|
Количество параллельных reader-ов для чтения из spooling-хранилища. Чем больше значение, тем выше параллелизм чтения и потребление памяти. |
|
Any |
|
AWS access key. Обязателен для подключения к AWS S3 и GCS, может быть проигнорирован для других S3-совместимых систем. |
AWS S3, GCS |
|
|
AWS secret key. Обязателен для подключения к AWS S3 и GCS, может быть проигнорирован для других S3-совместимых систем. |
AWS S3, GCS |
|
|
IAM role, который необходимо использовать. |
AWS S3, GCS |
|
|
External ID для trust policy IAM role. |
AWS S3, GCS |
|
|
Регион S3-бакета. |
AWS S3, GCS |
|
|
Endpoint сервера S3-хранилища, если используется S3-совместимая система,
отличная от AWS. При использовании AWS S3 можно игнорировать, если только
политика бакета не требует HTTPS. Если требуется TLS, можно указать
https endpoint, например |
Any S3-compatible storage |
|
|
Максимальное количество повторных попыток запроса клиентом S3 exchange manager. |
|
Any S3-compatible storage |
|
Включает использование path-style access для всех запросов к S3. |
|
Any S3-compatible storage |
|
Размер части (data size) для multipart upload в S3. |
|
Any S3-compatible storage |
|
Путь к JSON-файлу с ключом service account Google Cloud Platform.
Не должен использоваться вместе с |
GCS |
|
|
Ключ service account Google Cloud Platform в формате JSON.
Не должен использоваться вместе с |
GCS |
|
|
Endpoint Azure Blob, используемый для доступа к контейнеру spooling.
Не должен использоваться вместе с |
Azure Blob Storage |
|
|
Connection string для доступа к контейнеру spooling.
Не должен использоваться вместе с |
Azure Blob Storage |
|
|
Размер блока (data size) для параллельной загрузки Azure block blob. |
|
Azure Blob Storage |
|
Максимальное количество повторных попыток запроса клиентом Azure exchange manager. |
|
Azure Blob Storage |
|
Размер блока (data size) для хранения в Alluxio. |
|
Alluxio |
|
Путь к файлу конфигурации alluxio site, содержащему пользовательские
настройки, например |
Alluxio |
|
|
Размер блока (data size) для хранения в HDFS. |
|
HDFS |
|
Отключает проверку схемы директории для поддержки Hadoop-совместимых файловых систем. |
false |
HDFS |
|
Список путей (через запятую) к конфигурационным файлам HDFS,
например |
HDFS |
Чтобы снизить общую I/O-нагрузку exchange manager, свойство конфигурации
exchange.compression-codec по умолчанию установлено в LZ4.
Дополнительно автоматически применяется File compression and decompression, и некоторые
параметры можно настроить.
Также рекомендуется настроить lifecycle rule для бакета, чтобы автоматически удалять «заброшенные» объекты в случае сбоя узла.
AWS S3#
Следующий пример конфигурации exchange-manager.properties задаёт
S3-бакет AWS в качестве хранилища для spooling. Обратите внимание, что
назначение не обязательно должно находиться в AWS — это может быть
любая S3-совместимая система хранения. Хотя exchange manager разработан
с поддержкой S3-совместимых систем, официально протестированы только
AWS S3 и MinIO. Для других систем хранения необходимо выполнить
собственное тестирование и обратиться к поставщику за дополнительной
информацией.
exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
Вы можете настроить несколько S3-бакетов для exchange manager, чтобы
распределять spooled-данные между ними и снижать нагрузку I/O на один
бакет. Если запрос завершается с ошибкой:
“software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your
request rate”, это означает, что нагрузка является I/O-интенсивной, и вам следует
указать несколько S3-бакетов в exchange.base-directories для
балансировки нагрузки:
exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2
Azure Blob Storage#
Следующий пример конфигурации exchange-manager.properties задаёт
контейнер Azure Blob Storage в качестве хранилища для spooling. Необходимо
использовать именно Azure Blob Storage, а не Azure Data Lake Storage или
любые другие иерархические варианты хранения в Azure.
exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string
Google Cloud Storage#
Чтобы включить exchange spooling на GCS в Trino, измените endpoint
запросов на URI Google Storage https://storage.googleapis.com и
настройте AWS access/secret keys для использования HMAC-ключей GCS.
Если вы разворачиваете Trino в GCP, необходимо либо создать service
account с доступом к бакету для spooling, либо указать путь к файлу
учётных данных GCS.
Для получения дополнительной информации о совместимости GCS с S3 см. документацию Google Cloud по миграции с S3.
Следующий пример конфигурации exchange-manager.properties задаёт
GCS-бакет в качестве хранилища для spooling.
exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json
Alluxio#
В приведенном ниже примере конфигурации exchange-manager.properties в качестве места назначения для буферизации указывается Alluxio.
exchange-manager.name=filesystem
exchange.base-directories=alluxio://alluxio-master:19998/exchange-spooling-directory
exchange.alluxio.site-file-path=/path/to/alluxio-site.properties
HDFS#
Следующий пример конфигурации exchange-manager.properties задаёт HDFS
в качестве хранилища для spooling.
exchange-manager.name=hdfs
exchange.base-directories=hadoop-master:9000/exchange-spooling-directory
hdfs.config.resources=/usr/lib/hadoop/etc/hadoop/core-site.xml
Если вы хотите использовать файловую систему, совместимую с Hadoop, в
качестве хранилища spooling, необходимо включить
exchange.hdfs.skip-directory-scheme-validation в
exchange-manager.properties, если при настройке
exchange.base-directories используется специфичная схема, отличная от
hdfs. Также могут потребоваться следующие шаги:
Настроить реализацию
AbstractFileSystemвcore-site.xml.Добавить соответствующие клиентские JAR-файлы в директорию
${Trino_HOME}/plugin/exchange-hdfsна всех узлах кластера Trino.
Local filesystem storage#
Следующий пример конфигурации exchange-manager.properties задаёт
локальную директорию /tmp/trino-exchange-manager в качестве
хранилища для spooling.
Note
Рекомендуется использовать локальную файловую систему для exchange только в standalone, не production-кластерах. Локальная директория может использоваться в распределённом кластере только в том случае, если она разделяется и доступна со всех узлов.
exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager
Adaptive plan optimizations#
Режим fault-tolerant execution предоставляет несколько адаптивных оптимизаций плана, которые динамически изменяют планы выполнения запросов на основе статистики времени выполнения. Для получения дополнительной информации см. Адаптивные оптимизации плана.