Fault-tolerant execution#

По умолчанию, если узлу Trino не хватает ресурсов для выполнения задачи или он по иной причине выходит из строя во время выполнения запроса, запрос завершается с ошибкой и должен быть запущен повторно вручную. Чем дольше выполняется запрос, тем выше вероятность подобных сбоев.

Fault-tolerant execution — это механизм в Trino, который позволяет кластеру смягчать последствия сбоев за счёт повторных попыток выполнения запросов или их отдельных задач в случае ошибки. При включённой fault-tolerant execution промежуточные exchange-данные сохраняются (spooled) и могут быть повторно использованы другим worker в случае отказа узла или другой ошибки во время выполнения запроса.

Note

Fault tolerance не применяется к некорректным запросам или другим ошибкам пользователя. Например, Trino не тратит ресурсы на повторные попытки выполнения запроса, который завершился с ошибкой из-за невозможности распарсить SQL.

Пошаговое руководство по настройке кластера Trino с поддержкой fault-tolerant execution для повышения устойчивости обработки запросов см. в Повышение устойчивости обработки запросов.

Configuration#

По умолчанию fault-tolerant execution отключена. Чтобы включить эту возможность, установите свойство конфигурации retry-policy в значение QUERY или TASK в зависимости от желаемой retry policy.

retry-policy=QUERY

Warning

Установка retry-policy может привести к сбоям выполнения запросов с коннекторами, которые явно не поддерживают fault-tolerant execution, что приводит к сообщению об ошибке “This connector does not support query retries”.

Поддержка fault-tolerant execution для SQL-запросов зависит от конкретного коннектора. Подробности приведены в документации каждого коннектора. Следующие коннекторы поддерживают fault-tolerant execution:

Следующие свойства конфигурации управляют поведением fault-tolerant execution в кластере Trino:

Fault-tolerant execution configuration properties#

Property name

Description

Default value

retry-policy

Определяет, что именно повторяется при сбое: либо QUERY для повторного выполнения всего запроса, либо TASK для повторного выполнения отдельных задач при их ошибке. См. retry policy для получения дополнительной информации. Используйте соответствующее session property retry_policy только на кластерах с включённой fault-tolerant execution и, как правило, только для отключения с помощью NONE, поскольку переключение режимов на кластере не тестируется.

NONE

retry-policy.allowed

Список политик повторных попыток, разрешённых для настройки в кластере. Это свойство используется для предотвращения настройки пользователем политики, которая не предназначена для использования в данном кластере.

NONE, QUERY, TASK

exchange.deduplication-buffer-size

Размер данных буфера в памяти coordinator, используемого fault-tolerant execution для хранения выходных данных stages запроса. Если этот буфер заполняется во время выполнения запроса, запрос завершится с ошибкой “Exchange manager must be configured for the failure recovery capabilities to be fully functional”, если не настроен exchange manager.

32MB

fault-tolerant-execution.exchange-encryption-enabled

Включает шифрование данных spooling, см. Encryption для подробностей. Устанавливать это свойство в false не рекомендуется, если Trino обрабатывает чувствительные данные.

true

Дополнительные связанные свойства можно найти в Справочник свойств, в частности в Resource management properties и Exchange properties.

Retry policy#

Свойство конфигурации retry-policy или session property retry_policy определяет, будет ли Trino повторно выполнять весь запрос целиком или отдельные задачи запроса в случае сбоя.

QUERY#

Политика повторных попыток QUERY указывает Trino автоматически повторять выполнение запроса при возникновении ошибки на worker-узле. Политика QUERY рекомендуется, когда основная нагрузка кластера Trino состоит из множества небольших запросов.

По умолчанию Trino не реализует fault tolerance для запросов, размер результирующего набора данных которых превышает 32MB, например для SELECT запросов, возвращающих очень большой объём данных пользователю. Этот лимит можно увеличить, изменив свойство конфигурации exchange.deduplication-buffer-size на значение больше стандартного 32MB, но это приведёт к увеличению использования памяти на coordinator.

Чтобы включить fault-tolerant execution для запросов с большим объёмом результатов, настоятельно рекомендуется настроить exchange manager, который использует внешнее хранилище для spooled-данных и тем самым позволяет хранить данные, превышающие размер буфера в памяти.

TASK#

Политика повторных попыток TASK указывает Trino повторять выполнение отдельных tasks запроса при возникновении сбоя. Для использования политики TASK необходимо настроить exchange manager. Эта политика рекомендуется при выполнении крупных batch-запросов, поскольку кластер может более эффективно повторно выполнять небольшие задачи внутри запроса, чем перезапускать весь запрос целиком.

Когда кластер настроен с политикой TASK, некоторые связанные свойства конфигурации получают значения по умолчанию, соответствующие best practices для fault-tolerant кластера. Однако это автоматическое изменение не затрагивает кластеры, где эти свойства заданы вручную. Если у вас настроено любое из следующих свойств в файле config.properties на кластере с политикой TASK, настоятельно рекомендуется установить свойство task.low-memory-killer.policy query management property в значение total-reservation-on-blocked-nodes, иначе может потребоваться вручную завершать запросы при нехватке памяти в кластере.

Note

Политика TASK лучше всего подходит для крупных batch-запросов, однако она может увеличивать задержки для коротких запросов, выполняемых в большом количестве. В качестве best practice рекомендуется использовать отдельный кластер с политикой TASK для batch-запросов, отдельно от кластера, обрабатывающего короткие запросы.

Encryption#

Trino шифрует данные перед их сохранением (spooling) во внешнее хранилище. Это предотвращает доступ к данным запроса для всех, кроме кластера Trino, который их записал, включая администраторов системы хранения. Для каждого запроса случайным образом генерируется новый ключ шифрования, который удаляется после завершения запроса.

Advanced configuration#

Вы можете дополнительно настроить fault-tolerant execution с помощью следующих свойств конфигурации. Значения по умолчанию подходят для большинства развёртываний, однако их можно изменить для целей тестирования или устранения неполадок.

Retry limits#

Следующие свойства конфигурации определяют пороги, при которых запросы/задачи перестают повторяться в случае повторяющихся сбоев:

Fault tolerance retry limit configuration properties#

Property name

Description

Default value

Retry policy

query-retry-attempts

Максимальное количество попыток повторного выполнения запроса, после которого запрос считается завершившимся с ошибкой.

4

Только QUERY

task-retry-attempts-per-task

Максимальное количество попыток повторного выполнения одной задачи, после которого запрос считается завершившимся с ошибкой.

4

Только TASK

retry-initial-delay

Минимальное время, которое неудавшийся запрос или задача должны ждать перед повторной попыткой выполнения. Может быть переопределено через retry_initial_delay session property.

10s

QUERY и TASK

retry-max-delay

Максимальное время, которое неудавшийся запрос или задача должны ждать перед повторной попыткой. Время ожидания увеличивается при каждой последующей ошибке. Может быть переопределено через retry_max_delay session property.

1m

QUERY и TASK

retry-delay-scale-factor

Коэффициент, на который увеличивается задержка перед повторной попыткой при каждом сбое запроса или задачи. Может быть переопределён через retry_delay_scale_factor session property.

2.0

QUERY и TASK

Task sizing#

При использовании политики TASK важно управлять объёмом данных, обрабатываемых в каждой задаче. Если задачи слишком малы, управление координацией задач может потреблять больше времени и ресурсов, чем непосредственное выполнение задач. Если задачи слишком велики, одна задача может требовать больше ресурсов, чем доступно на одном узле, что может помешать завершению запроса.

Trino поддерживает ограниченное автоматическое управление размером задач. Если возникают проблемы при выполнении задач с fault-tolerant execution, вы можете настроить следующие свойства конфигурации для ручного управления размером задач. Эти свойства применимы только для политики TASK.

Task sizing configuration properties#

Property name

Description

Default value

fault-tolerant-execution-standard-split-size

Стандартный split размер данных, обрабатываемый задачами, которые читают данные из исходных таблиц. Значение интерпретируется с учётом веса split. Если вес split, возвращаемый каталогом, указывает, что они легче или тяжелее “стандартного” split, количество split, обрабатываемых одной задачей, корректируется соответствующим образом.

Может быть переопределено для текущей сессии через fault_tolerant_execution_standard_split_size session property.

64MB

fault-tolerant-execution-max-task-split-count

Максимальное количество splits, обрабатываемых одной задачей. Это значение не учитывает вес split и служит защитой от ситуаций, когда каталог сообщает некорректный вес split.

Может быть переопределено для текущей сессии через fault_tolerant_execution_max_task_split_count session property.

2048

fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period

Количество задач, создаваемых для любой стадии произвольного распределения (non-writer), после которого увеличивается размер задачи.

64

fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor

Коэффициент роста для адаптивного изменения размера задач (non-writer) с произвольным распределением при fault-tolerant execution. Нижняя граница — 1.0. При каждом увеличении размера задачи новый целевой размер равен предыдущему, умноженному на этот коэффициент.

1.26

fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min

Начальный/минимальный целевой размер данных для входных данных задач (non-writer) с произвольным распределением.

512MB

fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max

Максимальный целевой размер данных входных данных для каждой задачи (non-writer) с произвольным распределением.

50GB

fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period

Количество задач, создаваемых для любой стадии записи (writer) с произвольным распределением, после которого увеличивается размер задачи.

64

fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor

Коэффициент роста для адаптивного изменения размера задач (writer) с произвольным распределением при fault-tolerant execution. Нижняя граница — 1.0. При каждом увеличении размера задачи новый целевой размер равен предыдущему, умноженному на этот коэффициент.

1.26

fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min

Начальный/минимальный целевой размер данных входных данных для задач записи (writer) с произвольным распределением.

4GB

fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max

Максимальный целевой размер данных входных данных для задач записи (writer) с произвольным распределением.

50GB

fault-tolerant-execution-hash-distribution-compute-task-target-size

Целевой размер данных входных данных для задач (non-writer) с hash-распределением при fault-tolerant execution.

512MB

fault-tolerant-execution-hash-distribution-write-task-target-size

Целевой размер данных входных данных для задач записи (writer) с hash-распределением при fault-tolerant execution.

4GB

fault-tolerant-execution-hash-distribution-write-task-target-max-count

Мягкое верхнее ограничение на количество задач записи (writer) на стадии с hash-распределением при fault-tolerant execution.

2000

Node allocation#

При использовании политики TASK узлы назначаются задачам на основе доступной памяти и оценочного потребления памяти. Если задача завершается с ошибкой из-за превышения доступной памяти на узле, она перезапускается с запросом на выделение целого узла для её выполнения.

Начальная оценка требований к памяти задачи является статической и настраивается с помощью свойства конфигурации fault-tolerant-execution-task-memory. Это свойство применяется только для политики TASK.

Node allocation configuration properties#

Property name

Description

Default value

fault-tolerant-execution-task-memory

Начальная оценка размера данных памяти задачи, используемая для bin-packing при назначении узлов задачам. Может быть переопределена для текущей сессии через fault_tolerant_execution_task_memory session property.

5GB

Other tuning#

Следующее дополнительное свойство конфигурации может использоваться для управления fault-tolerant execution:

Other fault-tolerant execution configuration properties#

Property name

Description

Default value

Retry policy

fault-tolerant-execution-task-descriptor-storage-max-memory

Максимальный размер данных памяти, используемой для хранения дескрипторов задач для fault-tolerant запросов на coordinator. Дополнительная память необходима для возможности повторного планирования задач в случае сбоя.

(размер heap JVM * 0.15)

Только TASK

fault-tolerant-execution-max-partition-count

Максимальное количество партиций, используемых для распределённых join и aggregation, аналогично свойству query.max-hash-partition-count query management property. Не рекомендуется увеличивать это значение выше значения по умолчанию 50, так как это может привести к нестабильности и снижению производительности. Может быть переопределено для текущей сессии через fault_tolerant_execution_max_partition_count session property.

50

Только TASK

fault-tolerant-execution-min-partition-count

Минимальное количество партиций, используемых для распределённых join и aggregation, аналогично свойству query.min-hash-partition-count query management property. Может быть переопределено для текущей сессии через fault_tolerant_execution_min_partition_count session property.

4

Только TASK

fault-tolerant-execution-min-partition-count-for-write

Минимальное количество партиций, используемых для распределённых join и aggregation в write-запросах, аналогично свойству query.min-hash-partition-count-for-write query management property. Может быть переопределено для текущей сессии через fault_tolerant_execution_min_partition_count_for_write session property.

50

Только TASK

max-tasks-waiting-for-node-per-query

Позволяет до указанного количества задач ожидать назначения узла для одного запроса, прежде чем приостановить планирование других задач этого запроса.

50

Только TASK

Exchange manager#

Exchange spooling отвечает за хранение и управление spooled-данными для fault-tolerant execution. Вы можете настроить exchange manager на основе файловой системы, который сохраняет spooled-данные в указанном месте, например AWS S3 и S3-совместимые системы, Azure Blob Storage, Google Cloud Storage, Alluxio или HDFS.

Configuration#

Чтобы настроить exchange manager, создайте новый файл конфигурации etc/exchange-manager.properties на coordinator и всех worker-узлах. В этом файле установите свойство exchange-manager.name в значение filesystem или hdfs, а также задайте дополнительные свойства в зависимости от используемой системы хранения.

Вы также можете указать расположение файла конфигурации exchange manager в config.properties с помощью свойства exchange-manager.config-file. Если это свойство задано, Trino загружает конфигурацию exchange manager из указанного пути вместо стандартного etc/exchange-manager.properties.

В следующей таблице перечислены доступные свойства конфигурации для exchange-manager.properties, их значения по умолчанию и файловые системы, для которых они применимы:

Exchange manager configuration properties#

Property name

Description

Default value

Supported filesystem

exchange.base-directories

Список URI (через запятую), которые exchange manager использует для хранения spooling-данных.

Any

exchange.max-page-storage-size

Максимальный размер хранения страницы, записываемой в sink, включая саму страницу и её размер.

16MB

Any

exchange.sink-buffer-pool-min-size

Минимальный размер пула буферов для exchange sink. Чем больше размер пула, тем выше параллелизм записи и потребление памяти.

10

Any

exchange.sink-buffers-per-partition

Количество буферов на партицию в пуле буферов. Чем больше размер пула, тем выше параллелизм записи и потребление памяти.

2

Any

exchange.sink-max-file-size

Максимальный размер данных файлов, записываемых exchange sink.

1GB

Any

exchange.source-concurrent-readers

Количество параллельных reader-ов для чтения из spooling-хранилища. Чем больше значение, тем выше параллелизм чтения и потребление памяти.

4

Any

exchange.s3.aws-access-key

AWS access key. Обязателен для подключения к AWS S3 и GCS, может быть проигнорирован для других S3-совместимых систем.

AWS S3, GCS

exchange.s3.aws-secret-key

AWS secret key. Обязателен для подключения к AWS S3 и GCS, может быть проигнорирован для других S3-совместимых систем.

AWS S3, GCS

exchange.s3.iam-role

IAM role, который необходимо использовать.

AWS S3, GCS

exchange.s3.external-id

External ID для trust policy IAM role.

AWS S3, GCS

exchange.s3.region

Регион S3-бакета.

AWS S3, GCS

exchange.s3.endpoint

Endpoint сервера S3-хранилища, если используется S3-совместимая система, отличная от AWS. При использовании AWS S3 можно игнорировать, если только политика бакета не требует HTTPS. Если требуется TLS, можно указать https endpoint, например https://s3.us-east-1.amazonaws.com. Обратите внимание, что TLS является избыточным из-за automatic encryption. Для GCS установите значение https://storage.googleapis.com.

Any S3-compatible storage

exchange.s3.max-error-retries

Максимальное количество повторных попыток запроса клиентом S3 exchange manager.

10

Any S3-compatible storage

exchange.s3.path-style-access

Включает использование path-style access для всех запросов к S3.

false

Any S3-compatible storage

exchange.s3.upload.part-size

Размер части (data size) для multipart upload в S3.

5MB

Any S3-compatible storage

exchange.gcs.json-key-file-path

Путь к JSON-файлу с ключом service account Google Cloud Platform. Не должен использоваться вместе с exchange.gcs.json-key.

GCS

exchange.gcs.json-key

Ключ service account Google Cloud Platform в формате JSON. Не должен использоваться вместе с exchange.gcs.json-key-file-path.

GCS

exchange.azure.endpoint

Endpoint Azure Blob, используемый для доступа к контейнеру spooling. Не должен использоваться вместе с exchange.azure.connection-string.

Azure Blob Storage

exchange.azure.connection-string

Connection string для доступа к контейнеру spooling. Не должен использоваться вместе с exchange.azure.endpoint.

Azure Blob Storage

exchange.azure.block-size

Размер блока (data size) для параллельной загрузки Azure block blob.

4MB

Azure Blob Storage

exchange.azure.max-error-retries

Максимальное количество повторных попыток запроса клиентом Azure exchange manager.

10

Azure Blob Storage

exchange.alluxio.block-size

Размер блока (data size) для хранения в Alluxio.

4MB

Alluxio

exchange.alluxio.site-file-path

Путь к файлу конфигурации alluxio site, содержащему пользовательские настройки, например /etc/alluxio-site.properties. Файл должен существовать на всех узлах кластера Trino. См. документацию Alluxio client configuration для подробностей.

Alluxio

exchange.hdfs.block-size

Размер блока (data size) для хранения в HDFS.

4MB

HDFS

exchange.hdfs.skip-directory-scheme-validation

Отключает проверку схемы директории для поддержки Hadoop-совместимых файловых систем.

false

HDFS

hdfs.config.resources

Список путей (через запятую) к конфигурационным файлам HDFS, например /etc/hdfs-site.xml. Файлы должны существовать на всех узлах кластера Trino.

HDFS

Чтобы снизить общую I/O-нагрузку exchange manager, свойство конфигурации exchange.compression-codec по умолчанию установлено в LZ4. Дополнительно автоматически применяется File compression and decompression, и некоторые параметры можно настроить.

Также рекомендуется настроить lifecycle rule для бакета, чтобы автоматически удалять «заброшенные» объекты в случае сбоя узла.

AWS S3#

Следующий пример конфигурации exchange-manager.properties задаёт S3-бакет AWS в качестве хранилища для spooling. Обратите внимание, что назначение не обязательно должно находиться в AWS — это может быть любая S3-совместимая система хранения. Хотя exchange manager разработан с поддержкой S3-совместимых систем, официально протестированы только AWS S3 и MinIO. Для других систем хранения необходимо выполнить собственное тестирование и обратиться к поставщику за дополнительной информацией.

exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key

Вы можете настроить несколько S3-бакетов для exchange manager, чтобы распределять spooled-данные между ними и снижать нагрузку I/O на один бакет. Если запрос завершается с ошибкой: “software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your request rate”, это означает, что нагрузка является I/O-интенсивной, и вам следует указать несколько S3-бакетов в exchange.base-directories для балансировки нагрузки:

exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2

Azure Blob Storage#

Следующий пример конфигурации exchange-manager.properties задаёт контейнер Azure Blob Storage в качестве хранилища для spooling. Необходимо использовать именно Azure Blob Storage, а не Azure Data Lake Storage или любые другие иерархические варианты хранения в Azure.

exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string

Google Cloud Storage#

Чтобы включить exchange spooling на GCS в Trino, измените endpoint запросов на URI Google Storage https://storage.googleapis.com и настройте AWS access/secret keys для использования HMAC-ключей GCS. Если вы разворачиваете Trino в GCP, необходимо либо создать service account с доступом к бакету для spooling, либо указать путь к файлу учётных данных GCS.

Для получения дополнительной информации о совместимости GCS с S3 см. документацию Google Cloud по миграции с S3.

Следующий пример конфигурации exchange-manager.properties задаёт GCS-бакет в качестве хранилища для spooling.

exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json

Alluxio#

В приведенном ниже примере конфигурации exchange-manager.properties в качестве места назначения для буферизации указывается Alluxio.

exchange-manager.name=filesystem
exchange.base-directories=alluxio://alluxio-master:19998/exchange-spooling-directory
exchange.alluxio.site-file-path=/path/to/alluxio-site.properties

HDFS#

Следующий пример конфигурации exchange-manager.properties задаёт HDFS в качестве хранилища для spooling.

exchange-manager.name=hdfs
exchange.base-directories=hadoop-master:9000/exchange-spooling-directory
hdfs.config.resources=/usr/lib/hadoop/etc/hadoop/core-site.xml

Если вы хотите использовать файловую систему, совместимую с Hadoop, в качестве хранилища spooling, необходимо включить exchange.hdfs.skip-directory-scheme-validation в exchange-manager.properties, если при настройке exchange.base-directories используется специфичная схема, отличная от hdfs. Также могут потребоваться следующие шаги:

  1. Настроить реализацию AbstractFileSystem в core-site.xml.

  2. Добавить соответствующие клиентские JAR-файлы в директорию ${Trino_HOME}/plugin/exchange-hdfs на всех узлах кластера Trino.

Local filesystem storage#

Следующий пример конфигурации exchange-manager.properties задаёт локальную директорию /tmp/trino-exchange-manager в качестве хранилища для spooling.

Note

Рекомендуется использовать локальную файловую систему для exchange только в standalone, не production-кластерах. Локальная директория может использоваться в распределённом кластере только в том случае, если она разделяется и доступна со всех узлов.

exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager

Adaptive plan optimizations#

Режим fault-tolerant execution предоставляет несколько адаптивных оптимизаций плана, которые динамически изменяют планы выполнения запросов на основе статистики времени выполнения. Для получения дополнительной информации см. Адаптивные оптимизации плана.