Apache Airflow Best Practices — Глава 2 «Core Concepts»

Перевод книги «Apache Airflow Best Practices, by Dylan Intorf, Dylan Storey, Kendrick van Doorn» Packt Publishing подготовлен автором сайта

Глава 2. Основные концепции Airflow

В основе Airflow лежат основные концепции, упрощающие процесс определения, выполнения и мониторинга задач. Эти концепции включают задачи, группы задач и триггеры. Каждая из них составляет ориентированные ациклические графы (DAG’и) и позволяет использовать преимущества Airflow. Понимание каждой из этих строительных блоков является важным для того, чтобы в полной мере использовать потенциал Airflow при масштабировании и обеспечить автоматизацию и оптимизацию рабочих процессов.

Помимо основ DAG’ов, в этой главе будет представлен концепт интерфейса командной строки для запуска Apache Airflow локально на компьютере или виртуальной машине. Этот процесс легко повторить, и для его запуска были созданы различные инструменты. Те же инструменты, которые мы будем использовать для начала создания DAG’ов и настройки Airflow, могут применяться и в более сложных ситуациях, поэтому они являются отраслевым стандартом.

В этой главе мы рассмотрим следующие основные темы:

  • Запуск Apache Airflow на вашей локальной машине с помощью airflowctl
  • Строительные блоки DAG’ов
  • Как максимально эффективно использовать группы задач и организовывать DAG’и

Технические требования

В предыдущих главах мы в основном рассматривали аспекты, связанные с Apache Airflow, но не затрагивали основы или конкретные сценарии использования. Начиная с этой главы и далее, мы предполагаем, что у вас настроено окружение Airflow на локальной машине и вы понимаете, как к нему получить доступ.

Установка Airflow локально требует ряда технологий и предварительных условий. В частности, мы рекомендуем установить актуальную версию Python, так как старые версии не имеют долгосрочной поддержки. Кроме того, для запуска Airflow требуется как минимум 4 ГБ оперативной памяти, хотя это требование зависит от размера развертывания и сложности DAG’ов.

Интерфейс командной строки (CLI) — это текстовый пользовательский интерфейс, используемый для взаимодействия с программным обеспечением и операционными системами. Через CLI пользователи вводят текстовые команды для выполнения определённых задач или операций, напрямую взаимодействуя с системой. Такой интерфейс позволяет эффективно выполнять команды, автоматизировать задачи с помощью скриптов и получать доступ к широкому спектру системных функций. CLI особенно ценятся разработчиками и системными администраторами за точность, возможность скриптования и низкое потребление ресурсов по сравнению с графическими интерфейсами (GUI).

Инструмент CLI airflowctl — это CLI-инструмент, специально предназначенный для взаимодействия с окружениями Apache Airflow. Он позволяет пользователям управлять различными аспектами развертываний Airflow напрямую из командной строки. С помощью airflowctl пользователи могут выполнять задачи, такие как запуск DAG’ов, приостановка или возобновление их выполнения, создание или отображение подключений, а также доступ к логам. Этот инструмент упрощает процесс управления рабочими процессами в Airflow, обеспечивая эффективную эксплуатацию и мониторинг задач в инстансе Airflow. В следующих главах мы углубимся в более сложные идеи и сценарии использования CLI-инструментов Airflow.

Самый простой и быстрый способ начать работу с Airflow — использовать команду Airflow CLI в вашем терминале или командной строке:

Первичный запуск команды проверит, установлена ли она уже на вашей локальной машине. Если вы получите следующий запрос, выполните приведённые далее инструкции по установке:

Рисунок 2.1: настройка airflowctl

Использование airflowctl (https://github.com/kaxil/airflowctl) позволяет выполнить начальную установку вне Docker-контейнеров или Kubernetes.
Рекомендуется устанавливать CLI с помощью pip. Если у вас не установлен pip, вы можете установить его, следуя этим инструкциям: https://pip.pypa.io/en/stable/installation/. С помощью pip следующая команда установит CLI:

Если во время установки не возникло ошибок, следующим шагом будет запуск команды инициализации. При выполнении этой команды создаётся каталог проекта с названием my_airflow_project в вашей текущей папке и запускается веб-сервер Airflow:

Сразу после запуска этой команды в верхней части командного терминала вы увидите расположение папки проекта и другую информацию об инициализации:

Рисунок 2.2: Инициализация проекта Airflow

В этом примере папка проекта расположена по пути /Users/kendrickvandoorn/my_airflow_project\ x, и она была инициализирована с предустановленной организацией папок и файлов, необходимых для запуска первого проекта. Обратите внимание, что имя my_airflow_project, заданное в команде CLI, совпадает с именем папки проекта, но его можно изменить в соответствии с вашими потребностями.

По мере продолжения работы CLI airflowctl веб-сервер, триггер и планировщик будут запускаться, а их задачи будут отмечены в начале каждой строки. Например, ознакомьтесь со следующим снимком экрана, отображающим логотип Airflow, который показывает, что триггер и планировщик активны, а следующие действия относятся к различным серверам.

Рисунок 2.3: Командная строка Airflow

По мере выполнения командной строки будут запускаться и настраиваться различные базовые проверки, подключения и разрешения. По мере того как мы продолжаем изучать Airflow, многие из этих решений, принимаемых перед развертыванием, могут быть заданы и изменены заранее в различных папках с определениями, что гарантирует соответствие инстанса Airflow вашим требованиям по размеру, безопасности и подключениям. Если инициализация прошла успешно, вы увидите следующие сообщения с информацией о том, как получить доступ к веб-серверу и войти в систему:

Рисунок 2.4: Успешная инициализация Airflow

В этом выводе терминала вы можете увидеть сообщение Airflow is ready, которое показывает, что процесс прошёл успешно и Airflow запущен локально на вашей машине. Обратите внимание, что имя пользователя — admin, оно используется по умолчанию при первой установке. Указанный пароль — не по умолчанию, а сгенерированный случайным образом, и он специфичен для вашего локального хоста. Оставьте терминал открытым и работающим, чтобы сохранить среду Airflow в активном состоянии на вашем компьютере.

CLI airflowctl не только запускает Airflow локально, но также обеспечивает управление продуктом во время его работы. Все логи и информация, касающаяся Airflow, будут отображаться в терминале, пока он работает. Обратите внимание, что различные службы постоянно проверяют и обрабатывают сигналы, которые отображаются в командной строке.

Рисунок 2.5: Сигналы веб-сервера

Чтобы перейти к веб-серверу, откройте браузер и введите в адресной строке следующее:

http://localhost:8080

Перейдя на localhost, при условии, что веб-сервер Airflow продолжает работать в терминале, вам будет предложено войти в систему с именем пользователя и паролем. Используйте имя пользователя admin и пароль, который был показан в терминале.

Завершив процесс входа в систему, вы попадёте на домашнюю страницу, которая должна выглядеть следующим образом:

Рисунок 2.6: Домашняя страница DAG

В следующих главах мы подробно рассмотрим важные разделы консоли пользовательского интерфейса Apache Airflow и приведём примеры типовых сценариев управления.

С работающей средой Airflow давайте рассмотрим основные компоненты Airflow на примере DAG, который уже предоставлен.

DAG-и

DAG-и (направленные ациклические графы) — это основной способ оркестровки пайплайнов данных. Они создаются с использованием Python и опираются на широкий спектр вспомогательных библиотек. Следуя предыдущим шагам по инициализации локальной среды разработки, вы загрузили пример example_dag_basic, который мы рассмотрим в этом разделе.

В своей основе DAG-и состоят из задач (tasks), операторов (operators) и сенсоров (sensors). За последние годы были также введены новые подходы, такие как группы задач (task groups) и откладываемые операторы (deferrable operators), которые мы также рассмотрим в этой главе.

Чтобы получить визуализацию этого DAG-а, просто выберите имя DAG-а в консоли веб-сервера. Сначала вы увидите информацию о конфигурации DAG-а и статусе его предыдущих запусков.

Рисунок 2.7: Конфигурация DAG

Чтобы просмотреть визуализированное представление этого первоначального примера DAG, выберите опцию Graph из списка опций в верхней части области DAG. Это отобразит графическое представление DAG.

Важно отметить, что в этом DAG три задачи, которые выполняются последовательно. В более сложных DAG могут быть задачи, выполняющиеся параллельно, а также те, которые ожидают определённого триггера, и их может быть слишком сложно визуализировать. Всегда рекомендуется, если возможно, разбивать сложное, монолитное программное обеспечение.

Рисунок 2.8: Сопоставление триггеров

Если example_dag_basic не был включён при инициализации, вы можете найти полный блок рабочего кода в конце этой главы или перейти на GitHub-страницу книги курса (https://github.com/PacktPublishing/Apache-Airflow-Best-Practices).

Цель этого примера DAG — выполнить простую функцию извлечения, преобразования и загрузки данных (extract, transform, load — ETL). В конечном итоге мы покажем ожидаемый результат и то, как подтвердить завершение выполнения.

Декораторы и определение DAG

Первым шагом в любом Python-файле является импорт необходимых библиотек и вспомогательных функций. DAG в Airflow не является исключением. Мы начинаем с импорта json и декораторов Airflow:

Импорт декораторов dag и task позволяет нам объявить DAG проще, чем раньше. Такой подход устраняет необходимость объявления конструкции with DAG as, как это требовалось ранее. Давайте объявим DAG и определим некоторые из необходимых полей, таких как расписание:

В приведённом выше фрагменте кода мы объявили schedule интервал, start_date, catchup, default_args и tags. Давайте кратко рассмотрим каждый из них.

Планирование с Apache Airflow и отказ от CRON

Интервал расписания может быть определён как заданная дата/время, которую сохраняет Airflow, или через задания Cron. Ознакомьтесь с приведённой ниже таблицей для получения дополнительной информации о том, как выполняются интервалы расписания. Каждое из указанных времён установлено в UTC, поэтому учитывайте это при планировании выполнения задач.

Таблица 2.1: Интервалы расписания для даты и времени

Дата/время Интервал
@none Для выполнения DAG требуется ручной запуск или триггер
@hourly Выполняется в начале каждого часа
@daily Выполняется в полночь каждый день
@weekly Выполняется в полночь каждое воскресенье
@monthly Выполняется в полночь в первый день каждого месяца
@yearly Выполняется 1 января в полночь каждого года
  • Далее указывается start_date. Важно задать дату начала, так как вы можете захотеть, чтобы конвейер выполнялся в тот же день, в будущем или в прошлом. Если start_date установлена в прошлом и catchup задан как True, тогда DAG будет выполняться столько раз, сколько предусмотрено расписанием. В приведённом примере catchup установлен как False, поэтому DAG не будет выполняться за предыдущие дни, начиная с 1 января 2023 года. Если бы мы установили catchup равным True, то следовало бы ожидать, что Airflow выполнит столько заданий, сколько дней прошло с 1 января 2023 года по настоящее время.
  • default_args — это значения по умолчанию, которые мы хотим изменить. В этом примере мы изменяем retries, установив его равным 2. Это количество попыток повторного выполнения задач DAG в случае их неудачи. Это полезный инструмент, если DAG пытается подключиться к базе данных, в которой часто возникают проблемы с подключением или обновлением, так как задачи будут пытаться выполниться снова позже, не влияя на другие задачи.
  • Наконец, мы применяем tags (теги) к DAG, чтобы начать их группировку. Тег отображается в интерфейсе Airflow и может быть полезен для пометки различных DAG для разных систем, команд или пользователей в будущем.

Задачи / Tasks

Задачи составляют DAG и выполняются по порядку в зависимости от того, как они определены. Задачи часто отображаются в виде блоков, чтобы показать порядок их выполнения. В этом базовом примере нужно последовательно выполнить три задачи. Задачи также являются самой базовой единицей выполнения в Airflow. Задача представляет собой одно действие или задание, которое должно быть выполнено. Задачи определяются с помощью операторов в Airflow, каждый из которых определяет тип выполняемой задачи. Распространённые задачи включают выполнение Python-функций, выполнение запросов к базе данных или выполнение преобразования данных.

Прежде чем приступить к анализу этого простого примера DAG и связанных с ним задач, нам нужно рассмотреть операторы и как они используются.

Операторы задач

В Apache Airflow оператор представляет собой одну идемпотентную задачу, которая является частью рабочего процесса, определённого DAG. Каждый оператор в Airflow указывает конкретный тип работы, инкапсулируя логику выполнения конкретной задачи. Каждый оператор предназначен для выполнения определённой функции, например, выполнения Python-функции, SQL-запроса, передачи данных или вызова внешней системы.

Airflow поставляется с большим набором операторов для различных задач, что позволяет пользователям сразу использовать широкий спектр возможностей. Некоторые из часто используемых типов включают:

  • BashOperator: выполняет bash-команду
  • PythonOperator: запускает Python-функцию
  • SqlOperator: выполняет SQL-команду
  • DockerOperator: запускает Docker-контейнер
  • HttpOperator: отправляет HTTP-запрос

Операторы также могут быть настроены и расширены в соответствии с конкретными требованиями, обеспечивая гибкость в том, как задачи выполняются внутри DAGов Airflow. Такая архитектура позволяет пользователям создавать сложные конвейеры обработки данных, которые легко понимать, сопровождать и масштабировать. Большинство операторов являются с открытым исходным кодом и доступны для ознакомления — их можно найти, выполнив поиск по названию на GitHub.

Первая задача — определение DAG и извлечение данных

Мы создадим три задачи для выполнения этого базового примера DAG. Задачи извлечения, преобразования и загрузки будут выполнять разные функции. Первая задача — задача извлечения.

Следующий фрагмент кода взят из базового примера DAG:

Мы начинаем с объявления @task, чтобы указать, что следующая функция будет задачей, используемой в Airflow. В этой первой задаче мы определяем функцию extract, которая не принимает аргументов или переменных. Внутри extract() мы определяем простую строку data_string, которая затем преобразуется в словарь данных. В конце функция возвращает полученную информацию.

В более сложных ситуациях и примерах мы будем извлекать данные из API, озёр данных и хранилищ данных. Создание подключения и подготовка извлечения — схожие процессы.

Определение задачи преобразования (transform task)

В следующей задаче мы выполним простое действие по преобразованию. Эта задача принимает словарь данных и вычисляет общую сумму заказов:

Задача преобразования принимает order_data_dict в качестве входных данных для выполнения задачи преобразования. Далее мы задаём переменную total_order_value, равную 0, чтобы подготовить её для использования в следующем цикле. Создаётся цикл for, который рассчитывает общую сумму заказов путём суммирования значений. В конце мы возвращаем общую сумму заказов для использования в следующей задаче.

В этой задаче мы видим новый аргумент, передаваемый в @taskmultiple_outputs. Его использование разворачивает словарь данных в отдельные значения XCom.

XComs

XComs, сокращение от Cross-Communications (межзадачные коммуникации), — это часто используемая функция в Apache Airflow. Она предназначена для облегчения обмена данными между задачами внутри DAG. Эта ключевая идея позволяет задачам обмениваться сообщениями или фрагментами данных, такими как состояния задач, возвращаемые значения или любая другая информация, связанная с выполнением. Это может быть чрезвычайно ценно и полезно при организации выполнения задач после того, как подтверждено завершение или сбой другой задачи.

XComs работают за счёт того, что одна задача может передавать данные в мета-базу данных Airflow, где они хранятся под определённым ключом. Другая задача затем может извлечь эти данные, используя этот ключ, позволяя передавать данные между задачами, даже если они выполняются на разных рабочих узлах или в разное время. Эта функция особенно полезна для задач, зависящих от результатов или вывода предыдущих задач.

Использование XComs способствует отделению задач внутри рабочего процесса, повышая модульность и повторное использование компонентов DAG. Однако рекомендуется использовать XComs умеренно, так как они предназначены для небольших объёмов данных. Передача больших объёмов данных лучше осуществляется через внешние системы или сервисы хранения данных, в то время как Airflow отвечает за оркестрацию задач, а не за перемещение значительных объёмов данных.

Определение задачи загрузки (load task)

На финальном, третьем этапе мы выполним загрузку преобразованных данных. В этом примере мы просто выводим общую сумму заказов через функцию print, чтобы убедиться в её успешности в логах:

В приведённом выше коде загрузка данных ограничена простым примером печати, чтобы продемонстрировать, как выполняется задача. В более сложных примерах мы ожидаем загрузку преобразованных данных в хранилище данных или другую систему для последующего использования.

Установка порядка выполнения задач и зависимостей

Наконец, нам нужно задать порядок задач. В этом примере мы объявляем их по порядку с помощью переменных. В других примерах, которые мы рассмотрим, можно увидеть использование символа >>, который указывает, что задача слева предшествует задаче справа. Пример такого использования:

Однако это не тот способ, который используется в данном примере DAG, и мы рекомендуем следовать следующей структуре:

Как видно из приведённого фрагмента кода, мы делаем функцию extract() вызываемой через order_data, который затем передаётся через transform(). Затем результат transform передаётся через load() с помощью переменной order_summary. Задание такого порядка влияет на ожидаемую последовательность выполнения задач.

Последним, но самым важным шагом является вызов функции example_dag_basic() в конце кода. Если этот шаг не будет выполнен, то DAG никогда не будет работать.

Выполнение примера DAG

Чтобы выполнить DAG, необходимо вернуться в интерфейс Airflow. Внутри интерфейса выполните следующие шаги для запуска базового примера DAG и подтверждения завершения операций извлечения, преобразования и загрузки:

  1. Перейдите по адресу: http://localhost:8080
  2. Войдите под пользователем admin с соответствующим паролем.
  3. Убедитесь, что пример DAG отображается в интерфейсе.
  4. В разделе «Actions» справа нажмите зелёную кнопку воспроизведения, чтобы запустить DAG.

Другой способ — нажать на переключатель слева от имени DAG, чтобы включить DAG и начать его выполнение по заданному расписанию.

Выполнение задач DAG может занять до 30 минут. После завершения выполните следующие шаги, чтобы подтвердить выполнение:

  1. Нажмите на имя DAG example_dag_basic, чтобы перейти к обзору DAG.
  2. Выберите вкладку Graph.
  3. Выберите задачу load, нажав на соответствующий блок задачи.
  4. В верхнем меню выберите Logs.
  5. Убедитесь, что итоговая сумма заказов равна 1236.70.

Группы задач (Task groups)

Для управления сложностью и повышения читаемости DAG в Airflow 2 были введены группы задач (task groups). Группы задач позволяют организовывать задачи в иерархически сгруппированные подмножества внутри DAG. Такая организация полезна не только для визуального упрощения в интерфейсе Airflow, но и для логического разделения, делая крупные рабочие процессы более управляемыми и понятными.

По мере усложнения DAG, особенно в масштабных корпоративных структурах и моделях, их становится сложнее понимать. Эти группировки позволяют визуально объединять задачи в интерфейсе Airflow, предоставляя уменьшенные обзоры, которые отображают различные секции DAG.

Обычный пример группировки задач — когда у команды есть несколько этапов преобразования или извлечения данных в рамках DAG, и они хотят визуализировать эту секцию максимально просто.

Если бы приведённый выше базовый пример DAG включал два отдельных источника данных, мы могли бы создать задачи для каждого из этих извлечений и сгруппировать их. Пример кода мог бы выглядеть следующим образом:

В этом примере мы определили группу задач как extraction_task_group и создали две отдельные задачи. Задача t1 использует исходную функцию extract(), а задача t2 использует EmptyOperator, который ничего не делает. Мы указываем, что задача t1 должна выполняться до t2.

Мы можем изменить порядок выполнения задач, чтобы отразить новую группу задач как начальную точку. Следующая диаграмма отображает новый визуализированный порядок задач:

Рисунок 2.9: Обновлённый порядок задач

Чтобы развернуть группу задач, просто выберите + 2 tasks на группе задач и разверните секцию для просмотра задач внутри:

Рисунок 2.10: Развёрнутый вид новой задачи

Кроме того, Apache Airflow поддерживает вложенные группы задач, позволяя пользователям дополнительно организовывать и структурировать рабочие процессы с большей детализацией. Вложенные группы задач позволяют создавать иерархические структуры внутри DAG, где одна группа задач может содержать другие группы задач. Такая иерархическая организация также помогает в отладке и сопровождении рабочего процесса, так как она инкапсулирует связанные задачи в отдельные, управляемые единицы, которые можно разрабатывать, тестировать и отслеживать независимо.

Триггеры

Триггеры определяют условия, при которых выполняется задача или DAG. Airflow предоставляет гибкие механизмы триггеров, позволяющие задачам выполняться на основе расписаний (например, ежедневно или ежечасно), внешних событий или по завершении других задач. Понимание триггеров имеет ключевое значение для планирования задач в соответствии с вашими операционными требованиями и зависимостями. Ранее мы рассматривали планирование и установку интервалов для DAG, но конкретные триггеры внутри DAG и задач могут корректировать время выполнения действий.

Примером триггера в Apache Airflow является TimeDeltaTrigger, который планирует задачи для выполнения через определённый интервал времени после завершения другой задачи. Этот триггер является частью возможностей Airflow по динамическому отображению задач и отложенным операторам, позволяя рабочим процессам динамически адаптироваться на основе условий во время выполнения.

Например, предположим, что в базовом примере DAG мы решаем, что начальная задача преобразования должна выполняться через 30 минут после успешного завершения первой группы задач извлечения. Вы можете использовать TimeDeltaTrigger для этого, установив задержку выполнения задачи агрегации на 30 минут.

Это достигается путём добавления триггера в определение задачи в следующей форме:

Распространённые типы триггеров, используемые командами, включают триггеры на основе времени, зависимостей и событий. Кратко объясним каждый из них.

Триггеры на основе времени включают как выражения CRON, так и планирование на основе интервалов. Выражения cron — одни из самых распространённых триггеров, позволяющие запускать задачи через регулярные промежутки времени, заданные синтаксисом cron

(например, 0 0 * * * — для задачи, выполняемой ежедневно в полночь).

Планирование на основе интервалов — это когда задачи могут выполняться через фиксированные промежутки времени (например, каждый час или каждый день) с использованием предопределённого интервала.

Триггеры на основе зависимостей включают завершение предыдущих задач, внешний датчик задач и множество других опций. Завершение предыдущих задач (upstream task completion) — это когда задача может быть запущена после успешного выполнения одной или нескольких указанных предыдущих задач. Это критически важно для рабочих процессов, где задачи напрямую зависят от результата или успешности других задач. Датчик внешних задач (external task sensor) ожидает завершения определённой задачи в другом DAG перед продолжением. Это полезно для координации задач между различными рабочими процессами.

Последними из наиболее распространённых являются триггеры на основе событий, к которым относятся Webhooks и датчики, связанные с электронной почтой или другими сервисами уведомлений. Задачи-триггеры Webhook могут запускаться внешними событиями через Webhook. Это полезно для рабочих процессов, которые должны запускаться на основе действий или сигналов из внешних систем. Датчик электронной почты запускает задачу при получении электронной почты, удовлетворяющей определённым критериям, что полезно для рабочих процессов, начинающихся в ответ на уведомления по электронной почте.

Другие триггеры, заслуживающие внимания, включают триггеры доступности данных и ручные триггеры, которые могут запускаться вручную пользователем.

Резюме 2 главы

В этой главе мы рассмотрели основы DAG, задач, операторов, XCom, групп задач, триггеров и интерфейса командной строки airflowctl. Мы изучили взаимодействие с консолью Airflow и рассмотрели основы написания DAG с типичным примером, включённым в начальную локально запущенную версию Airflow.

Каждая из этих тем имеет критическое значение для понимания и освоения перед тем, как пытаться создавать крупные ETL-пайплайны или другие случаи использования ML/AI с Airflow. Рекомендуется уделить время изучению базового примера DAG и попрактиковаться с различными триггерами или операторами, рассмотренными в этой главе, чтобы вы чувствовали себя уверенно при построении систем более крупного масштаба по мере вашего развития как инженера.

В следующих главах мы расширим этот начальный пример базового DAG с помощью реального ETL-пайплайна и выполним нашу первую загрузку данных.

0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x