Operators (операторы)
Operator — по сути шаблон предопределённой задачи, которую можно описать декларативно внутри DAG:
with DAG("my-dag") as dag:
ping = HttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
В Airflow доступен большой набор операторов: часть входит в ядро или предустановленные провайдеры. Из ядра часто используют:
- BashOperator — выполняет bash-команду
- PythonOperator — вызывает произвольную Python-функцию
Для выполнения произвольной Python-функции рекомендуется декоратор @task. Он не поддерживает рендеринг Jinja-шаблонов в аргументах.
Примечание. Для вызова Python-callable без шаблонизации аргументов рекомендуется декоратор
@task, а не классический PythonOperator.
Полный список операторов ядра: Core Operators and Hooks Reference.
Если нужного оператора нет в поставке Airflow по умолчанию, его можно найти среди провайдеров. Примеры популярных операторов из провайдеров:
- EmailOperator
- HttpOperator
- SQLExecuteQueryOperator
- DockerOperator
- HiveOperator
- S3FileTransformOperator
- PrestoToMySqlOperator
- SlackAPIOperator
И многие другие — полный список операторов, хуков, сенсоров и трансферов из провайдеров: providers packages.
Примечание. В коде Airflow понятия Tasks и Operators часто смешивают, и в большинстве случаев они взаимозаменяемы. Task — общая «единица выполнения» в DAG; Operator — готовый переиспользуемый шаблон задачи с уже реализованной логикой, которому нужно лишь передать аргументы.
Jinja Templating (шаблонизация Jinja)
Airflow использует Jinja в сочетании с макросами.
Например, чтобы передать начало data interval в bash-скрипт как переменную окружения через BashOperator:
# Начало data interval в формате YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
Здесь {{ ds }} — шаблонная переменная; параметр env у BashOperator обрабатывается Jinja, поэтому дата начала data interval будет доступна в bash-скрипте как переменная окружения DATA_INTERVAL_START.
Вместо строки с шаблоном можно передать callable — когда на Python это читается проще. Callable должен принимать два именованных аргумента: context и jinja_env:
context— объектContextAirflow с информацией о текущем выполнении задачи. Доступ — как к обычному словарю. Содержит все переменные, доступные в Jinja-шаблонах; с точки зрения рендеринга шаблонов он только для чтения — изменения не влияют на среду выполнения задачи.
Полный список переменных контекста: Templates reference.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import jinja2
from airflow.sdk import Context
def build_complex_command(context: Context, jinja_env: jinja2.Environment) -> str:
# Доступ к данным выполнения из контекста
task_id = context["ti"].task_id
execution_date = context["ds"]
with open("file.csv") as f:
return do_complex_things(f, task_id, execution_date)
t = BashOperator(
task_id="complex_templated_echo",
bash_command=build_complex_command,
dag=dag,
)
Каждое template-поле рендерится один раз, поэтому возвращаемое callable значение повторно не обрабатывается. При необходимости шаблоны внутри него нужно рендерить вручную, например через render_template() у текущей задачи:
def build_complex_command(context: Context, jinja_env: jinja2.Environment) -> str:
with open("file.csv") as f:
data = do_complex_things(f)
return context["task"].render_template(data, context, jinja_env)
Шаблонизация применима ко всем параметрам, помеченным в документации как «templated». Подстановка выполняется непосредственно перед вызовом функции pre_execute оператора.
Шаблонизацию можно использовать и для вложенных полей, если эти поля помечены как templated в своей структуре: поля из свойства template_fields проходят подстановку. Пример — поле path:
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [дополнительный код...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
Примечание. Свойство
template_fields— переменная класса и должно иметь типSequence[str](список или кортеж строк).
Глубоко вложенные поля тоже подставляются, если все промежуточные поля помечены как template fields:
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# [дополнительный код...]
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [дополнительный код...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
При создании DAG в Jinja Environment можно передать свои опции. Часто включают сохранение завершающего перевода строки в шаблоне:
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# другие опции jinja2 Environment
},
)
Все доступные опции: Jinja documentation.
У части операторов строки с определёнными суффиксами (задаются в template_ext) при рендеринге считаются путями к файлам. Так можно подгружать скрипты или запросы из файлов, а не вставлять их в код DAG.
Пример: BashOperator с многострочным bash-скриптом — загружается файл script.sh, его содержимое подставляется в bash_command:
run_script = BashOperator(
task_id="run_script",
bash_command="script.sh",
)
По умолчанию пути задаются относительно папки DAG (это путь поиска Jinja-шаблонов по умолчанию). Дополнительные пути можно добавить через аргумент template_searchpath у DAG.
Иногда нужно исключить строку из шаблонизации и использовать её как есть. Пример:
print_script = BashOperator(
task_id="print_script",
bash_command="cat script.sh",
)
Так будет ошибка TemplateNotFound: cat script.sh — Airflow воспримет строку как путь к файлу, а не команду. Чтобы значение не считалось путём к файлу, его оборачивают в literal(). При этом отключаются и подстановка макросов, и загрузка из файла; можно применять к отдельным вложенным полям, оставляя обычную шаблонизацию для остального содержимого.
from airflow.sdk import literal
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command=literal("cat script.sh"),
)
Добавлено в версии 2.8: функция literal().
Либо можно переопределить template_ext, чтобы значение не трактовалось как путь к файлу:
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()
Рендеринг полей как нативных Python-объектов
По умолчанию все Jinja-шаблоны в template_fields рендерятся в строки. Иногда нужен другой тип. Например, задача extract пушит в XCom словарь {"1001": 301.27, "1002": 433.21, "1003": 502.22}:
@task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
Если задача зависит от extract и в order_data передаётся строка "{'1001': 301.27, '1002': 433.21, '1003': 502.22}":
def transform(order_data):
total_order_value = sum(order_data.values()) # Ошибка: order_data — str
return {"total_order_value": total_order_value}
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
extract() >> transform
Чтобы получить именно словарь, есть два варианта. Первый — использовать callable:
def render_transform_op_kwargs(context, jinja_env):
order_data = context["ti"].xcom_pull("extract")
return {"order_data": order_data}
transform = PythonOperator(
task_id="transform",
op_kwargs=render_transform_op_kwargs,
python_callable=transform,
)
Второй — указать Jinja рендерить нативный Python-объект. Для этого в DAG передают render_template_as_native_obj=True. Тогда вместо стандартного SandboxedEnvironment используется NativeEnvironment:
with DAG(
dag_id="example_template_as_python_object",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
):
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
Зарезервированное ключевое слово params
В Apache Airflow 2.2.0 переменная params используется при сериализации DAG. В сторонних операторах это имя использовать не следует. Если после обновления окружения появляется ошибка:
AttributeError: 'str' object has no attribute '__module__'
переименуйте params в своих операторах.
Конфликт шаблонизации с f-строками
При формировании строк для template-полей (например, bash_command в BashOperator) через Python f-строки нужно учитывать взаимодействие подстановки f-строк и синтаксиса Jinja: в обоих случаях используются фигурные скобки {}.
В f-строках двойные фигурные скобки {{ и }} интерпретируются как экранирование одной скобки { или }. В Jinja двойные скобки {{ variable }} обозначают переменную для подстановки.
Чтобы в строке, заданной f-строкой, оставить выражение Jinja (например, {{ ds }}) для последующей обработки движком Airflow, скобки для f-строки нужно экранировать ещё одним удвоением — то есть использовать четыре скобки:
t1 = BashOperator(
task_id="fstring_templating_correct",
bash_command=f"echo Data interval start: {{{{ ds }}}}",
dag=dag,
)
python_var = "echo Data interval start:"
t2 = BashOperator(
task_id="fstring_templating_simple",
bash_command=f"{python_var} {{{{ ds }}}}",
dag=dag,
)
В результате после обработки f-строки получится строка с двойными скобками для Jinja, и Airflow корректно выполнит подстановку перед запуском. Частая ошибка у новичков — не удвоить скобки; из-за этого возможны ошибки при разборе DAG или неожиданное поведение при выполнении, когда подстановка не срабатывает.
Источник: Airflow 3.1.7 — Operators. Перевод неофициальный.