Перейти к содержанию

Sensors (сенсоры)

Sensors — особый тип операторов, которые делают только одно: ждут наступления какого-то условия. Это может быть ожидание по времени, появления файла или внешнего события — сенсор только ждёт, пока что-то произойдёт, затем завершается успехом, и нижестоящие задачи могут запуститься.

Поскольку сенсоры в основном простаивают, у них есть два режима работы, чтобы использовать ресурсы эффективнее:

  • poke (по умолчанию): сенсор занимает слот воркера на всё время работы.
  • reschedule: сенсор занимает слот воркера только на время проверки, между проверками «засыпает» на заданный интервал.

Режимы poke и reschedule задаются при создании экземпляра сенсора. Выбор между ними — по сути компромисс по задержке: проверки раз в секунду лучше в poke, раз в минуту — в reschedule.

Как и операторы, сенсоры в Airflow представлены большим набором готовых классов — в ядре и в провайдерах.

См. также: Deferrable Operators & Triggers.

Параметры BaseSensorOperator

Все сенсоры в Airflow в итоге наследуются от BaseSensorOperator (напрямую или через другие классы). Базовый класс задаёт общее поведение и параметры: как сенсор ждёт, повторяет попытки и использует слот воркера.

После рефакторинга Task SDK BaseSensorOperator реализован в Task SDK. Документация провайдеров генерируется отдельно, поэтому эти параметры не всегда видны на страницах API отдельных сенсоров провайдеров, но действуют для всех сенсоров.

Общие параметры

У BaseSensorOperator есть следующие параметры, доступные у всех сенсоров:

Параметр Описание
poke_interval Интервал в секундах между проверками. В режиме poke сенсор спит между проверками, занимая слот воркера. В режиме reschedule задача откладывается и перепланируется через этот интервал.
timeout Максимальное время в секундах, после которого сенсор завершается с ошибкой. Отсчёт от первой попытки выполнения, не от каждой проверки.
mode Определяет, как сенсор использует слот воркера. poke (по умолчанию): слот занят всё время. reschedule: слот освобождается между проверками.
soft_fail Если True, при срабатывании таймаута сенсор помечается как SKIPPED, а не FAILED.
exponential_backoff Если включён, пауза между проверками растёт по экспоненте до max_wait. Полезно при опросе внешних систем с нестабильной доступностью.
max_wait Верхняя граница (в секундах) паузы между проверками при включённом exponential_backoff.

Актуальная справка по API — в документации Task SDK для BaseSensorOperator:

https://airflow.apache.org/docs/task-sdk/stable/api.html#airflow.sdk.BaseSensorOperator

Пример

BashSensor(
    task_id="wait_for_file",
    bash_command="test -f /data/input.csv",
    poke_interval=60,
    timeout=60 * 60,
    mode="reschedule",
)

Источник: Airflow 3.1.7 — Sensors. Перевод неофициальный.