Object Storage (объектное хранилище)
Добавлено в версии 2.8.0.
Крупные облачные провайдеры предоставляют постоянное хранение данных в объектных хранилищах. Это не классические POSIX-файловые системы. Чтобы хранить сотни петабайт без единой точки отказа, объектные хранилища заменяют дерево каталогов моделью «имя объекта => данные». Доступ к объектам обычно реализован через (медленные) HTTP REST-операции.
Airflow даёт общую абстракцию поверх объектных хранилищ (s3, gcs, Azure Blob Storage и т.п.). С ней можно использовать разные системы объектного хранения в DAG без переписывания кода под каждую. Кроме того, остаются доступны стандартные Python-модули вроде shutil, работающие с файлоподобными объектами.
Поддержка конкретной системы зависит от установленных провайдеров. Например, при установленном apache-airflow-providers-google доступна схема gcs. «Из коробки» Airflow поддерживает схему file.
Примечание. Поддержка s3 требует установки
apache-airflow-providers-amazon[s3fs], так как зависит отaiobotocore, который по умолчанию не ставится из-за возможных конфликтов сbotocore.
Облачные объектные хранилища — не настоящие файловые системы
Объектные хранилища не являются настоящими ФС, хотя могут выглядеть похоже. Они не поддерживают все операции обычной ФС. Основные отличия:
- Нет гарантированной атомарной переименовки. При «перемещении» файла он копируется, затем удаляется исходный. При сбое копирования файл можно потерять.
- Каталоги эмулируются и могут работать медленно. Например, листинг каталога может требовать перечисления всех объектов в бакете и фильтрации по префиксу.
- Seek внутри файла может давать большую нагрузку на вызовы или вообще не поддерживаться.
Airflow опирается на fsspec для единообразной работы с разными объектными хранилищами и использует локальный кэш для ускорения доступа. При проектировании DAG стоит учитывать ограничения объектных хранилищ.
Базовое использование
Чтобы работать с объектным хранилищем, создайте экземпляр Path (см. ниже) с URI нужного объекта. Например, для бакета в s3:
from airflow.sdk import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
Часть URI до @ — ID подключения (connection) Airflow; она необязательна. Подключение можно передать отдельным аргументом:
# Эквивалентно предыдущему примеру.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
Перечисление файлов-объектов:
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
Навигация по дереву каталогов:
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# выведет ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
Открытие файла:
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
Пути можно передавать между задачами через XCom:
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
Конфигурация
В базовом сценарии абстракция объектного хранилища почти не требует настройки и использует стандартный механизм подключений (connections) Airflow. Подключение задаётся аргументом conn_id. Параметры подключения передаются в нижележащую реализацию. Например, для s3 можно указать aws_access_key_id и aws_secret_access_key, а также дополнительные параметры вроде endpoint_url для своего эндпоинта.
Альтернативные бэкенды
Для схемы или протокола можно задать другой бэкенд, «прикрепив» его к схеме. Пример для бэкенда Databricks и схемы dbfs:
from airflow.sdk import ObjectStoragePath
from airflow.sdk.io import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
Примечание. Чтобы регистрация бэкенда была доступна во всех задачах, вызывайте
attachна верхнем уровне DAG. Иначе бэкенд не будет виден в других задачах.
Path API
Абстракция объектного хранилища реализована как Path API и опирается на Universal Pathlib. То есть с объектным хранилищем можно работать почти так же, как с локальной ФС. Ниже перечислены отличия от стандартного Path API. Операции копирования и перемещения описаны в следующем разделе. Подробности по аргументам — в документации класса ObjectStoragePath.
mkdir
Создаёт запись каталога по указанному пути или внутри бакета/контейнера. В системах без настоящих каталогов может создаваться только запись для данного экземпляра, без изменения реальной ФС.
Если parents=True, создаются все недостающие родительские каталоги.
touch
Создаёт файл по пути или обновляет время модификации. Если truncate=True (по умолчанию), файл обрезается. Если файл уже есть, при exists_ok=True вызов успешен (время модификации обновляется), иначе выбрасывается FileExistsError.
stat
Возвращает объект, похожий на stat_result, с атрибутами st_size, st_mtime, st_mode, а также ведущий себя как словарь с дополнительными метаданными. Например, для s3 могут быть ключи вроде ['ETag', 'ContentType']. Для переносимости между разными хранилищами не полагайтесь на расширенные метаданные.
Расширения
Следующие операции не входят в стандартный Path API, но поддерживаются абстракцией объектного хранилища.
bucket
Имя бакета.
checksum
Контрольная сумма файла.
container
Синоним bucket.
fs
Атрибут для доступа к экземпляру файловой системы (fsspec).
key
Ключ объекта.
namespace
Пространство имён объекта. Обычно это протокол и имя бакета, например s3:// + имя бакета.
path
Путь, совместимый с fsspec, для использования с экземплярами файловой системы.
protocol
Протокол filesystem_spec.
read_block
Читает блок байт из файла по пути.
Чтение length байт начиная с offset. Если задан delimiter, чтение начинается и заканчивается на границах разделителя после offset и offset + length. При offset=0 чтение с начала. Возвращаемая строка байт включает конечный разделитель.
Если offset + length выходит за конец файла, читается до конца файла.
sign
Создаёт подписанный URL для пути. Некоторые реализации позволяют генерировать временные URL для делегирования учётных данных.
size
Размер файла в байтах.
storage_options
Параметры хранилища для создания нижележащей файловой системы.
ukey
Хеш свойств файла для проверки изменения.
Копирование и перемещение
Ожидаемое поведение операций copy и move, в том числе между разными хранилищами (например, file → s3). Каждый метод копирует или перемещает файлы/каталоги из source в target. Поведение соответствует fsspec. При копировании каталога между разными хранилищами Airflow обходит дерево и копирует каждый файл по отдельности, стримя данные из источника в приёмник.
Внешние интеграции
Многие проекты (DuckDB, Apache Iceberg и др.) могут использовать абстракцию объектного хранилища, обычно получая нижележащую реализацию fsspec. Для этого у ObjectStoragePath есть свойство fs. Пример с DuckDB: данные подключения из Airflow используются для доступа к s3, parquet-файл задаётся через ObjectStoragePath:
import duckdb
from airflow.sdk import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")
Источник: Airflow 3.1.7 — Object Storage. Перевод неофициальный.