Sensors (сенсоры)
Sensors — особый тип операторов, которые делают только одно: ждут наступления какого-то условия. Это может быть ожидание по времени, появления файла или внешнего события — сенсор только ждёт, пока что-то произойдёт, затем завершается успехом, и нижестоящие задачи могут запуститься.
Поскольку сенсоры в основном простаивают, у них есть два режима работы, чтобы использовать ресурсы эффективнее:
poke(по умолчанию): сенсор занимает слот воркера на всё время работы.reschedule: сенсор занимает слот воркера только на время проверки, между проверками «засыпает» на заданный интервал.
Режимы poke и reschedule задаются при создании экземпляра сенсора. Выбор между ними — по сути компромисс по задержке: проверки раз в секунду лучше в poke, раз в минуту — в reschedule.
Как и операторы, сенсоры в Airflow представлены большим набором готовых классов — в ядре и в провайдерах.
См. также: Deferrable Operators & Triggers.
Параметры BaseSensorOperator
Все сенсоры в Airflow в итоге наследуются от BaseSensorOperator (напрямую или через другие классы). Базовый класс задаёт общее поведение и параметры: как сенсор ждёт, повторяет попытки и использует слот воркера.
После рефакторинга Task SDK BaseSensorOperator реализован в Task SDK. Документация провайдеров генерируется отдельно, поэтому эти параметры не всегда видны на страницах API отдельных сенсоров провайдеров, но действуют для всех сенсоров.
Общие параметры
У BaseSensorOperator есть следующие параметры, доступные у всех сенсоров:
| Параметр | Описание |
|---|---|
poke_interval |
Интервал в секундах между проверками. В режиме poke сенсор спит между проверками, занимая слот воркера. В режиме reschedule задача откладывается и перепланируется через этот интервал. |
timeout |
Максимальное время в секундах, после которого сенсор завершается с ошибкой. Отсчёт от первой попытки выполнения, не от каждой проверки. |
mode |
Определяет, как сенсор использует слот воркера. poke (по умолчанию): слот занят всё время. reschedule: слот освобождается между проверками. |
soft_fail |
Если True, при срабатывании таймаута сенсор помечается как SKIPPED, а не FAILED. |
exponential_backoff |
Если включён, пауза между проверками растёт по экспоненте до max_wait. Полезно при опросе внешних систем с нестабильной доступностью. |
max_wait |
Верхняя граница (в секундах) паузы между проверками при включённом exponential_backoff. |
Актуальная справка по API — в документации Task SDK для BaseSensorOperator:
https://airflow.apache.org/docs/task-sdk/stable/api.html#airflow.sdk.BaseSensorOperator
Пример
BashSensor(
task_id="wait_for_file",
bash_command="test -f /data/input.csv",
poke_interval=60,
timeout=60 * 60,
mode="reschedule",
)
Источник: Airflow 3.1.7 — Sensors. Перевод неофициальный.