Перейти к содержанию

Object Storage (объектное хранилище)

Добавлено в версии 2.8.0.

Крупные облачные провайдеры предоставляют постоянное хранение данных в объектных хранилищах. Это не классические POSIX-файловые системы. Чтобы хранить сотни петабайт без единой точки отказа, объектные хранилища заменяют дерево каталогов моделью «имя объекта => данные». Доступ к объектам обычно реализован через (медленные) HTTP REST-операции.

Airflow даёт общую абстракцию поверх объектных хранилищ (s3, gcs, Azure Blob Storage и т.п.). С ней можно использовать разные системы объектного хранения в DAG без переписывания кода под каждую. Кроме того, остаются доступны стандартные Python-модули вроде shutil, работающие с файлоподобными объектами.

Поддержка конкретной системы зависит от установленных провайдеров. Например, при установленном apache-airflow-providers-google доступна схема gcs. «Из коробки» Airflow поддерживает схему file.

Примечание. Поддержка s3 требует установки apache-airflow-providers-amazon[s3fs], так как зависит от aiobotocore, который по умолчанию не ставится из-за возможных конфликтов с botocore.

Облачные объектные хранилища — не настоящие файловые системы

Объектные хранилища не являются настоящими ФС, хотя могут выглядеть похоже. Они не поддерживают все операции обычной ФС. Основные отличия:

  • Нет гарантированной атомарной переименовки. При «перемещении» файла он копируется, затем удаляется исходный. При сбое копирования файл можно потерять.
  • Каталоги эмулируются и могут работать медленно. Например, листинг каталога может требовать перечисления всех объектов в бакете и фильтрации по префиксу.
  • Seek внутри файла может давать большую нагрузку на вызовы или вообще не поддерживаться.

Airflow опирается на fsspec для единообразной работы с разными объектными хранилищами и использует локальный кэш для ускорения доступа. При проектировании DAG стоит учитывать ограничения объектных хранилищ.

Базовое использование

Чтобы работать с объектным хранилищем, создайте экземпляр Path (см. ниже) с URI нужного объекта. Например, для бакета в s3:

from airflow.sdk import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@my-bucket/")

Часть URI до @ — ID подключения (connection) Airflow; она необязательна. Подключение можно передать отдельным аргументом:

# Эквивалентно предыдущему примеру.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")

Перечисление файлов-объектов:

@task
def list_files() -> list[ObjectStoragePath]:
    files = [f for f in base.iterdir() if f.is_file()]
    return files

Навигация по дереву каталогов:

base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"

# выведет ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)

Открытие файла:

@task
def read_file(path: ObjectStoragePath) -> str:
    with path.open() as f:
        return f.read()

Пути можно передавать между задачами через XCom:

@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
    return path / "new_file.txt"


@task
def write_file(path: ObjectStoragePath, content: str):
    with path.open("wb") as f:
        f.write(content)


new_file = create(base)
write = write_file(new_file, b"data")

read >> write

Конфигурация

В базовом сценарии абстракция объектного хранилища почти не требует настройки и использует стандартный механизм подключений (connections) Airflow. Подключение задаётся аргументом conn_id. Параметры подключения передаются в нижележащую реализацию. Например, для s3 можно указать aws_access_key_id и aws_secret_access_key, а также дополнительные параметры вроде endpoint_url для своего эндпоинта.

Альтернативные бэкенды

Для схемы или протокола можно задать другой бэкенд, «прикрепив» его к схеме. Пример для бэкенда Databricks и схемы dbfs:

from airflow.sdk import ObjectStoragePath
from airflow.sdk.io import attach

from fsspec.implementations.dbfs import DBFSFileSystem

attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")

Примечание. Чтобы регистрация бэкенда была доступна во всех задачах, вызывайте attach на верхнем уровне DAG. Иначе бэкенд не будет виден в других задачах.

Path API

Абстракция объектного хранилища реализована как Path API и опирается на Universal Pathlib. То есть с объектным хранилищем можно работать почти так же, как с локальной ФС. Ниже перечислены отличия от стандартного Path API. Операции копирования и перемещения описаны в следующем разделе. Подробности по аргументам — в документации класса ObjectStoragePath.

mkdir

Создаёт запись каталога по указанному пути или внутри бакета/контейнера. В системах без настоящих каталогов может создаваться только запись для данного экземпляра, без изменения реальной ФС.

Если parents=True, создаются все недостающие родительские каталоги.

touch

Создаёт файл по пути или обновляет время модификации. Если truncate=True (по умолчанию), файл обрезается. Если файл уже есть, при exists_ok=True вызов успешен (время модификации обновляется), иначе выбрасывается FileExistsError.

stat

Возвращает объект, похожий на stat_result, с атрибутами st_size, st_mtime, st_mode, а также ведущий себя как словарь с дополнительными метаданными. Например, для s3 могут быть ключи вроде ['ETag', 'ContentType']. Для переносимости между разными хранилищами не полагайтесь на расширенные метаданные.

Расширения

Следующие операции не входят в стандартный Path API, но поддерживаются абстракцией объектного хранилища.

bucket

Имя бакета.

checksum

Контрольная сумма файла.

container

Синоним bucket.

fs

Атрибут для доступа к экземпляру файловой системы (fsspec).

key

Ключ объекта.

namespace

Пространство имён объекта. Обычно это протокол и имя бакета, например s3:// + имя бакета.

path

Путь, совместимый с fsspec, для использования с экземплярами файловой системы.

protocol

Протокол filesystem_spec.

read_block

Читает блок байт из файла по пути.

Чтение length байт начиная с offset. Если задан delimiter, чтение начинается и заканчивается на границах разделителя после offset и offset + length. При offset=0 чтение с начала. Возвращаемая строка байт включает конечный разделитель.

Если offset + length выходит за конец файла, читается до конца файла.

sign

Создаёт подписанный URL для пути. Некоторые реализации позволяют генерировать временные URL для делегирования учётных данных.

size

Размер файла в байтах.

storage_options

Параметры хранилища для создания нижележащей файловой системы.

ukey

Хеш свойств файла для проверки изменения.

Копирование и перемещение

Ожидаемое поведение операций copy и move, в том числе между разными хранилищами (например, file → s3). Каждый метод копирует или перемещает файлы/каталоги из source в target. Поведение соответствует fsspec. При копировании каталога между разными хранилищами Airflow обходит дерево и копирует каждый файл по отдельности, стримя данные из источника в приёмник.

Внешние интеграции

Многие проекты (DuckDB, Apache Iceberg и др.) могут использовать абстракцию объектного хранилища, обычно получая нижележащую реализацию fsspec. Для этого у ObjectStoragePath есть свойство fs. Пример с DuckDB: данные подключения из Airflow используются для доступа к s3, parquet-файл задаётся через ObjectStoragePath:

import duckdb
from airflow.sdk import ObjectStoragePath

path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")

Источник: Airflow 3.1.7 — Object Storage. Перевод неофициальный.