XComs
XComs (сокращение от «cross-communications») — механизм обмена данными между задачами. По умолчанию задачи изолированы и могут выполняться на разных машинах.
XCom идентифицируется по key (по сути имя), а также по task_id и dag_id задачи-источника. В качестве значения может быть любой сериализуемый объект (в том числе с декораторами @dataclass или @attr.define, см. TaskFlow arguments), но XCom рассчитан на небольшой объём данных; не используйте его для передачи больших значений, например датафреймов.
Работать с XCom нужно через контекст задачи и get_current_context(). Прямое изменение через модель XCom в БД недоступно.
Значения явно «пушатся» и «пулятся» в хранилище методами xcom_push и xcom_pull у Task Instance.
Чтобы отправить значение из задачи «task-1» для использования в другой задаче:
# помещает any_serializable_value в XCom с ключом "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
Чтобы получить это значение в другой задаче:
# получает XCom с ключом "identifier as string", записанный в задаче task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
Многие операторы по умолчанию автоматически пушат результат в XCom с ключом return_value, если задан аргумент do_xcom_push=True (он включён по умолчанию). То же делают функции с декоратором @task. У xcom_pull ключ по умолчанию — return_value, если ключ не передан, поэтому можно писать так:
# Получает XCom return_value из задачи "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
XCom можно использовать в шаблонах:
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XCom родственен Variables: XCom привязан к экземпляру задачи и предназначен для обмена данными внутри одного Dag run, а Variables глобальны и служат для общей конфигурации и общих значений.
Чтобы отправить несколько XCom за раз, задайте аргументы do_xcom_push и multiple_outputs в True и верните словарь значений.
Пример отправки нескольких XCom и получения их по отдельности:
# Задача возвращает словарь
@task(do_xcom_push=True, multiple_outputs=True)
def push_multiple(**context):
return {"key1": "value1", "key2": "value2"}
@task
def xcom_pull_with_multiple_outputs(**context):
# Получение конкретного ключа из нескольких выходов
key1 = context["ti"].xcom_pull(task_ids="push_multiple", key="key1") # key1
key2 = context["ti"].xcom_pull(task_ids="push_multiple", key="key2") # key2
# Получение всех данных XCom из задачи push_multiple
data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value")
Примечание. Если первый запуск задачи не завершился успехом, при каждой повторной попытке (retry) XCom этой задачи очищаются, чтобы запуск оставался идемпотентным.
Object Storage XCom Backend
Стандартный бэкенд XCom, BaseXCom, хранит данные в БД Airflow. Это удобно для небольших значений, но может вызывать проблемы при больших объёмах или большом числе XCom. Для эффективной работы с крупными данными рекомендуется бэкенд на объектном хранилище. Подробнее: документация.
Кастомные бэкенды XCom
У системы XCom сменяемые бэкенды; нужный задаётся опцией конфигурации xcom_backend.
Чтобы реализовать свой бэкенд, создайте подкласс BaseXCom и переопределите методы serialize_value и deserialize_value.
Метод purge в классе BaseXCom можно переопределить, чтобы управлять удалением данных XCom из кастомного бэкенда. Он вызывается в рамках операции delete.
Проверка использования кастомного бэкенда XCom в контейнерах
В зависимости от окружения развёртывания (локально, Docker, K8s и т.д.) бывает важно убедиться, что кастомный бэкенд XCom действительно инициализируется. В контейнерной среде сложнее проверить, что бэкенд загружается при старте. Ниже — способ проверить, какой класс XCom используется.
Если есть доступ к терминалу внутри контейнера Airflow, можно вывести фактический класс XCom:
from airflow.sdk.execution_time.xcom import XCom
print(XCom.__name__)
Источник: Airflow 3.1.7 — XComs. Перевод неофициальный.