Executor (исполнитель)
Executors — механизм, с помощью которого экземпляры задач выполняются. У них общий API, и они «подключаемые» (pluggable): исполнитель можно менять в зависимости от окружения.
Исполнитель задаётся опцией executor в секции [core] файла конфигурации.
Встроенные исполнители указываются по имени, например:
[core]
executor = KubernetesExecutor
Собственные или сторонние исполнители задаются путём к Python-классу исполнителя:
[core]
executor = my.custom.executor.module.ExecutorClass
Примечание. Подробнее о конфигурации Airflow: Setting Configuration Options.
Текущий исполнитель можно проверить командой airflow config get-value core executor:
$ airflow config get-value core executor
LocalExecutor
Типы исполнителей
В дереве репозитория есть только один тип исполнителя, который выполняет задачи локально (в процессе scheduler); аналогичного поведения можно добиться своими реализациями. Остальные выполняют задачи удалённо (обычно через пул воркеров). По умолчанию в Airflow настроен LocalExecutor — локальный и самый простой вариант. Поскольку LocalExecutor запускает процессы в процессе планировщика, это может влиять на его производительность. LocalExecutor подходит для небольших однопользовательских продакшен-установок; для мультимашинных или облачных используют один из удалённых исполнителей.
Локальные исполнители
Задачи Airflow выполняются локально в процессе планировщика.
Плюсы: очень просты в использовании, быстрые, низкая задержка, минимум требований к настройке.
Минусы: ограниченные возможности и общие ресурсы с планировщиком Airflow.
Примеры:
Удалённые исполнители
Удалённые исполнители делятся на две категории.
Queued/Batch Executors (очередь/пакет)
Задачи Airflow отправляются в общую очередь, откуда удалённые воркеры забирают и выполняют их. Воркеры часто постоянные и выполняют несколько задач одновременно.
Плюсы: надёжнее за счёт отделения воркеров от планировщика; воркеры могут быть мощными и обрабатывать много задач (часто параллельно), что выгодно по стоимости; задержка может быть небольшой, если воркеры постоянно готовы забирать задачи из очереди.
Минусы: общие воркеры дают эффект «шумного соседа» — задачи конкурируют за ресурсы и конфигурацию окружения; при непостоянной нагрузке воркеры могут простаивать, ресурсы завышены или приходится управлять масштабированием.
Примеры:
Containerized Executors (контейнерные)
Задачи Airflow выполняются по мере постановки в очередь в отдельных контейнерах/подах. Каждая задача изолирована в своём контейнерном окружении, которое разворачивается при постановке задачи в очередь.
Плюсы: каждая задача в одном контейнере — нет «шумного соседа»; окружение можно настраивать под задачу (библиотеки, бинарники, зависимости, объём ресурсов); выгодно по стоимости, так как воркеры живут только на время задачи.
Минусы: задержка на старт — контейнер/под должен подняться до начала задачи; дорого при большом числе коротких/лёгких задач; воркеров нет, но нужно управлять, например, кластером Kubernetes.
Примеры:
Примечание. Новые пользователи иногда думают, что нужен отдельный процесс исполнителя при использовании локального или удалённого executor. Это не так: логика исполнителя выполняется внутри процесса планировщика и в зависимости от выбранного исполнителя запускает задачи локально или нет.
Одновременное использование нескольких исполнителей
Начиная с версии 2.10.0, Airflow может работать с конфигурацией из нескольких исполнителей. У каждого исполнителя свои плюсы и минусы, часто это компромисс между задержкой, изоляцией и эффективностью вычислений (см. сравнение исполнителей). Несколько исполнителей позволяют лучше использовать сильные стороны каждого и обходить слабые — то есть выбирать исполнитель под конкретный набор задач.
Настройка
Несколько исполнителей задаются той же опцией конфигурации, что и один, через список через запятую:
Примечание. Первый исполнитель в списке (один или вместе с другими) ведёт себя так же, как до версии 2.10.0 — это исполнитель по умолчанию для окружения. Любая задача или DAG без явно указанного исполнителя использует его. Остальные исполнители из списка инициализируются и готовы выполнять задачи, если они указаны у задачи или DAG. Исполнитель, не указанный в этом списке, не может использоваться для выполнения задач.
Примеры корректной конфигурации:
[core]
executor = LocalExecutor
[core]
executor = LocalExecutor,CeleryExecutor
[core]
executor = KubernetesExecutor,my.custom.module.ExecutorClass
Примечание. Использование двух экземпляров одного и того же класса исполнителя сейчас не поддерживается.
Для удобного указания исполнителя в задачах и DAG поддерживаются алиасы. Алиас задаётся в конфигурации и используется в DAG (см. ниже):
[core]
executor = LocalExecutor,ShortName:my.custom.module.ExecutorClass
Примечание. Если в DAG для задачи указан исполнитель, которого нет в конфигурации, DAG не пройдёт разбор и в UI Airflow появится предупреждение. Убедитесь, что все нужные исполнители указаны в конфигурации Airflow на каждом хосте/контейнере, где запущен компонент Airflow (scheduler, воркеры и т.д.).
Написание DAG и задач
Чтобы задать исполнитель для задачи, используйте параметр executor у операторов Airflow:
BashOperator(
task_id="hello_world",
executor="LocalExecutor",
bash_command="echo 'hello world!'",
)
@task(executor="LocalExecutor")
def hello_world():
print("hello world!")
Чтобы задать исполнитель для всего DAG, используйте механизм default_args. Все задачи DAG будут использовать указанный исполнитель (если задача явно его не переопределяет):
def hello_world():
print("hello world!")
def hello_world_again():
print("hello world again!")
with DAG(
dag_id="hello_worlds",
default_args={"executor": "LocalExecutor"}, # Для всех задач в DAG
) as dag:
hw = hello_world()
hw_again = hello_world_again()
Примечание. Исполнитель, на котором настроена задача, сохраняется в БД Airflow. Изменения учитываются после каждого разбора DAG.
Мониторинг
При одном исполнителе метрики Airflow ведут себя как до версии 2.9. При нескольких исполнителях метрики исполнителя (executor.open_slots, executor.queued_slots, executor.running_tasks) публикуются для каждого сконфигурированного исполнителя, с суффиксом имени исполнителя (например, executor.open_slots.<имя>).
Логирование работает так же, как при одном исполнителе.
Гибридные исполнители со статическим кодом
Раньше были два «статически зашитых» гибридных исполнителя; начиная с Airflow 3.0.0 они не поддерживаются.
Это гибриды двух исполнителей: LocalKubernetesExecutor и CeleryKubernetesExecutor. Их реализация не входит в ядро Airflow. Они использовали поле queue у Task Instance, чтобы хранить и указывать, на каком под-исполнителе запускать задачу. Это неправильное использование поля queue, из-за чего его нельзя было применять по назначению.
Такие исполнители также требовали ручного создания новых «конкретных» классов под каждую комбинацию исполнителей. При росте числа исполнителей это нецелесообразно и увеличивает поддержку. Любая комбинация исполнителей не должна требовать отдельной реализации.
Поэтому с Airflow 3.0.0 такие исполнители не поддерживаются. Рекомендуется использовать функцию «Using Multiple Executors Concurrently».
Написание собственного исполнителя
Все исполнители Airflow реализуют общий интерфейс, чтобы их можно было подключать и чтобы любой исполнитель имел доступ ко всем возможностям и интеграциям Airflow. В основном планировщик Airflow взаимодействует с исполнителем через этот интерфейс; его же используют логирование и CLI. Публичный интерфейс — BaseExecutor. Актуальная детализация — в коде; ниже перечислены основные моменты.
Примечание. Подробнее о публичном интерфейсе Airflow: Public Interface for Airflow 3.0+.
Собственный исполнитель может понадобиться, если:
- Нет подходящего исполнителя под ваш сценарий (конкретный инструмент или сервис вычислений).
- Хотите использовать исполнитель на базе сервиса вычислений вашего облачного провайдера.
- Есть внутренний инструмент/сервис выполнения задач, доступный только вам или вашей организации.
Workloads (нагрузки)
В контексте исполнителя workload — базовая единица выполнения. Это отдельная операция или задача, которую исполнитель запускает на воркере (например, пользовательский код задачи Airflow).
Пример:
ExecuteTask(
token="mock",
ti=TaskInstance(
id=UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"),
task_id="mock",
dag_id="mock",
run_id="mock",
try_number=1,
map_index=-1,
pool_slots=1,
queue="default",
priority_weight=1,
executor_config=None,
parent_context_carrier=None,
context_carrier=None,
queued_dttm=None,
),
dag_rel_path=PurePosixPath("mock.py"),
bundle_info=BundleInfo(name="n/a", version="no matter"),
log_path="mock.log",
type="ExecuteTask",
)
Важные методы BaseExecutor
Их не обязательно переопределять для своей реализации, но полезно знать:
| Метод | Описание |
|---|---|
heartbeat |
Цикл Job планировщика периодически вызывает heartbeat у исполнителя. Одна из основных точек взаимодействия планировщика и исполнителя. Метод обновляет метрики, запускает новые задачи из очереди и обновляет состояние выполняющихся и завершённых задач. |
queue_workload |
Исполнитель Airflow вызывает этот метод BaseExecutor, чтобы передать задачи на выполнение. BaseExecutor просто добавляет workload’ы (см. выше) во внутренний список очереди. Все исполнители в репозитории используют этот метод. |
get_event_buffer |
Планировщик вызывает его, чтобы получить текущее состояние TaskInstance, которые выполняет исполнитель. |
has_task |
Планировщик использует этот метод BaseExecutor, чтобы проверить, есть ли у исполнителя уже эта задача в очереди или в выполнении. |
send_callback |
Отправляет callback’и в настроенный sink исполнителя. |
Обязательные методы для реализации
Минимум для поддержки вашего исполнителя в Airflow нужно переопределить:
| Метод | Описание |
|---|---|
sync |
Вызывается периодически во время heartbeat исполнителя. Реализуйте обновление состояния задач, о которых знает исполнитель. По желанию — запуск задач из очереди, полученных от планировщика. |
execute_async |
Выполняет workload асинхронно. Вызывается (через несколько слоёв) во время heartbeat планировщика. На практике часто только ставит задачи во внутреннюю или внешнюю очередь (например, KubernetesExecutor), но может и выполнять их напрямую (например, LocalExecutor) — зависит от исполнителя. |
Опциональные методы интерфейса
Их переопределение не обязательно для рабочего исполнителя, но даёт дополнительные возможности и стабильность:
| Метод | Описание |
|---|---|
start |
Job планировщика вызывает его после инициализации объекта исполнителя. Здесь можно выполнить дополнительную настройку. |
end |
Вызывается при завершении работы планировщика. Здесь — синхронная очистка и ожидание завершения выполняющихся задач. |
terminate |
Более жёсткая остановка исполнителя, вплоть до прерывания выполняющихся задач без ожидания. |
try_adopt_task_instances |
Задачи, оставшиеся «сиротами» (например, после падения job планировщика), передаются исполнителю для «усыновления» или обработки. Задачи, которые принять нельзя (по умолчанию BaseExecutor считает, что принять нельзя ни одну), нужно вернуть. |
get_cli_commands |
Исполнитель может предоставлять CLI-команды пользователям; подробнее в разделе CLI ниже. |
get_task_log |
Исполнитель может отдавать сообщения в логи задач Airflow; подробнее в разделе Logging ниже. |
Атрибуты совместимости
Интерфейс класса BaseExecutor содержит атрибуты, по которым код ядра Airflow проверяет совместимость вашего исполнителя с возможностями. При написании своего исполнителя задайте их корректно. Каждый атрибут — булев флаг включения/выключения или поддержки возможности:
| Атрибут | Описание |
|---|---|
supports_pickling |
Поддерживает ли исполнитель чтение «маринованных» DAG из БД перед выполнением (а не из файловой системы). |
supports_sentry |
Поддерживает ли исполнитель Sentry. |
is_local |
Локальный исполнитель или удалённый. См. раздел «Типы исполнителей» выше. |
is_single_threaded |
Однопоточный ли исполнитель. Важно для поддерживаемых бэкендов БД: однопоточные могут работать с любым, включая SQLite. |
is_production |
Предназначен ли исполнитель для продакшена. При непродакшенном исполнителе в UI показывается предупреждение. |
serve_logs |
Поддерживает ли исполнитель отдачу логов; см. Logging for Tasks. |
CLI
Исполнитель может предоставлять CLI-команды в утилиту airflow, реализовав метод get_cli_commands. Так сделано, например, у CeleryExecutor и KubernetesExecutor — для настройки воркеров, инициализации окружения и т.п. Команды выдаются только для текущего сконфигурированного исполнителя. Псевдокод:
@staticmethod
def get_cli_commands() -> list[GroupCommand]:
sub_commands = [
ActionCommand(
name="command_name",
help="Description of what this specific command does",
func=lazy_load_command("path.to.python.function.for.command"),
args=(),
),
]
return [
GroupCommand(
name="my_cool_executor",
help="Description of what this group of commands do",
subcommands=sub_commands,
),
]
Примечание. Строгих правил по пространству имён команд Airflow пока нет. Разработчикам следует выбирать достаточно уникальные имена, чтобы не конфликтовать с другими исполнителями и компонентами.
Примечание. При создании нового исполнителя или обновлении существующего не импортируйте и не выполняйте тяжёлый код на уровне модуля. Классы исполнителей импортируются в нескольких местах; медленный импорт ухудшит работу Airflow, особенно CLI.
Logging
Исполнитель может отдавать сообщения в логи задач Airflow, реализовав метод get_task_logs. Это полезно, если окружение выполнения даёт дополнительный контекст при сбоях (например, из-за самого окружения, а не кода задачи), или для логов setup/teardown окружения. KubernetesExecutor использует это, чтобы подмешивать логи пода, в котором выполнялась задача. Псевдокод:
def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
messages = []
log = []
try:
res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
for line in res:
log.append(remove_escape_codes(line.decode()))
if log:
messages.append("Found logs from execution environment!")
except Exception as e: # Исключение не должно ломать логи задачи
messages.append(f"Failed to find logs from execution environment: {e}")
return messages, ["\n".join(log)]
Дальнейшие шаги
После реализации класса исполнителя с интерфейсом BaseExecutor укажите его в конфигурации Airflow через core.executor — путь к модулю класса:
[core]
executor = my_company.executors.MyCustomExecutor
Примечание. Подробнее о конфигурации: Setting Configuration Options; об управлении Python-модулями в Airflow: Modules Management.
Источник: Airflow 3.1.7 — Executor. Перевод неофициальный.