Перейти к содержанию

Executor (исполнитель)

Executors — механизм, с помощью которого экземпляры задач выполняются. У них общий API, и они «подключаемые» (pluggable): исполнитель можно менять в зависимости от окружения.

Исполнитель задаётся опцией executor в секции [core] файла конфигурации.

Встроенные исполнители указываются по имени, например:

[core]
executor = KubernetesExecutor

Собственные или сторонние исполнители задаются путём к Python-классу исполнителя:

[core]
executor = my.custom.executor.module.ExecutorClass

Примечание. Подробнее о конфигурации Airflow: Setting Configuration Options.

Текущий исполнитель можно проверить командой airflow config get-value core executor:

$ airflow config get-value core executor
LocalExecutor

Типы исполнителей

В дереве репозитория есть только один тип исполнителя, который выполняет задачи локально (в процессе scheduler); аналогичного поведения можно добиться своими реализациями. Остальные выполняют задачи удалённо (обычно через пул воркеров). По умолчанию в Airflow настроен LocalExecutor — локальный и самый простой вариант. Поскольку LocalExecutor запускает процессы в процессе планировщика, это может влиять на его производительность. LocalExecutor подходит для небольших однопользовательских продакшен-установок; для мультимашинных или облачных используют один из удалённых исполнителей.

Локальные исполнители

Задачи Airflow выполняются локально в процессе планировщика.

Плюсы: очень просты в использовании, быстрые, низкая задержка, минимум требований к настройке.

Минусы: ограниченные возможности и общие ресурсы с планировщиком Airflow.

Примеры:

Удалённые исполнители

Удалённые исполнители делятся на две категории.

Queued/Batch Executors (очередь/пакет)

Задачи Airflow отправляются в общую очередь, откуда удалённые воркеры забирают и выполняют их. Воркеры часто постоянные и выполняют несколько задач одновременно.

Плюсы: надёжнее за счёт отделения воркеров от планировщика; воркеры могут быть мощными и обрабатывать много задач (часто параллельно), что выгодно по стоимости; задержка может быть небольшой, если воркеры постоянно готовы забирать задачи из очереди.

Минусы: общие воркеры дают эффект «шумного соседа» — задачи конкурируют за ресурсы и конфигурацию окружения; при непостоянной нагрузке воркеры могут простаивать, ресурсы завышены или приходится управлять масштабированием.

Примеры:

Containerized Executors (контейнерные)

Задачи Airflow выполняются по мере постановки в очередь в отдельных контейнерах/подах. Каждая задача изолирована в своём контейнерном окружении, которое разворачивается при постановке задачи в очередь.

Плюсы: каждая задача в одном контейнере — нет «шумного соседа»; окружение можно настраивать под задачу (библиотеки, бинарники, зависимости, объём ресурсов); выгодно по стоимости, так как воркеры живут только на время задачи.

Минусы: задержка на старт — контейнер/под должен подняться до начала задачи; дорого при большом числе коротких/лёгких задач; воркеров нет, но нужно управлять, например, кластером Kubernetes.

Примеры:

Примечание. Новые пользователи иногда думают, что нужен отдельный процесс исполнителя при использовании локального или удалённого executor. Это не так: логика исполнителя выполняется внутри процесса планировщика и в зависимости от выбранного исполнителя запускает задачи локально или нет.

Одновременное использование нескольких исполнителей

Начиная с версии 2.10.0, Airflow может работать с конфигурацией из нескольких исполнителей. У каждого исполнителя свои плюсы и минусы, часто это компромисс между задержкой, изоляцией и эффективностью вычислений (см. сравнение исполнителей). Несколько исполнителей позволяют лучше использовать сильные стороны каждого и обходить слабые — то есть выбирать исполнитель под конкретный набор задач.

Настройка

Несколько исполнителей задаются той же опцией конфигурации, что и один, через список через запятую:

Примечание. Первый исполнитель в списке (один или вместе с другими) ведёт себя так же, как до версии 2.10.0 — это исполнитель по умолчанию для окружения. Любая задача или DAG без явно указанного исполнителя использует его. Остальные исполнители из списка инициализируются и готовы выполнять задачи, если они указаны у задачи или DAG. Исполнитель, не указанный в этом списке, не может использоваться для выполнения задач.

Примеры корректной конфигурации:

[core]
executor = LocalExecutor
[core]
executor = LocalExecutor,CeleryExecutor
[core]
executor = KubernetesExecutor,my.custom.module.ExecutorClass

Примечание. Использование двух экземпляров одного и того же класса исполнителя сейчас не поддерживается.

Для удобного указания исполнителя в задачах и DAG поддерживаются алиасы. Алиас задаётся в конфигурации и используется в DAG (см. ниже):

[core]
executor = LocalExecutor,ShortName:my.custom.module.ExecutorClass

Примечание. Если в DAG для задачи указан исполнитель, которого нет в конфигурации, DAG не пройдёт разбор и в UI Airflow появится предупреждение. Убедитесь, что все нужные исполнители указаны в конфигурации Airflow на каждом хосте/контейнере, где запущен компонент Airflow (scheduler, воркеры и т.д.).

Написание DAG и задач

Чтобы задать исполнитель для задачи, используйте параметр executor у операторов Airflow:

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)
@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")

Чтобы задать исполнитель для всего DAG, используйте механизм default_args. Все задачи DAG будут использовать указанный исполнитель (если задача явно его не переопределяет):

def hello_world():
    print("hello world!")


def hello_world_again():
    print("hello world again!")


with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "LocalExecutor"},  # Для всех задач в DAG
) as dag:
    hw = hello_world()
    hw_again = hello_world_again()

Примечание. Исполнитель, на котором настроена задача, сохраняется в БД Airflow. Изменения учитываются после каждого разбора DAG.

Мониторинг

При одном исполнителе метрики Airflow ведут себя как до версии 2.9. При нескольких исполнителях метрики исполнителя (executor.open_slots, executor.queued_slots, executor.running_tasks) публикуются для каждого сконфигурированного исполнителя, с суффиксом имени исполнителя (например, executor.open_slots.<имя>).

Логирование работает так же, как при одном исполнителе.

Гибридные исполнители со статическим кодом

Раньше были два «статически зашитых» гибридных исполнителя; начиная с Airflow 3.0.0 они не поддерживаются.

Это гибриды двух исполнителей: LocalKubernetesExecutor и CeleryKubernetesExecutor. Их реализация не входит в ядро Airflow. Они использовали поле queue у Task Instance, чтобы хранить и указывать, на каком под-исполнителе запускать задачу. Это неправильное использование поля queue, из-за чего его нельзя было применять по назначению.

Такие исполнители также требовали ручного создания новых «конкретных» классов под каждую комбинацию исполнителей. При росте числа исполнителей это нецелесообразно и увеличивает поддержку. Любая комбинация исполнителей не должна требовать отдельной реализации.

Поэтому с Airflow 3.0.0 такие исполнители не поддерживаются. Рекомендуется использовать функцию «Using Multiple Executors Concurrently».

Написание собственного исполнителя

Все исполнители Airflow реализуют общий интерфейс, чтобы их можно было подключать и чтобы любой исполнитель имел доступ ко всем возможностям и интеграциям Airflow. В основном планировщик Airflow взаимодействует с исполнителем через этот интерфейс; его же используют логирование и CLI. Публичный интерфейс — BaseExecutor. Актуальная детализация — в коде; ниже перечислены основные моменты.

Примечание. Подробнее о публичном интерфейсе Airflow: Public Interface for Airflow 3.0+.

Собственный исполнитель может понадобиться, если:

  • Нет подходящего исполнителя под ваш сценарий (конкретный инструмент или сервис вычислений).
  • Хотите использовать исполнитель на базе сервиса вычислений вашего облачного провайдера.
  • Есть внутренний инструмент/сервис выполнения задач, доступный только вам или вашей организации.

Workloads (нагрузки)

В контексте исполнителя workload — базовая единица выполнения. Это отдельная операция или задача, которую исполнитель запускает на воркере (например, пользовательский код задачи Airflow).

Пример:

ExecuteTask(
    token="mock",
    ti=TaskInstance(
        id=UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"),
        task_id="mock",
        dag_id="mock",
        run_id="mock",
        try_number=1,
        map_index=-1,
        pool_slots=1,
        queue="default",
        priority_weight=1,
        executor_config=None,
        parent_context_carrier=None,
        context_carrier=None,
        queued_dttm=None,
    ),
    dag_rel_path=PurePosixPath("mock.py"),
    bundle_info=BundleInfo(name="n/a", version="no matter"),
    log_path="mock.log",
    type="ExecuteTask",
)

Важные методы BaseExecutor

Их не обязательно переопределять для своей реализации, но полезно знать:

Метод Описание
heartbeat Цикл Job планировщика периодически вызывает heartbeat у исполнителя. Одна из основных точек взаимодействия планировщика и исполнителя. Метод обновляет метрики, запускает новые задачи из очереди и обновляет состояние выполняющихся и завершённых задач.
queue_workload Исполнитель Airflow вызывает этот метод BaseExecutor, чтобы передать задачи на выполнение. BaseExecutor просто добавляет workload’ы (см. выше) во внутренний список очереди. Все исполнители в репозитории используют этот метод.
get_event_buffer Планировщик вызывает его, чтобы получить текущее состояние TaskInstance, которые выполняет исполнитель.
has_task Планировщик использует этот метод BaseExecutor, чтобы проверить, есть ли у исполнителя уже эта задача в очереди или в выполнении.
send_callback Отправляет callback’и в настроенный sink исполнителя.

Обязательные методы для реализации

Минимум для поддержки вашего исполнителя в Airflow нужно переопределить:

Метод Описание
sync Вызывается периодически во время heartbeat исполнителя. Реализуйте обновление состояния задач, о которых знает исполнитель. По желанию — запуск задач из очереди, полученных от планировщика.
execute_async Выполняет workload асинхронно. Вызывается (через несколько слоёв) во время heartbeat планировщика. На практике часто только ставит задачи во внутреннюю или внешнюю очередь (например, KubernetesExecutor), но может и выполнять их напрямую (например, LocalExecutor) — зависит от исполнителя.

Опциональные методы интерфейса

Их переопределение не обязательно для рабочего исполнителя, но даёт дополнительные возможности и стабильность:

Метод Описание
start Job планировщика вызывает его после инициализации объекта исполнителя. Здесь можно выполнить дополнительную настройку.
end Вызывается при завершении работы планировщика. Здесь — синхронная очистка и ожидание завершения выполняющихся задач.
terminate Более жёсткая остановка исполнителя, вплоть до прерывания выполняющихся задач без ожидания.
try_adopt_task_instances Задачи, оставшиеся «сиротами» (например, после падения job планировщика), передаются исполнителю для «усыновления» или обработки. Задачи, которые принять нельзя (по умолчанию BaseExecutor считает, что принять нельзя ни одну), нужно вернуть.
get_cli_commands Исполнитель может предоставлять CLI-команды пользователям; подробнее в разделе CLI ниже.
get_task_log Исполнитель может отдавать сообщения в логи задач Airflow; подробнее в разделе Logging ниже.

Атрибуты совместимости

Интерфейс класса BaseExecutor содержит атрибуты, по которым код ядра Airflow проверяет совместимость вашего исполнителя с возможностями. При написании своего исполнителя задайте их корректно. Каждый атрибут — булев флаг включения/выключения или поддержки возможности:

Атрибут Описание
supports_pickling Поддерживает ли исполнитель чтение «маринованных» DAG из БД перед выполнением (а не из файловой системы).
supports_sentry Поддерживает ли исполнитель Sentry.
is_local Локальный исполнитель или удалённый. См. раздел «Типы исполнителей» выше.
is_single_threaded Однопоточный ли исполнитель. Важно для поддерживаемых бэкендов БД: однопоточные могут работать с любым, включая SQLite.
is_production Предназначен ли исполнитель для продакшена. При непродакшенном исполнителе в UI показывается предупреждение.
serve_logs Поддерживает ли исполнитель отдачу логов; см. Logging for Tasks.

CLI

Исполнитель может предоставлять CLI-команды в утилиту airflow, реализовав метод get_cli_commands. Так сделано, например, у CeleryExecutor и KubernetesExecutor — для настройки воркеров, инициализации окружения и т.п. Команды выдаются только для текущего сконфигурированного исполнителя. Псевдокод:

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
    sub_commands = [
        ActionCommand(
            name="command_name",
            help="Description of what this specific command does",
            func=lazy_load_command("path.to.python.function.for.command"),
            args=(),
        ),
    ]

    return [
        GroupCommand(
            name="my_cool_executor",
            help="Description of what this group of commands do",
            subcommands=sub_commands,
        ),
    ]

Примечание. Строгих правил по пространству имён команд Airflow пока нет. Разработчикам следует выбирать достаточно уникальные имена, чтобы не конфликтовать с другими исполнителями и компонентами.

Примечание. При создании нового исполнителя или обновлении существующего не импортируйте и не выполняйте тяжёлый код на уровне модуля. Классы исполнителей импортируются в нескольких местах; медленный импорт ухудшит работу Airflow, особенно CLI.

Logging

Исполнитель может отдавать сообщения в логи задач Airflow, реализовав метод get_task_logs. Это полезно, если окружение выполнения даёт дополнительный контекст при сбоях (например, из-за самого окружения, а не кода задачи), или для логов setup/teardown окружения. KubernetesExecutor использует это, чтобы подмешивать логи пода, в котором выполнялась задача. Псевдокод:

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
    messages = []
    log = []
    try:
        res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
        for line in res:
            log.append(remove_escape_codes(line.decode()))
        if log:
            messages.append("Found logs from execution environment!")
    except Exception as e:  # Исключение не должно ломать логи задачи
        messages.append(f"Failed to find logs from execution environment: {e}")
    return messages, ["\n".join(log)]

Дальнейшие шаги

После реализации класса исполнителя с интерфейсом BaseExecutor укажите его в конфигурации Airflow через core.executor — путь к модулю класса:

[core]
executor = my_company.executors.MyCustomExecutor

Примечание. Подробнее о конфигурации: Setting Configuration Options; об управлении Python-модулями в Airflow: Modules Management.


Источник: Airflow 3.1.7 — Executor. Перевод неофициальный.