Построение архитектуры Medallion для данных Bluesky в формате JSON с помощью ClickHouse

Ниже — перевод статьи “Building a Medallion architecture for Bluesky JSON data with ClickHouse” с сайта ClickHouse.

Построение архитектуры Medallion для данных Bluesky в формате JSON с помощью ClickHouse

Мы так же взволнованы, как и вся остальная дата-сообщество, из-за недавнего всплеска популярности социальной сети BlueSky и её API, который позволяет получать доступ к потоку публикуемого контента.

Этот набор данных содержит поток с высокой пропускной способностью — тысячи JSON-событий в секунду, и мы подумали, что будет интересно сделать эти данные доступными для сообщества, чтобы каждый мог выполнять по ним запросы.

Во время исследования данных мы обнаружили, что во многих событиях присутствуют некорректные или повреждённые временные метки. Набор данных также содержит частые дубликаты. Поэтому мы не можем просто импортировать данные и на этом закончить — потребуется некоторая очистка.

Это идеальная возможность попробовать архитектуру Medallion, о которой мы недавно писали в блоге. В этом посте мы оживим эти концепции на практическом примере.

Мы создадим рабочий процесс, который решает эти задачи, организуя набор данных в три отдельные уровня: бронзовый, серебряный и золотой. Мы будем придерживаться принципов архитектуры Medallion и активно использовать недавно представленный тип данных JSON.

Каждый уровень будет доступен для публичных запросов в нашей демо-среде на sql.clickhouse.com, где читатели смогут самостоятельно изучить и взаимодействовать с результатами. Мы даже подготовили несколько примерных аналитических запросов, чтобы вам было проще начать!

Что такое Bluesky?

Для тех, кто не так активен в социальных сетях, вы могли пропустить недавний взлёт популярности Bluesky, которая в настоящее время набирает почти миллион пользователей в день. Bluesky — это социальная сеть, похожая на X (бывший Twitter), но, в отличие от него, она полностью открыта и децентрализована!

Bluesky, построенная на AT Protocol (ATProto), представляет собой децентрализованную платформу социальных сетей, которая позволяет пользователям самостоятельно размещать свой контент. По умолчанию данные хранятся на Bluesky Personal Data Server (PDS), но пользователи могут выбирать — размещать эти серверы (и свой контент) у себя. Такой подход отражает возврат к принципам раннего Интернета, когда пользователи имели контроль над своим контентом и связями, вместо того чтобы зависеть от централизованных платформ, которые доминируют и владеют пользовательскими данными.

Данные каждого пользователя управляются в лёгкой, открытой программной среде, где одна база данных SQLite используется для хранения. Такая структура обеспечивает взаимодействие между системами (interoperability) и гарантирует, что право собственности на контент остаётся за пользователем, даже если центральная платформа выйдет из строя или изменит свою политику.

И самое главное для нас: как и старый Twitter, Bluesky предоставляет бесплатный способ получать события — например, посты — в реальном времени, что открывает потенциально огромный набор данных для аналитики, по мере того как сеть набирает популярность.

Чтение данных Bluesky

Чтобы загрузить данные из Bluesky, мы используем недавно выпущенный Jetstream API, который упрощает потребление событий Bluesky, предоставляя потоки, закодированные в формате JSON. В отличие от оригинального firehose, который требует обработки бинарных данных CBOR и файлов CAR, Jetstream снижает сложность, делая процесс доступным для разработчиков, работающих с приложениями в реальном времени. Этот API идеально соответствует нашему случаю использования, позволяя фильтровать и обрабатывать тысячи событий в секунду из постов Bluesky, одновременно решая распространённые проблемы, такие как повреждённые данные и высокий уровень дублирования.

В нашей реализации мы подключаемся к публичному экземпляру Jetstream, потребляя непрерывный поток событий в формате JSON для загрузки. Для этого используется простой bash-скрипт, который обрабатывает поток JSON-событий в реальном времени из Jetstream.

Ссылка на полный bash-скрипт.

Вкратце, он выполняет следующее:

  • Проверяет GCS bucket на наличие самого последнего файла .csv.gz, извлекает его временную метку (используемую как курсор) и применяет её для возобновления подписки Jetstream с нужной позиции. Это обеспечивает непрерывность данных и минимизирует дублирование.
  • Инструмент websocat используется для подключения к Jetstream API, подписки на события и передачи JSON-потока для обработки. Параметр wantedCollections фильтрует нужные события, а cursor обеспечивает пошаговое (инкрементальное) получение данных, например:

  • Входящие данные JSON разбиваются на фрагменты по 500 000 строк, при этом каждый фрагмент представляет собой файл, где последняя временная метка используется в качестве идентификатора файла. Мы используем clickhouse-local для преобразования файла в CSV, затем сжимаем его в .gz и загружаем в GCS bucket с помощью gsutil.
  • Скрипт выполняется внутри Docker-контейнера ClickHouse, который запускается каждые 3 минуты с помощью Google Cloud Run Job.

Обратите внимание, что файлы естественным образом упорядочены по своим именам, основанным на временной метке последнего события. Это критически важно для последующего эффективного инкрементального чтения из GCS bucket. Однако скрипт не гарантирует, что будут зафиксированы все события Bluesky.

Выборка (sampling) данных

На момент написания этого поста мы зафиксировали почти 1,5 миллиарда событийных строк, собранных примерно за 21 день. Мы можем использовать функцию gcs в ClickHouse, чтобы выполнить запрос к данным напрямую и определить общее количество необработанных строк.

Мы можем взять выборку данных, используя ту же функцию, преобразовав каждую строку в тип JSON и применив формат PrettyJSONEachRow, чтобы получить читаемый результат.

Хотя приведённый выше пример даёт некоторое представление о структуре событий, он не полностью отражает сложность, изменчивость и непоследовательность данных. Столбец kind в значительной степени определяет последующую структуру, при этом API передаёт три типа событий: commit, identity и account.

Краткое описание типов событий:

  • commit — событие фиксации (commit) указывает на создание, обновление или удаление записи. Этот тип представляет большинство событий и включает посты, лайки и подписки.
  • identity — обновление идентичности учётной записи.
  • account — обновление состояния учётной записи.

Мы подробнее исследуем эти данные после их загрузки в бронзовый слой (Bronze layer).

Проблемы с данными Bluesky

Данные Bluesky, как они поступают через JetStream API, имеют ряд проблем, включая следующее:

  • Повреждённый JSON (Malformed JSON) — время от времени встречаются некорректно сформированные JSON-события. Хотя они редки, такие записи могут нарушить обработку файла. Мы исключаем их с помощью функции isValidJSON, ограничивая загрузку в бронзовый слой (Bronze layer) только теми строками, для которых функция возвращает значение 1.
  • Непоследовательная структура (Inconsistent structure) — хотя временная метка сбора данных (поле time_us) присутствует во всех событиях, путь JSON, содержащий время, когда событие произошло, зависит от типа события. Наш рабочий процесс должен извлекать единую согласованную временную метку, основываясь на этих условиях. Простой анализ показывает, что:
    • commit.record.createdAt можно использовать для событий типа commit;
    • identity.time — для событий identity;
    • account.time — для событий account.
  • Будущие или некорректные временные метки (Future or invalid timestamps) — некоторые события имеют временные метки из будущего. Например, при выборке событий на момент написания поста 42 тысячи commit-событий имели будущие значения времени. Ещё 4 миллиона commit-событий имели метки времени, относящиеся к периоду до запуска Bluesky как сервиса.
  • Повторяющиеся структуры (Repeated structures) — встречаются случаи, когда JSON содержит глубоко рекурсивные структуры. Это приводит к появлению более 1800 уникальных JSON-путей, большинство из которых, вероятно, не имеют существенной ценности для анализа содержимого.
  • Дубликаты (Duplicates) — несмотря на использование курсора для поддержания последовательности данных, JetStream API создаёт дубликаты (где содержимое идентично, за исключением временной метки сбора). Удивительно, но такие дубликаты могут появляться в широком диапазоне времени — в некоторых случаях с разницей до 24 часов. Важно отметить, что большинство дубликатов встречаются в интервале около 20 минут.

Приведённые выше пункты не представляют собой исчерпывающий список проблем с качеством данных — мы продолжаем находить новые сложности! Однако, в целях наглядности и сжатости примера, в нашем демонстрационном Medallion workflow мы сосредоточимся именно на перечисленных проблемах.

Тип данных JSON в ClickHouse

JSON играет ключевую роль в реализации архитектуры Medallion для данных Bluesky, позволяя системе хранить высокодинамичную и полуструктурированную информацию в бронзовом слое (Bronze layer). Новый тип данных JSON в ClickHouse, представленный в версии 24.8, решил ключевые проблемы, с которыми сталкивались предыдущие реализации.

В отличие от традиционных подходов, которые предполагают единственный тип для каждого JSON-пути (что часто приводит к принудительному приведению типов или их преобразованию), JSON-тип в ClickHouse хранит значения каждого уникального пути и типа в отдельных подколонках (sub-columns).

Такой подход обеспечивает эффективное хранение, сводит к минимуму лишние операции ввода-вывода (I/O) и избегает затрат на приведение типов во время выполнения запроса.

Например, когда в таблицу вставляются два JSON-пути с разными типами данных, ClickHouse сохраняет значения каждого типа в отдельных подколонках. Эти подколонки могут быть запрошены независимо, что снижает ненужные операции ввода-вывода.
При этом, если запросить колонку, содержащую несколько типов данных, её значения всё равно возвращаются как единый столбец в ответе.

Кроме того, благодаря использованию смещений (offsets), ClickHouse гарантирует, что подколонки остаются плотными (dense) — то есть не хранят значения по умолчанию для отсутствующих JSON-путей. Такой подход максимизирует степень сжатия и дополнительно снижает нагрузку на I/O.

Также данный тип данных не страдает от проблемы “взрыва подколонок” (sub-column explosion), возникающей при большом количестве уникальных JSON-путей.
Это особенно важно для данных Bluesky, где при отсутствии фильтрации встречается более 1800 уникальных путей.
При этом это не мешает хранению всех этих путей — новые пути просто сохраняются в общей колонке данных, если превышен лимит (при этом статистика ускоряет выполнение запросов).

Такое оптимизированное обращение с JSON обеспечивает эффективное хранение сложных, полуструктурированных наборов данных, таких как данные Bluesky, в бронзовом слое архитектуры.
Для пользователей, заинтересованных в технических деталях реализации этого нового типа колонок, рекомендуется ознакомиться с подробным постом в нашем блоге (ссылка предоставлена в оригинале).

Бронзовый уровень для необработанных данных (Bronze, сырые данные)

Хотя исходное описание бронзового слоя (Bronze layer) не предполагает фильтрацию или преобразование данных, мы относимся к этому менее догматично и считаем, что минимальная фильтрация и недеструктивные преобразования данных могут быть полезны для исследования проблем и возможности воспроизведения данных в будущем.

Для преобразований мы рекомендуем ограничиться теми, которые можно реализовать с помощью материализованных колонок (Materialized columns), как показано ниже в нашей схеме бронзового слоя:

Некоторые важные замечания по этой схеме:

  • Тип JSON — колонка data использует новый тип данных JSON и содержит всё событие целиком.
    Мы применяем оператор SKIP, чтобы исключить определённые пути JSON, которые, как показал анализ, были ответственны за повторяющиеся структуры, отмеченные ранее.
  • Сохранение метаданных — колонка _file содержит ссылку на файл, из которого была загружена строка.
  • Материализованные колонки (Materialized columns) — остальные колонки являются материализованными и вычисляются из колонки data во время вставки:
    • scrape_ts — время, когда событие было доставлено; извлекается из поля JSON time_us.
    • kind — тип события, как упоминалось ранее.
    • bluesky_ts — выполняет условную логику, извлекая временную метку события на основе значения kind; это решает проблему непоследовательной структуры и обеспечивает единый формат временных меток для всех событий.
    • dedup_hash — содержит хеш события.
  • Для вычисления хеша создаётся массив всех JSON-путей и их значений, за исключением time_us (так как это поле отличается у дубликатов), с помощью функции JSONExtractKeysAndValues.
    Затем функция cityHash64 обрабатывает этот массив, создавая уникальный хеш события.
  • ReplacingMergeTree — используется движок ReplacingMergeTree, который позволяет устранять дубликаты записей, имеющих одинаковые значения ключей сортировки (ORDER BY).
    Дедупликация выполняется асинхронно во время фоновых слияний, которые происходят в неопределённое время и не могут быть напрямую контролируемы — то есть дедупликация осуществляется постепенно (eventual deduplication).

В нашей схеме ключ ORDER BY включает kind и bluesky_ts, что:

  • обеспечивает эффективное чтение;
  • гарантирует высокую степень сжатия, группируя строки с похожими атрибутами.

Мы также добавляем dedup_hash, чтобы уникально идентифицировать строки для дедупликации, но не включаем его в PRIMARY KEY.
Это оптимизация, которая предотвращает загрузку индекса по dedup_hash в память — разумное решение, так как мы не выполняем прямые запросы по хешу.

Наш бронзовый слой выполняет минимальные преобразования данных с помощью материализованных колонок, но при этом обеспечивает возможность дедупликации.
Важно отметить, что использование ReplacingMergeTree здесь не является обязательным и не влияет на будущие слои.

Пользователи могут предпочесть обычный MergeTree, если хотят анализировать дубликаты напрямую.

Наш выбор обусловлен главным образом желанием минимизировать объём хранимых данных.

Загрузка данных из объектного хранилища (s3, object storage)

Как описано выше, наш конвейер загрузки данных (ingestion pipeline) использует инструмент websocat для потоковой передачи данных из JetStream API, сохраняя события в виде файлов .csv.gz в Google Cloud Storage (GCS).

Этот промежуточный шаг предоставляет несколько преимуществ:

  • он позволяет воспроизводить данные (data replay),
  • сохраняет оригинальную копию необработанных данных (raw data)
  • и имитирует подход, который многие пользователи используют для загрузки данных из объектного хранилища.

Чтобы считать эти файлы из GCS в нашу таблицу бронзового слоя bluesky_raw, мы используем движок таблицы S3Queue (S3Queue table engine). Этот движок считывает данные из объектного хранилища, совместимого с S3, автоматически обрабатывает новые файлы по мере их добавления в бакет и вставляет их в указанную таблицу через материализованное представление (materialized view).

Создание этой таблицы требует небольшой DDL-команды:

Обратите внимание, что мы указываем GCS-бакет, содержащий сжатые (gzipped) файлы,
и определяем каждую строку как тип String с помощью объявления схемы.

Важно, что мы включаем «ordered mode» через настройку mode = 'ordered'. Это заставляет файлы обрабатываться в лексикографическом порядке, обеспечивая последовательную загрузку данных.

Хотя это означает, что файлы, добавленные с более ранним порядком сортировки, игнорируются, такая конфигурация поддерживает эффективную и инкрементальную обработку, и устраняет необходимость выполнять масштабные операции сравнения множеств, если файлы не имеют естественного порядка.

Наше раннее использование временных меток (timestamps) для имен файлов гарантирует, что данные обрабатываются в правильной последовательности, а движок S3Queue быстро распознаёт новые файлы, которые нужно загрузить.

Наша среда sql.clickhouse.com, в которую мы загружаем данные, состоит из трёх узлов, каждый из которых имеет 60 виртуальных процессорных ядер (vCPUs).

Параметр s3queue_processing_threads_num задаёт количество потоков для обработки файлов на каждом сервере.

Кроме того, при использовании ordered mode вводится дополнительная настройка — s3queue_buckets. Как рекомендуется, мы устанавливаем её как произведение количества реплик (3) на количество потоков обработки (10).

Чтобы потреблять строки из этой очереди, необходимо присоединить инкрементальное материализованное представление (Incremental Materialized View). Это представление читает данные из очереди, выполняет SELECT-запрос над строками, а результат отправляется в таблицу бронзового слоя bluesky_raw.

Обратите внимание, что мы выполняем базовую фильтрацию уже на этом уровне:
в таблицу бронзового слоя передаются только строки, где isValidJSON(data) = 1, то есть содержащие валидный JSON.

Также мы добавляем метаданные — колонку _file, чтобы иметь запись о том, из какого gzip-файла была загружена каждая строка.

Потоковая передача Bluesky напрямую в ClickHouse (Streaming)

Обратите внимание, что ClickHouse может напрямую выполнять потоковую загрузку данных с использованием JSON-форматов ввода, как недавно продемонстрировал наш технический директор (CTO) Алексей Миловидов.

Это можно реализовать, объединив тип данных JSON и формат ввода JSON.

Например:

ClickPipes в ClickHouse Cloud

Хотя механизм таблиц S3Queue позволяет нам выполнять потоковую передачу данных из объектного хранилища в ClickHouse, у него есть определённые ограничения. Помимо того, что он поддерживает только S3-совместимые хранилища, он обеспечивает семантику “по крайней мере один раз” (at-least-once).

Пользователи ClickHouse Cloud могут предпочесть использовать ClickPipes — управляемое решение для загрузки данных, которое обеспечивает семантику “ровно один раз” (exactly-once), поддерживает больше источников (например, Kafka) и разделяет ресурсы загрузки и ресурсы кластера.

Эта технология может быть использована для замены S3Queue в описанной выше архитектуре с минимальной настройкой через пошаговый мастер (guided wizard).

Запросы к бронзовому уровню

Хотя мы не рекомендуем предоставлять доступ к вашей таблице уровня Bronze конечным пользователям (downstream consumers), выбранный нами ключ сортировки (ordering key) позволяет эффективно исследовать данные, выявлять дополнительные проблемы с их качеством или, при необходимости, повторно воспроизводить данные через последующие уровни архитектуры.

Мы отмечали, что во время слияния (merge) движок ReplacingMergeTree определяет дубликаты строк, используя значения столбцов, указанных в ORDER BY, как уникальный идентификатор, и сохраняет только последнюю версию записи. Однако это обеспечивает лишь постепенную (eventual) корректность — то есть не гарантирует, что все дубликаты будут удалены, поэтому полагаться на это не стоит.

Чтобы гарантировать корректные результаты, пользователям необходимо дополнять фоновое объединение (background merges) операцией удаления дубликатов во время выполнения запроса, что можно сделать с помощью оператора FINAL.
Однако это создаёт дополнительную нагрузку на ресурсы и негативно влияет на производительность запросов, что является ещё одной причиной, по которой мы не советуем предоставлять доступ к Bronze-таблицам потребителям данных.

В приведённых выше примерах запросов мы опускаем оператор FINAL, принимая небольшой уровень дублирования, поскольку это допустимо для разведочного анализа данных.

Большинство данных представляют собой commit-события (commit events):

Внутри этих commit-событий можно исследовать типы событий с помощью синтаксиса пути JSON (JSON path syntax):

Мы видим, что основная часть событий — это “лайки” и “подписки (follows)”, что вполне ожидаемо.

Серебряный уровень для очищенных данных (Silver Layer)

Слой Silver (Серебряный) представляет собой следующий этап в архитектуре Medallion, преобразуя сырые данные из слоя Bronze (Бронзового) в более согласованную и структурированную форму.

Этот слой решает проблемы качества данных: выполняет дополнительную фильтрацию, стандартизирует схемы, производит преобразования и обеспечивает полное удаление дубликатов.
В ClickHouse обычно наблюдается прямая связь между таблицами Bronze и их эквивалентами в Silver.

Мы знаем, что дубликаты событий имеют одинаковые значения bluesky_ts (и других столбцов), различаясь лишь по scrape_ts, причём последнее значение может быть значительно позже.
Однако ранее мы установили, что большинство дубликатов появляются в пределах 20 минут.
Чтобы гарантировать, что в золотой слой (Gold) не попадут дубликаты, мы вводим понятие конечного окна дедупликации (finite duplication window) в слое Silver.

События будут распределяться по этим окнам дедупликации, которые смещены относительно текущего времени на основе значения bluesky_ts.
Эти «окна» периодически сбрасываются (flushed) в слой Gold, с гарантией, что в каждое окно попадёт только одна копия события.

Использование окон дедупликации избавляет нас от необходимости проводить дедупликацию за бесконечный период времени, что существенно снижает нагрузку на систему и делает задачу более управляемой.

Как мы покажем далее, это можно эффективно реализовать в ClickHouse.

Назначение событий в окна дедупликации, которые синхронизируются с реальным временем и периодически сбрасываются, предполагает, что данные доставляются без значительных задержек.

Анализ таблицы Bronze показывает, что:

  • 90% событий имеют значение bluesky_ts, отличающееся от времени их поступления (извлечённого из имени файла в GCS) не более чем на 20 минут.
    Это возможно, если:
  • Обработка 1 миллиона сообщений за один раз не вызывает значительных задержек;
  • Время чтения и обработки через S3Queue также незначительно (это можно проверить через системные таблицы);
  • Время, извлечённое из имени файла, близко к реальному времени загрузки, что подтверждается запросами к GCS.

Кроме того, более 94% событий имеют разницу между scrape_ts и bluesky_ts меньше 20 минут (в 90% случаев — даже менее 10 секунд).
Это означает, что значение scrape_ts также не отстаёт от времени поступления данных.

Понимая, что события обычно доставляются в течение 20 минут после их bluesky_ts, мы можем надёжно формировать окна дедупликации в слое Silver.
Для этого мы создаём раздел (partition) в ClickHouse для каждого 20-минутного интервала — таким образом, раздел фактически соответствует одному окну.

События распределяются по разделам в зависимости от того, в какой интервал они попадают, с помощью функции:

Итоговая схема таблицы Silver выглядит следующим образом:

  • Мы используем ReplacingMergeTree, но выполняем дедупликацию только внутри каждого раздела, то есть слияние происходит только в пределах окна.
  • Для управления объёмом данных применяется TTL, который удаляет строки, старше 1440 секунд (24 часа).

Параметр ttl_only_drop_parts = 1 гарантирует, что части удаляются только тогда, когда все строки в них устарели.

Так как слишком большое количество разделов может привести к проблемам производительности и ошибкам вроде “Too many parts”, мы ограничиваем таблицу Silver только одними сутками данных (всего 72 окна по 20 минут). Старые данные автоматически удаляются с помощью правил TTL, сохраняя эффективность и стабильность системы.

Инкрементные материализованные представления для фильтрации

При применении фильтрации и правил дедупликации к данным уровня Bronze, пользователи часто сохраняют «негативные совпадения» (то есть записи, не прошедшие фильтры) в отдельной таблице — так называемой Dead-Letter таблице — для последующего анализа.

Так как мы планируем периодически отправлять свежие партиции из слоя Silver в слой Gold, нам нежелательно, чтобы события поступали слишком поздно.
По этой причине, а также чтобы продемонстрировать принцип “dead letter queue”, мы будем отправлять все события из слоя Bronze, у которых разница между scrape_ts и bluesky_ts превышает 20 минут, в очередь “dead letter”.

События же, у которых задержка меньше 20 минут, будут вставляться в соответствующую партицию таблицы Silver, показанную ранее.

Для реализации этого подхода мы используем две инкрементные материализованные представления (incremental materialized views).
Каждое из них выполняет SELECT-запрос к строкам, вставленным в таблицу уровня Bronze (bluesky_raw), и отправляет результаты либо:

  • в таблицу dead letter queue,
  • либо в таблицу Silver (bluesky_dedup).

Основное различие между этими двумя представлениями заключается в их фильтрующих условиях.

Представление для отправки строк в таблицу Silver:

Схема таблицы Dead-Letter Queue и связанное с ней материализованное представление:

Обратите внимание, что для очереди “dead letter” используется обычный движок MergeTree,
так как дедупликация здесь не требуется — эти данные предназначены для анализа проблем и диагностики, а не для основной аналитики.

Отправка данных на золотой уровень (Gold Layer)

Описанный выше процесс оставляет разделы (partitions), заполненные на уровне Silver.
Периодически нам необходимо переносить данные из этих разделов в уровень Gold, гарантируя, что все события были полностью дедуплицированы, и при этом делать это достаточно оперативно, чтобы обеспечить наличие свежих данных в слое Gold для аналитики.

Мы реализуем этот периодический перенос (flushing) с помощью Refreshable Materialized View.
Такие представления выполняются периодически по таблицам уровня Silver и позволяют выполнять сложные преобразования, включая денормализацию данных перед их записью в таблицы уровня Gold.

В нашем случае нам нужно просто периодически вставлять данные из последнего раздела, который больше не получает новых данных, в таблицу Gold.
Запрос при этом должен выполняться с использованием оператора FINAL, чтобы гарантировать, что все события дедуплицированы.

Хотя такой запрос обычно более затратен вычислительно, здесь мы можем использовать два преимущества:

  • Запрос выполняется периодически — в нашем случае каждые 20 минут, что смещает нагрузку с пользовательских запросов на уровень загрузки данных.
  • Мы обрабатываем только один раздел за одно выполнение. Можно ограничить дедупликацию во время выполнения только этим разделом, установив параметр do_not_merge_across_partitions_select_final=1, что дополнительно оптимизирует запрос и снижает нагрузку.

Для этого требуется определить, какой именно раздел нужно перенести в Gold при каждом выполнении.
Эта логика показана на диаграмме выше, а в кратком виде выглядит так:

  • Мы определяем последний раздел в таблице Silver bluesky_dedup с помощью служебного поля _partition_id.
    Из этого значения вычитаем 40 минут, получая раздел, который был создан два окна назад (X — 2) — называем его current_partition.
  • В целевой таблице уровня Gold bluesky есть столбец _rmt_partition_id,
    заполняемый refreshable materialized view, где хранится, из какого раздела уровня Silver поступило каждое событие.
    Мы используем это поле, чтобы определить последний успешно перенесённый раздел, прибавляем 20 минут,
    получая раздел, который нужно обработать следующим — next_to_process.

Если next_to_process = 1200, это значит, что таблица bluesky пуста
(0 + 1200 секунд = 1200), и ещё не было ни одной передачи данных.
В этом случае мы используем current_partition и вставляем все события, где _partition_id = current_partition.

Если next_to_process > 1200, значит, переносы уже выполнялись.
Если current_partition >= next_to_process, то мы отстаём не менее чем на 40 минут (2 окна),
и используем значение next_to_process, вставляя все события, где _partition_id = next_to_process.
Если же current_partition < next_to_process, выполняется noop (ничего не происходит) — данные не переносятся.

Эта логика устойчива к сбоям, таким как пропуски выполнения каждые 20 минут, повторные запуски или задержки выполнения. В результате формируется Refreshable Materialized View, в SELECT-запросе которого инкапсулирована описанная выше логика.

Это представление выполняется каждые 20 минут, передавая очищенные и дедуплицированные данные в уровень Gold. Следует отметить, что данные появляются в Gold с задержкой около 40 минут, хотя при необходимости пользователи могут выполнять запросы к уровню Silver для получения более свежих данных.

Внимательный читатель заметит, что в шаге 2 и на диаграмме выше наш запрос использует таблицу latest_partition, а не обращается напрямую к _rmt_partition_id в таблице bluesky уровня Gold.
Эта таблица создаётся с помощью инкрементного материализованного представления (incremental materialized view) и служит оптимизацией, которая ускоряет определение следующего раздела для обработки.

Это представление отслеживает последний вставленный раздел в таблицу Gold и выглядит следующим образом.

Золотой уровень для анализа данных (Gold Layer для аналитики)

Указанное выше refreshable materialized view периодически отправляет данные в таблицу уровня Gold — bluesky.

Схема этой таблицы показана ниже:

Поскольку данные полностью дедуплицированы до момента вставки, мы можем использовать стандартный MergeTree.

Ключ сортировки (ORDER BY) выбирается исключительно на основе шаблонов доступа потребителей данных и с целью оптимизации сжатия.

Таблица разделена по месяцам (partitioned by month) — в первую очередь для удобства управления данными, а также потому, что ожидается, что большинство запросов будут обращаться к самым последним данным.

Обратите внимание: хотя мы по-прежнему используем тип данных JSON на этом уровне,
возможно выполнение дополнительных трансформаций данных на этапе предыдущего
refreshable materialized view — например:

  • извлечение часто используемых полей в корень таблицы,
  • использование столбцов типа ALIAS, чтобы упростить синтаксис запросов и повысить удобство анализа.

Материализованные представления для общих запросов (Для часто запрашиваемых метрик)

Этот слой gold должен быть полностью оптимизирован для выполнения запросов со стороны прикладных систем и потребителей данных. Хотя наш ключ сортировки направлен на то, чтобы облегчить этот процесс, не все шаблоны доступа будут одинаковыми. До настоящего времени наиболее распространённым применением инкрементных материализованных представлений было выполнение фильтрации и вставки данных между слоями. Однако наше более раннее использование представления для вычисления следующего раздела (partition) намекало на то, как ещё можно оптимизировать другие запросы.

Помимо фильтрации и отправки подмножеств данных в целевую таблицу с другими ключами сортировки (оптимизированными под иные шаблоны доступа), материализованные представления могут использоваться для предварительного вычисления агрегатов во время вставки данных в таблицу gold.

Результаты таких агрегатов будут представлять собой уменьшенную форму исходных данных (частичный набросок, если речь идёт об агрегации). Это не только упрощает последующие запросы к целевой таблице, но и обеспечивает более высокую скорость выполнения, поскольку вычисления переносятся с момента запроса на момент вставки, тем самым снижая время отклика при запросе.

Полное руководство по материализованным представлениям можно найти здесь.

В качестве примера рассмотрим наш предыдущий запрос, который вычисляет наиболее распространённые типы commit-событий:

Для 500 миллионов событий выполнение этого запроса занимает около 6 секунд.
Чтобы преобразовать его в инкрементное материализованное представление, необходимо подготовить таблицу, которая будет получать результаты инкрементной агрегации:

Обратите внимание, что нам необходимо использовать AggregatingMergeTree и указать ключ сортировки как ключ группировки — результаты агрегации с одинаковыми значениями этого столбца будут объединяться.

Инкрементные результаты должны храниться в специальных типах столбцов SimpleAggregateFunction и AggregateFunction — для этого необходимо указать саму функцию и связанный с ней тип данных.

Ниже показано соответствующее материализованное представление, которое заполняет эту таблицу при вставке строк в таблицу gold. Обратите внимание, что используется суффикс — State, чтобы явно сгенерировать состояние агрегации:

Производительность запроса улучшилась более чем в 150 раз!

Ниже приведена финальная диаграмма архитектуры, показывающая все наши уровни:

Примеры запросов и визуализации на sql.clickhouse.com

Приведённый выше пример представляет собой очень простую демонстрацию. Эти данные доступны на сайте sql.clickhouse.com, где описанный выше рабочий процесс Medallion выполняется непрерывно. Мы также предоставили дополнительные материализованные представления в качестве примеров для эффективного выполнения запросов.

Например, чтобы определить, в какое время суток пользователи чаще всего ставят лайки, публикуют и репостят в Bluesky, можно выполнить следующий запрос:

Запрос выполняется за 7 миллисекунд.

Вы можете запустить этот запрос в нашем playground, чтобы отобразить результат в виде графика.

Ниже приведено соответствующее материализованное представление и целевая таблица, которая заполняется по мере вставки строк в gold-таблицу:

Полный список запросов и соответствующих им представлений можно посмотреть здесь.
Кроме того, вы можете напрямую выполнять запросы к gold или silver таблицам!

Некоторые примеры, с которых можно начать:

Заключительные мысли

В этом блоге мы продемонстрировали полностью реализованную архитектуру Medallion, построенную исключительно на ClickHouse, показав, как его мощные возможности позволяют преобразовывать “сырые”, полуструктурированные данные в качественные, готовые к запросам наборы данных.

Через уровни Bronze, Silver и Gold мы решили типичные проблемы, такие как повреждённые данные (malformed data), несогласованность структуры и значительное количество дубликатов.
Благодаря использованию типа данных JSON в ClickHouse, нам удалось эффективно обрабатывать по своей природе полуструктурированные и динамичные данные, при этом сохраняя высокую производительность.

Хотя эта архитектура обеспечивает надёжный и гибкий рабочий процесс, она всё же вносит определённые задержки по мере перемещения данных между слоями.
В нашем решении “окна дедупликации” (deduplication windows) помогли минимизировать эти задержки, однако остаётся компромисс между скоростью доставки данных в реальном времени и качеством данных.
Поэтому архитектура Medallion особенно хорошо подходит для наборов данных с высокой степенью дублирования и менее критичными требованиями к мгновенной доступности данных.

Полный список запросов и соответствующих им представлений можно посмотреть здесь.
Кроме того, вы можете напрямую выполнять запросы к gold или silver таблицам!

Некоторые примеры, с которых можно начать:

Заключительные мысли

В этом блоге мы продемонстрировали полностью реализованную архитектуру Medallion, построенную исключительно на ClickHouse, показав, как его мощные возможности позволяют преобразовывать “сырые”, полуструктурированные данные в качественные, готовые к запросам наборы данных.

Через уровни Bronze, Silver и Gold мы решили типичные проблемы, такие как повреждённые данные (malformed data), несогласованность структуры и значительное количество дубликатов.
Благодаря использованию типа данных JSON в ClickHouse, нам удалось эффективно обрабатывать по своей природе полуструктурированные и динамичные данные, при этом сохраняя высокую производительность.

Хотя эта архитектура обеспечивает надёжный и гибкий рабочий процесс, она всё же вносит определённые задержки по мере перемещения данных между слоями.

В нашем решении “окна дедупликации” (deduplication windows) помогли минимизировать эти задержки, однако остаётся компромисс между скоростью доставки данных в реальном времени и качеством данных.

Поэтому архитектура Medallion особенно хорошо подходит для наборов данных с высокой степенью дублирования и менее критичными требованиями к мгновенной доступности данных.

0
Оставьте комментарий! Напишите, что думаете по поводу статьи.x