Ниже — перевод статьи “Building a Medallion architecture for Bluesky JSON data with ClickHouse” с сайта ClickHouse.
Построение архитектуры Medallion для данных Bluesky в формате JSON с помощью ClickHouse
Мы так же взволнованы, как и вся остальная дата-сообщество, из-за недавнего всплеска популярности социальной сети BlueSky и её API, который позволяет получать доступ к потоку публикуемого контента.
Этот набор данных содержит поток с высокой пропускной способностью — тысячи JSON-событий в секунду, и мы подумали, что будет интересно сделать эти данные доступными для сообщества, чтобы каждый мог выполнять по ним запросы.
Во время исследования данных мы обнаружили, что во многих событиях присутствуют некорректные или повреждённые временные метки. Набор данных также содержит частые дубликаты. Поэтому мы не можем просто импортировать данные и на этом закончить — потребуется некоторая очистка.
Это идеальная возможность попробовать архитектуру Medallion, о которой мы недавно писали в блоге. В этом посте мы оживим эти концепции на практическом примере.
Мы создадим рабочий процесс, который решает эти задачи, организуя набор данных в три отдельные уровня: бронзовый, серебряный и золотой. Мы будем придерживаться принципов архитектуры Medallion и активно использовать недавно представленный тип данных JSON.
Каждый уровень будет доступен для публичных запросов в нашей демо-среде на sql.clickhouse.com, где читатели смогут самостоятельно изучить и взаимодействовать с результатами. Мы даже подготовили несколько примерных аналитических запросов, чтобы вам было проще начать!
Что такое Bluesky?
Для тех, кто не так активен в социальных сетях, вы могли пропустить недавний взлёт популярности Bluesky, которая в настоящее время набирает почти миллион пользователей в день. Bluesky — это социальная сеть, похожая на X (бывший Twitter), но, в отличие от него, она полностью открыта и децентрализована!
Bluesky, построенная на AT Protocol (ATProto), представляет собой децентрализованную платформу социальных сетей, которая позволяет пользователям самостоятельно размещать свой контент. По умолчанию данные хранятся на Bluesky Personal Data Server (PDS), но пользователи могут выбирать — размещать эти серверы (и свой контент) у себя. Такой подход отражает возврат к принципам раннего Интернета, когда пользователи имели контроль над своим контентом и связями, вместо того чтобы зависеть от централизованных платформ, которые доминируют и владеют пользовательскими данными.
Данные каждого пользователя управляются в лёгкой, открытой программной среде, где одна база данных SQLite используется для хранения. Такая структура обеспечивает взаимодействие между системами (interoperability) и гарантирует, что право собственности на контент остаётся за пользователем, даже если центральная платформа выйдет из строя или изменит свою политику.
И самое главное для нас: как и старый Twitter, Bluesky предоставляет бесплатный способ получать события — например, посты — в реальном времени, что открывает потенциально огромный набор данных для аналитики, по мере того как сеть набирает популярность.
Чтение данных Bluesky
Чтобы загрузить данные из Bluesky, мы используем недавно выпущенный Jetstream API, который упрощает потребление событий Bluesky, предоставляя потоки, закодированные в формате JSON. В отличие от оригинального firehose, который требует обработки бинарных данных CBOR и файлов CAR, Jetstream снижает сложность, делая процесс доступным для разработчиков, работающих с приложениями в реальном времени. Этот API идеально соответствует нашему случаю использования, позволяя фильтровать и обрабатывать тысячи событий в секунду из постов Bluesky, одновременно решая распространённые проблемы, такие как повреждённые данные и высокий уровень дублирования.
В нашей реализации мы подключаемся к публичному экземпляру Jetstream, потребляя непрерывный поток событий в формате JSON для загрузки. Для этого используется простой bash-скрипт, который обрабатывает поток JSON-событий в реальном времени из Jetstream.
Вкратце, он выполняет следующее:
- Проверяет GCS bucket на наличие самого последнего файла
.csv.gz, извлекает его временную метку (используемую как курсор) и применяет её для возобновления подписки Jetstream с нужной позиции. Это обеспечивает непрерывность данных и минимизирует дублирование. - Инструмент
websocatиспользуется для подключения к Jetstream API, подписки на события и передачи JSON-потока для обработки. ПараметрwantedCollectionsфильтрует нужные события, аcursorобеспечивает пошаговое (инкрементальное) получение данных, например:
|
1 |
websocat -Un --max-messages-rev $MAX_MESSAGES "$WS_URL/subscribe?wantedCollections=app.*&cursor=$cursor" > "$OUTPUT_FILE" |
- Входящие данные JSON разбиваются на фрагменты по 500 000 строк, при этом каждый фрагмент представляет собой файл, где последняя временная метка используется в качестве идентификатора файла. Мы используем
clickhouse-localдля преобразования файла в CSV, затем сжимаем его в.gzи загружаем в GCS bucket с помощьюgsutil. - Скрипт выполняется внутри Docker-контейнера ClickHouse, который запускается каждые 3 минуты с помощью
Google Cloud Run Job.
Обратите внимание, что файлы естественным образом упорядочены по своим именам, основанным на временной метке последнего события. Это критически важно для последующего эффективного инкрементального чтения из GCS bucket. Однако скрипт не гарантирует, что будут зафиксированы все события Bluesky.
Выборка (sampling) данных
На момент написания этого поста мы зафиксировали почти 1,5 миллиарда событийных строк, собранных примерно за 21 день. Мы можем использовать функцию gcs в ClickHouse, чтобы выполнить запрос к данным напрямую и определить общее количество необработанных строк.
|
1 2 3 4 5 6 7 8 9 |
clickhouse-cloud :) SELECT count() FROM gcs('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames') ┌────count()─┐ │ 1484500000 │ -- 1.48 billion └────────────┘ 1 row in set. Elapsed: 72.396 sec. Processed 1.48 billion rows, 205.07 GB (20.51 million rows/s., 2.83 GB/s.) Peak memory usage: 4.85 GiB. |
Мы можем взять выборку данных, используя ту же функцию, преобразовав каждую строку в тип JSON и применив формат PrettyJSONEachRow, чтобы получить читаемый результат.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
SET allow_experimental_json_type = 1 SELECT data::'JSON' AS event FROM gcs('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames') LIMIT 1 FORMAT PrettyJSONEachRow { "account": { "active": true, "did": "did:plc:kjealuouxn3l6v4byxh2fhff", "seq": "706717212", "time": "2024-11-27T18:00:02.429Z" }, "did": "did:plc:kjealuouxn3l6v4byxh2fhff", "kind": "account", "time_us": "1732730402720719" } 1 row in set. Elapsed: 0.233 sec. |
Хотя приведённый выше пример даёт некоторое представление о структуре событий, он не полностью отражает сложность, изменчивость и непоследовательность данных. Столбец kind в значительной степени определяет последующую структуру, при этом API передаёт три типа событий: commit, identity и account.
Краткое описание типов событий:
commit— событие фиксации (commit) указывает на создание, обновление или удаление записи. Этот тип представляет большинство событий и включает посты, лайки и подписки.identity— обновление идентичности учётной записи.account— обновление состояния учётной записи.
Мы подробнее исследуем эти данные после их загрузки в бронзовый слой (Bronze layer).
Проблемы с данными Bluesky
Данные Bluesky, как они поступают через JetStream API, имеют ряд проблем, включая следующее:
- Повреждённый JSON (Malformed JSON) — время от времени встречаются некорректно сформированные JSON-события. Хотя они редки, такие записи могут нарушить обработку файла. Мы исключаем их с помощью функции
isValidJSON, ограничивая загрузку в бронзовый слой (Bronze layer) только теми строками, для которых функция возвращает значение 1. - Непоследовательная структура (Inconsistent structure) — хотя временная метка сбора данных (поле time_us) присутствует во всех событиях, путь JSON, содержащий время, когда событие произошло, зависит от типа события. Наш рабочий процесс должен извлекать единую согласованную временную метку, основываясь на этих условиях. Простой анализ показывает, что:
commit.record.createdAtможно использовать для событий типа commit;identity.time— для событий identity;account.time— для событий account.
- Будущие или некорректные временные метки (Future or invalid timestamps) — некоторые события имеют временные метки из будущего. Например, при выборке событий на момент написания поста 42 тысячи commit-событий имели будущие значения времени. Ещё 4 миллиона commit-событий имели метки времени, относящиеся к периоду до запуска Bluesky как сервиса.
- Повторяющиеся структуры (Repeated structures) — встречаются случаи, когда JSON содержит глубоко рекурсивные структуры. Это приводит к появлению более 1800 уникальных JSON-путей, большинство из которых, вероятно, не имеют существенной ценности для анализа содержимого.
- Дубликаты (Duplicates) — несмотря на использование курсора для поддержания последовательности данных, JetStream API создаёт дубликаты (где содержимое идентично, за исключением временной метки сбора). Удивительно, но такие дубликаты могут появляться в широком диапазоне времени — в некоторых случаях с разницей до 24 часов. Важно отметить, что большинство дубликатов встречаются в интервале около 20 минут.
Приведённые выше пункты не представляют собой исчерпывающий список проблем с качеством данных — мы продолжаем находить новые сложности! Однако, в целях наглядности и сжатости примера, в нашем демонстрационном Medallion workflow мы сосредоточимся именно на перечисленных проблемах.
Тип данных JSON в ClickHouse
JSON играет ключевую роль в реализации архитектуры Medallion для данных Bluesky, позволяя системе хранить высокодинамичную и полуструктурированную информацию в бронзовом слое (Bronze layer). Новый тип данных JSON в ClickHouse, представленный в версии 24.8, решил ключевые проблемы, с которыми сталкивались предыдущие реализации.
В отличие от традиционных подходов, которые предполагают единственный тип для каждого JSON-пути (что часто приводит к принудительному приведению типов или их преобразованию), JSON-тип в ClickHouse хранит значения каждого уникального пути и типа в отдельных подколонках (sub-columns).
Такой подход обеспечивает эффективное хранение, сводит к минимуму лишние операции ввода-вывода (I/O) и избегает затрат на приведение типов во время выполнения запроса.
Например, когда в таблицу вставляются два JSON-пути с разными типами данных, ClickHouse сохраняет значения каждого типа в отдельных подколонках. Эти подколонки могут быть запрошены независимо, что снижает ненужные операции ввода-вывода.
При этом, если запросить колонку, содержащую несколько типов данных, её значения всё равно возвращаются как единый столбец в ответе.
Кроме того, благодаря использованию смещений (offsets), ClickHouse гарантирует, что подколонки остаются плотными (dense) — то есть не хранят значения по умолчанию для отсутствующих JSON-путей. Такой подход максимизирует степень сжатия и дополнительно снижает нагрузку на I/O.
Также данный тип данных не страдает от проблемы “взрыва подколонок” (sub-column explosion), возникающей при большом количестве уникальных JSON-путей.
Это особенно важно для данных Bluesky, где при отсутствии фильтрации встречается более 1800 уникальных путей.
При этом это не мешает хранению всех этих путей — новые пути просто сохраняются в общей колонке данных, если превышен лимит (при этом статистика ускоряет выполнение запросов).
Такое оптимизированное обращение с JSON обеспечивает эффективное хранение сложных, полуструктурированных наборов данных, таких как данные Bluesky, в бронзовом слое архитектуры.
Для пользователей, заинтересованных в технических деталях реализации этого нового типа колонок, рекомендуется ознакомиться с подробным постом в нашем блоге (ссылка предоставлена в оригинале).
Бронзовый уровень для необработанных данных (Bronze, сырые данные)
Хотя исходное описание бронзового слоя (Bronze layer) не предполагает фильтрацию или преобразование данных, мы относимся к этому менее догматично и считаем, что минимальная фильтрация и недеструктивные преобразования данных могут быть полезны для исследования проблем и возможности воспроизведения данных в будущем.
Для преобразований мы рекомендуем ограничиться теми, которые можно реализовать с помощью материализованных колонок (Materialized columns), как показано ниже в нашей схеме бронзового слоя:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE bluesky.bluesky_raw ( `data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`), `_file` LowCardinality(String), `kind` LowCardinality(String) MATERIALIZED getSubcolumn(data, 'kind'), `scrape_ts` DateTime64(6) MATERIALIZED fromUnixTimestamp64Micro(CAST(getSubcolumn(data, 'time_us'), 'UInt64')), `bluesky_ts` DateTime64(6) MATERIALIZED multiIf(getSubcolumn(data, 'kind') = 'commit', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'commit.record.createdAt'), 'String')), getSubcolumn(data, 'kind') = 'identity', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'identity.time'), 'String')), getSubcolumn(data, 'kind') = 'account', parseDateTime64BestEffortOrZero(CAST(getSubcolumn(data, 'account.time'), 'String')), toDateTime64(0, 6)), `dedup_hash` String MATERIALIZED cityHash64(arrayFilter(p -> ((p.1) != 'time_us'), JSONExtractKeysAndValues(CAST(data, 'String'), 'String'))) ) ENGINE = ReplacingMergeTree PRIMARY KEY (kind, bluesky_ts) ORDER BY (kind, bluesky_ts, dedup_hash) |
Некоторые важные замечания по этой схеме:
- Тип JSON — колонка data использует новый тип данных JSON и содержит всё событие целиком.
Мы применяем оператор SKIP, чтобы исключить определённые пути JSON, которые, как показал анализ, были ответственны за повторяющиеся структуры, отмеченные ранее. - Сохранение метаданных — колонка
_fileсодержит ссылку на файл, из которого была загружена строка. - Материализованные колонки (Materialized columns) — остальные колонки являются материализованными и вычисляются из колонки data во время вставки:
scrape_ts— время, когда событие было доставлено; извлекается из поля JSON time_us.kind— тип события, как упоминалось ранее.bluesky_ts— выполняет условную логику, извлекая временную метку события на основе значения kind; это решает проблему непоследовательной структуры и обеспечивает единый формат временных меток для всех событий.dedup_hash— содержит хеш события.
- Для вычисления хеша создаётся массив всех JSON-путей и их значений, за исключением
time_us(так как это поле отличается у дубликатов), с помощью функцииJSONExtractKeysAndValues.
Затем функцияcityHash64обрабатывает этот массив, создавая уникальный хеш события. - ReplacingMergeTree — используется движок
ReplacingMergeTree, который позволяет устранять дубликаты записей, имеющих одинаковые значения ключей сортировки (ORDER BY).
Дедупликация выполняется асинхронно во время фоновых слияний, которые происходят в неопределённое время и не могут быть напрямую контролируемы — то есть дедупликация осуществляется постепенно (eventual deduplication).
В нашей схеме ключ ORDER BY включает kind и bluesky_ts, что:
- обеспечивает эффективное чтение;
- гарантирует высокую степень сжатия, группируя строки с похожими атрибутами.
Мы также добавляем dedup_hash, чтобы уникально идентифицировать строки для дедупликации, но не включаем его в PRIMARY KEY.
Это оптимизация, которая предотвращает загрузку индекса по dedup_hash в память — разумное решение, так как мы не выполняем прямые запросы по хешу.
Наш бронзовый слой выполняет минимальные преобразования данных с помощью материализованных колонок, но при этом обеспечивает возможность дедупликации.
Важно отметить, что использование ReplacingMergeTree здесь не является обязательным и не влияет на будущие слои.
Пользователи могут предпочесть обычный MergeTree, если хотят анализировать дубликаты напрямую.
Наш выбор обусловлен главным образом желанием минимизировать объём хранимых данных.
Загрузка данных из объектного хранилища (s3, object storage)
Как описано выше, наш конвейер загрузки данных (ingestion pipeline) использует инструмент websocat для потоковой передачи данных из JetStream API, сохраняя события в виде файлов .csv.gz в Google Cloud Storage (GCS).
Этот промежуточный шаг предоставляет несколько преимуществ:
- он позволяет воспроизводить данные (data replay),
- сохраняет оригинальную копию необработанных данных (raw data)
- и имитирует подход, который многие пользователи используют для загрузки данных из объектного хранилища.
Чтобы считать эти файлы из GCS в нашу таблицу бронзового слоя bluesky_raw, мы используем движок таблицы S3Queue (S3Queue table engine). Этот движок считывает данные из объектного хранилища, совместимого с S3, автоматически обрабатывает новые файлы по мере их добавления в бакет и вставляет их в указанную таблицу через материализованное представление (materialized view).
Создание этой таблицы требует небольшой DDL-команды:
|
1 2 3 4 5 6 |
CREATE TABLE bluesky.bluesky_queue ( `data` Nullable(String) ) ENGINE = S3Queue('https://storage.googleapis.com/pme-internal/bluesky/*.gz', '', '', 'CSVWithNames') SETTINGS mode = 'ordered', s3queue_buckets = 30, s3queue_processing_threads_num = 10; |
Обратите внимание, что мы указываем GCS-бакет, содержащий сжатые (gzipped) файлы,
и определяем каждую строку как тип String с помощью объявления схемы.
Важно, что мы включаем «ordered mode» через настройку mode = 'ordered'. Это заставляет файлы обрабатываться в лексикографическом порядке, обеспечивая последовательную загрузку данных.
Хотя это означает, что файлы, добавленные с более ранним порядком сортировки, игнорируются, такая конфигурация поддерживает эффективную и инкрементальную обработку, и устраняет необходимость выполнять масштабные операции сравнения множеств, если файлы не имеют естественного порядка.
Наше раннее использование временных меток (timestamps) для имен файлов гарантирует, что данные обрабатываются в правильной последовательности, а движок S3Queue быстро распознаёт новые файлы, которые нужно загрузить.
Наша среда sql.clickhouse.com, в которую мы загружаем данные, состоит из трёх узлов, каждый из которых имеет 60 виртуальных процессорных ядер (vCPUs).
Параметр s3queue_processing_threads_num задаёт количество потоков для обработки файлов на каждом сервере.
Кроме того, при использовании ordered mode вводится дополнительная настройка — s3queue_buckets. Как рекомендуется, мы устанавливаем её как произведение количества реплик (3) на количество потоков обработки (10).
Чтобы потреблять строки из этой очереди, необходимо присоединить инкрементальное материализованное представление (Incremental Materialized View). Это представление читает данные из очереди, выполняет SELECT-запрос над строками, а результат отправляется в таблицу бронзового слоя bluesky_raw.
|
1 2 3 4 5 6 7 8 9 |
CREATE MATERIALIZED VIEW bluesky.bluesky_mv TO bluesky.bluesky_raw ( `data` Nullable(String) ) AS SELECT data, _file FROM bluesky.bluesky_queue WHERE isValidJSON(data) = 1 |
Обратите внимание, что мы выполняем базовую фильтрацию уже на этом уровне:
в таблицу бронзового слоя передаются только строки, где isValidJSON(data) = 1, то есть содержащие валидный JSON.
Также мы добавляем метаданные — колонку _file, чтобы иметь запись о том, из какого gzip-файла была загружена каждая строка.
Потоковая передача Bluesky напрямую в ClickHouse (Streaming)
Обратите внимание, что ClickHouse может напрямую выполнять потоковую загрузку данных с использованием JSON-форматов ввода, как недавно продемонстрировал наш технический директор (CTO) Алексей Миловидов.
Это можно реализовать, объединив тип данных JSON и формат ввода JSON.
Например:
|
1 |
websocat -n "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=app.*" | pv -l | split -l 1000 --filter='clickhouse-client --host sql-clickhouse.clickhouse.com --secure --password "" --query "INSERT INTO bluesky.bluesky_raw (data) FORMAT JSONAsObject"' |
ClickPipes в ClickHouse Cloud
Хотя механизм таблиц S3Queue позволяет нам выполнять потоковую передачу данных из объектного хранилища в ClickHouse, у него есть определённые ограничения. Помимо того, что он поддерживает только S3-совместимые хранилища, он обеспечивает семантику “по крайней мере один раз” (at-least-once).
Пользователи ClickHouse Cloud могут предпочесть использовать ClickPipes — управляемое решение для загрузки данных, которое обеспечивает семантику “ровно один раз” (exactly-once), поддерживает больше источников (например, Kafka) и разделяет ресурсы загрузки и ресурсы кластера.
Эта технология может быть использована для замены S3Queue в описанной выше архитектуре с минимальной настройкой через пошаговый мастер (guided wizard).
Запросы к бронзовому уровню
Хотя мы не рекомендуем предоставлять доступ к вашей таблице уровня Bronze конечным пользователям (downstream consumers), выбранный нами ключ сортировки (ordering key) позволяет эффективно исследовать данные, выявлять дополнительные проблемы с их качеством или, при необходимости, повторно воспроизводить данные через последующие уровни архитектуры.
Мы отмечали, что во время слияния (merge) движок ReplacingMergeTree определяет дубликаты строк, используя значения столбцов, указанных в ORDER BY, как уникальный идентификатор, и сохраняет только последнюю версию записи. Однако это обеспечивает лишь постепенную (eventual) корректность — то есть не гарантирует, что все дубликаты будут удалены, поэтому полагаться на это не стоит.
Чтобы гарантировать корректные результаты, пользователям необходимо дополнять фоновое объединение (background merges) операцией удаления дубликатов во время выполнения запроса, что можно сделать с помощью оператора FINAL.
Однако это создаёт дополнительную нагрузку на ресурсы и негативно влияет на производительность запросов, что является ещё одной причиной, по которой мы не советуем предоставлять доступ к Bronze-таблицам потребителям данных.
В приведённых выше примерах запросов мы опускаем оператор FINAL, принимая небольшой уровень дублирования, поскольку это допустимо для разведочного анализа данных.
Большинство данных представляют собой commit-события (commit events):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
SELECT kind, formatReadableQuantity(count()) AS c FROM bluesky_raw GROUP BY kind FORMAT PrettyCompactMonoBlock ┌─kind─────┬─c──────────────┐ │ commit │ 614.55 million │ │ account │ 1.72 million │ │ identity │ 1.70 million │ └──────────┴────────────────┘ 3 rows in set. Elapsed: 0.124 sec. Processed 617.97 million rows, 617.97 MB (5.00 billion rows/s., 5.00 GB/s.) Peak memory usage: 139.03 MiB. |
Внутри этих commit-событий можно исследовать типы событий с помощью синтаксиса пути JSON (JSON path syntax):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
SELECT data.commit.collection AS collection, count() AS c, uniq(data.did) AS users FROM bluesky_raw WHERE kind = 'commit' GROUP BY ALL ORDER BY c DESC LIMIT 10 FORMAT PrettyCompactMonoBlock ┌─collection───────────────┬─────────c─┬───users─┐ │ app.bsky.feed.like │ 705468149 │ 7106516 │ │ app.bsky.graph.follow │ 406406091 │ 8629730 │ │ app.bsky.feed.post │ 137946245 │ 4323265 │ │ app.bsky.feed.repost │ 90847077 │ 2811398 │ │ app.bsky.graph.block │ 25277808 │ 1523621 │ │ app.bsky.graph.listitem │ 8464006 │ 166002 │ │ app.bsky.actor.profile │ 8168943 │ 4083558 │ │ app.bsky.graph.listblock │ 643292 │ 216695 │ │ app.bsky.feed.threadgate │ 559504 │ 94202 │ │ app.bsky.feed.postgate │ 275675 │ 38790 │ └──────────────────────────┴───────────┴─────────┘ 10 rows in set. Elapsed: 19.923 sec. Processed 1.38 billion rows, 122.00 GB (69.50 million rows/s., 6.12 GB/s.) Peak memory usage: 1003.91 MiB. |
Мы видим, что основная часть событий — это “лайки” и “подписки (follows)”, что вполне ожидаемо.
Серебряный уровень для очищенных данных (Silver Layer)
Слой Silver (Серебряный) представляет собой следующий этап в архитектуре Medallion, преобразуя сырые данные из слоя Bronze (Бронзового) в более согласованную и структурированную форму.
Этот слой решает проблемы качества данных: выполняет дополнительную фильтрацию, стандартизирует схемы, производит преобразования и обеспечивает полное удаление дубликатов.
В ClickHouse обычно наблюдается прямая связь между таблицами Bronze и их эквивалентами в Silver.
Мы знаем, что дубликаты событий имеют одинаковые значения bluesky_ts (и других столбцов), различаясь лишь по scrape_ts, причём последнее значение может быть значительно позже.
Однако ранее мы установили, что большинство дубликатов появляются в пределах 20 минут.
Чтобы гарантировать, что в золотой слой (Gold) не попадут дубликаты, мы вводим понятие конечного окна дедупликации (finite duplication window) в слое Silver.
События будут распределяться по этим окнам дедупликации, которые смещены относительно текущего времени на основе значения bluesky_ts.
Эти «окна» периодически сбрасываются (flushed) в слой Gold, с гарантией, что в каждое окно попадёт только одна копия события.
Использование окон дедупликации избавляет нас от необходимости проводить дедупликацию за бесконечный период времени, что существенно снижает нагрузку на систему и делает задачу более управляемой.
Как мы покажем далее, это можно эффективно реализовать в ClickHouse.
Назначение событий в окна дедупликации, которые синхронизируются с реальным временем и периодически сбрасываются, предполагает, что данные доставляются без значительных задержек.
Анализ таблицы Bronze показывает, что:
- 90% событий имеют значение bluesky_ts, отличающееся от времени их поступления (извлечённого из имени файла в GCS) не более чем на 20 минут.
Это возможно, если: - Обработка 1 миллиона сообщений за один раз не вызывает значительных задержек;
- Время чтения и обработки через S3Queue также незначительно (это можно проверить через системные таблицы);
- Время, извлечённое из имени файла, близко к реальному времени загрузки, что подтверждается запросами к GCS.
Кроме того, более 94% событий имеют разницу между scrape_ts и bluesky_ts меньше 20 минут (в 90% случаев — даже менее 10 секунд).
Это означает, что значение scrape_ts также не отстаёт от времени поступления данных.
Понимая, что события обычно доставляются в течение 20 минут после их bluesky_ts, мы можем надёжно формировать окна дедупликации в слое Silver.
Для этого мы создаём раздел (partition) в ClickHouse для каждого 20-минутного интервала — таким образом, раздел фактически соответствует одному окну.
События распределяются по разделам в зависимости от того, в какой интервал они попадают, с помощью функции:
|
1 |
toStartOfInterval(bluesky_ts, toIntervalMinute(20)) |
Итоговая схема таблицы Silver выглядит следующим образом:
- Мы используем ReplacingMergeTree, но выполняем дедупликацию только внутри каждого раздела, то есть слияние происходит только в пределах окна.
- Для управления объёмом данных применяется TTL, который удаляет строки, старше 1440 секунд (24 часа).
Параметр ttl_only_drop_parts = 1 гарантирует, что части удаляются только тогда, когда все строки в них устарели.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE TABLE bluesky.bluesky_dedup ( `data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`), `kind` LowCardinality(String), `scrape_ts` DateTime64(6), `bluesky_ts` DateTime64(6), `dedup_hash` String ) ENGINE = ReplacingMergeTree PARTITION BY toStartOfInterval(bluesky_ts, toIntervalMinute(20)) ORDER BY dedup_hash TTL toStartOfMinute(bluesky_ts) + toIntervalMinute(1440) SETTINGS ttl_only_drop_parts=1 |
Так как слишком большое количество разделов может привести к проблемам производительности и ошибкам вроде “Too many parts”, мы ограничиваем таблицу Silver только одними сутками данных (всего 72 окна по 20 минут). Старые данные автоматически удаляются с помощью правил TTL, сохраняя эффективность и стабильность системы.
Инкрементные материализованные представления для фильтрации
При применении фильтрации и правил дедупликации к данным уровня Bronze, пользователи часто сохраняют «негативные совпадения» (то есть записи, не прошедшие фильтры) в отдельной таблице — так называемой Dead-Letter таблице — для последующего анализа.
Так как мы планируем периодически отправлять свежие партиции из слоя Silver в слой Gold, нам нежелательно, чтобы события поступали слишком поздно.
По этой причине, а также чтобы продемонстрировать принцип “dead letter queue”, мы будем отправлять все события из слоя Bronze, у которых разница между scrape_ts и bluesky_ts превышает 20 минут, в очередь “dead letter”.
События же, у которых задержка меньше 20 минут, будут вставляться в соответствующую партицию таблицы Silver, показанную ранее.
Для реализации этого подхода мы используем две инкрементные материализованные представления (incremental materialized views).
Каждое из них выполняет SELECT-запрос к строкам, вставленным в таблицу уровня Bronze (bluesky_raw), и отправляет результаты либо:
- в таблицу dead letter queue,
- либо в таблицу Silver (bluesky_dedup).
Основное различие между этими двумя представлениями заключается в их фильтрующих условиях.
Представление для отправки строк в таблицу Silver:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
CREATE MATERIALIZED VIEW bluesky.bluesky_dedup_mv TO bluesky.bluesky_dedup ( `data` JSON, `kind` LowCardinality(String), `scrape_ts` DateTime64(6), `bluesky_ts` DateTime64(6), `dedup_hash` String ) AS SELECT data, kind, scrape_ts, bluesky_ts, dedup_hash FROM bluesky.bluesky_raw WHERE abs(timeDiff(scrape_ts, bluesky_ts)) < 1200 |
Схема таблицы Dead-Letter Queue и связанное с ней материализованное представление:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
CREATE TABLE bluesky.bluesky_dlq ( `data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`), `kind` LowCardinality(String), `scrape_ts` DateTime64(6), `bluesky_ts` DateTime64(6), `dedup_hash` String ) ENGINE = MergeTree ORDER BY (kind, scrape_ts) CREATE MATERIALIZED VIEW bluesky.bluesky_dlq_mv TO bluesky.bluesky_dlq ( `data` JSON, `kind` LowCardinality(String), `scrape_ts` DateTime64(6), `bluesky_ts` DateTime64(6), `dedup_hash` String ) AS SELECT data, kind, scrape_ts, bluesky_ts, dedup_hash FROM bluesky.bluesky_raw WHERE abs(timeDiff(scrape_ts, bluesky_ts)) >= 1200 |
Обратите внимание, что для очереди “dead letter” используется обычный движок MergeTree,
так как дедупликация здесь не требуется — эти данные предназначены для анализа проблем и диагностики, а не для основной аналитики.
Отправка данных на золотой уровень (Gold Layer)
Описанный выше процесс оставляет разделы (partitions), заполненные на уровне Silver.
Периодически нам необходимо переносить данные из этих разделов в уровень Gold, гарантируя, что все события были полностью дедуплицированы, и при этом делать это достаточно оперативно, чтобы обеспечить наличие свежих данных в слое Gold для аналитики.
Мы реализуем этот периодический перенос (flushing) с помощью Refreshable Materialized View.
Такие представления выполняются периодически по таблицам уровня Silver и позволяют выполнять сложные преобразования, включая денормализацию данных перед их записью в таблицы уровня Gold.
В нашем случае нам нужно просто периодически вставлять данные из последнего раздела, который больше не получает новых данных, в таблицу Gold.
Запрос при этом должен выполняться с использованием оператора FINAL, чтобы гарантировать, что все события дедуплицированы.
Хотя такой запрос обычно более затратен вычислительно, здесь мы можем использовать два преимущества:
- Запрос выполняется периодически — в нашем случае каждые 20 минут, что смещает нагрузку с пользовательских запросов на уровень загрузки данных.
- Мы обрабатываем только один раздел за одно выполнение. Можно ограничить дедупликацию во время выполнения только этим разделом, установив параметр
do_not_merge_across_partitions_select_final=1, что дополнительно оптимизирует запрос и снижает нагрузку.
Для этого требуется определить, какой именно раздел нужно перенести в Gold при каждом выполнении.
Эта логика показана на диаграмме выше, а в кратком виде выглядит так:
- Мы определяем последний раздел в таблице Silver bluesky_dedup с помощью служебного поля
_partition_id.
Из этого значения вычитаем 40 минут, получая раздел, который был создан два окна назад (X — 2) — называем егоcurrent_partition. - В целевой таблице уровня Gold bluesky есть столбец
_rmt_partition_id,
заполняемый refreshable materialized view, где хранится, из какого раздела уровня Silver поступило каждое событие.
Мы используем это поле, чтобы определить последний успешно перенесённый раздел, прибавляем 20 минут,
получая раздел, который нужно обработать следующим —next_to_process.
Если next_to_process = 1200, это значит, что таблица bluesky пуста
(0 + 1200 секунд = 1200), и ещё не было ни одной передачи данных.
В этом случае мы используем current_partition и вставляем все события, где _partition_id = current_partition.
Если next_to_process > 1200, значит, переносы уже выполнялись.
Если current_partition >= next_to_process, то мы отстаём не менее чем на 40 минут (2 окна),
и используем значение next_to_process, вставляя все события, где _partition_id = next_to_process.
Если же current_partition < next_to_process, выполняется noop (ничего не происходит) — данные не переносятся.
Эта логика устойчива к сбоям, таким как пропуски выполнения каждые 20 минут, повторные запуски или задержки выполнения. В результате формируется Refreshable Materialized View, в SELECT-запросе которого инкапсулирована описанная выше логика.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
CREATE MATERIALIZED VIEW bluesky.blue_sky_dedupe_rmv REFRESH EVERY 20 MINUTE APPEND TO bluesky.bluesky ( `data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`), `kind` LowCardinality(String), `bluesky_ts` DateTime64(6), `_rmt_partition_id` LowCardinality(String) ) AS WITH ( --step 1 SELECT toUnixTimestamp(subtractMinutes(CAST(_partition_id, 'DateTime'), 40)) FROM bluesky.bluesky_dedup GROUP BY _partition_id ORDER BY _partition_id DESC LIMIT 1 ) AS current_partition, ( --step 2 SELECT toUnixTimestamp(addMinutes(CAST(max(partition_id), 'DateTime'), 20)) FROM bluesky.latest_partition ) AS next_to_process SELECT data, kind, bluesky_ts, _partition_id AS _rmt_partition_id FROM bluesky.bluesky_dedup FINAL --step 3 & 4 WHERE _partition_id = CAST(if(next_to_process = 1200, current_partition, if(current_partition >= next_to_process, next_to_process, 0)), 'String') SETTINGS do_not_merge_across_partitions_select_final = 1 |
Это представление выполняется каждые 20 минут, передавая очищенные и дедуплицированные данные в уровень Gold. Следует отметить, что данные появляются в Gold с задержкой около 40 минут, хотя при необходимости пользователи могут выполнять запросы к уровню Silver для получения более свежих данных.
Внимательный читатель заметит, что в шаге 2 и на диаграмме выше наш запрос использует таблицу latest_partition, а не обращается напрямую к _rmt_partition_id в таблице bluesky уровня Gold.
Эта таблица создаётся с помощью инкрементного материализованного представления (incremental materialized view) и служит оптимизацией, которая ускоряет определение следующего раздела для обработки.
Это представление отслеживает последний вставленный раздел в таблицу Gold и выглядит следующим образом.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE MATERIALIZED VIEW bluesky.latest_partition_mv TO bluesky.latest_partition ( `partition_id` UInt32 ) AS SELECT max(CAST(_rmt_partition_id, 'UInt32')) AS partition_id FROM bluesky.bluesky CREATE TABLE bluesky.latest_partition ( `partition_id` SimpleAggregateFunction(max, UInt32) ) ENGINE = AggregatingMergeTree ORDER BY tuple() |
Золотой уровень для анализа данных (Gold Layer для аналитики)
Указанное выше refreshable materialized view периодически отправляет данные в таблицу уровня Gold — bluesky.
Схема этой таблицы показана ниже:
|
1 2 3 4 5 6 7 8 9 10 |
CREATE TABLE bluesky.bluesky ( `data` JSON(SKIP `commit.record.reply.root.record`, SKIP `commit.record.value.value`), `kind` LowCardinality(String), `bluesky_ts` DateTime64(6), `_rmt_partition_id` LowCardinality(String) ) ENGINE = MergeTree PARTITION BY toStartOfInterval(bluesky_ts, toIntervalMonth(1)) ORDER BY (kind, bluesky_ts) |
Поскольку данные полностью дедуплицированы до момента вставки, мы можем использовать стандартный MergeTree.
Ключ сортировки (ORDER BY) выбирается исключительно на основе шаблонов доступа потребителей данных и с целью оптимизации сжатия.
Таблица разделена по месяцам (partitioned by month) — в первую очередь для удобства управления данными, а также потому, что ожидается, что большинство запросов будут обращаться к самым последним данным.
Обратите внимание: хотя мы по-прежнему используем тип данных JSON на этом уровне,
возможно выполнение дополнительных трансформаций данных на этапе предыдущего
refreshable materialized view — например:
- извлечение часто используемых полей в корень таблицы,
- использование столбцов типа ALIAS, чтобы упростить синтаксис запросов и повысить удобство анализа.
Материализованные представления для общих запросов (Для часто запрашиваемых метрик)
Этот слой gold должен быть полностью оптимизирован для выполнения запросов со стороны прикладных систем и потребителей данных. Хотя наш ключ сортировки направлен на то, чтобы облегчить этот процесс, не все шаблоны доступа будут одинаковыми. До настоящего времени наиболее распространённым применением инкрементных материализованных представлений было выполнение фильтрации и вставки данных между слоями. Однако наше более раннее использование представления для вычисления следующего раздела (partition) намекало на то, как ещё можно оптимизировать другие запросы.
Помимо фильтрации и отправки подмножеств данных в целевую таблицу с другими ключами сортировки (оптимизированными под иные шаблоны доступа), материализованные представления могут использоваться для предварительного вычисления агрегатов во время вставки данных в таблицу gold.
Результаты таких агрегатов будут представлять собой уменьшенную форму исходных данных (частичный набросок, если речь идёт об агрегации). Это не только упрощает последующие запросы к целевой таблице, но и обеспечивает более высокую скорость выполнения, поскольку вычисления переносятся с момента запроса на момент вставки, тем самым снижая время отклика при запросе.
Полное руководство по материализованным представлениям можно найти здесь.
В качестве примера рассмотрим наш предыдущий запрос, который вычисляет наиболее распространённые типы commit-событий:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
SELECT data.commit.collection AS collection, count() AS c, uniq(data.did) AS users FROM bluesky WHERE kind = 'commit' GROUP BY ALL ORDER BY c DESC LIMIT 10 ┌─collection───────────────┬─────────c─┬───users─┐ │ app.bsky.feed.like │ 269979403 │ 5270604 │ │ app.bsky.graph.follow │ 150891706 │ 5631987 │ │ app.bsky.feed.post │ 46886207 │ 3083647 │ │ app.bsky.feed.repost │ 33249341 │ 1956986 │ │ app.bsky.graph.block │ 9789707 │ 993578 │ │ app.bsky.graph.listitem │ 3231676 │ 102020 │ │ app.bsky.actor.profile │ 1731669 │ 1280895 │ │ app.bsky.graph.listblock │ 263667 │ 105310 │ │ app.bsky.feed.threadgate │ 215715 │ 49871 │ │ app.bsky.feed.postgate │ 99625 │ 19960 │ └──────────────────────────┴───────────┴─────────┘ 10 rows in set. Elapsed: 6.445 sec. Processed 516.53 million rows, 45.50 GB (80.15 million rows/s., 7.06 GB/s.) Peak memory usage: 986.51 MiB. |
Для 500 миллионов событий выполнение этого запроса занимает около 6 секунд.
Чтобы преобразовать его в инкрементное материализованное представление, необходимо подготовить таблицу, которая будет получать результаты инкрементной агрегации:
|
1 2 3 4 5 6 7 8 |
CREATE TABLE bluesky.top_post_types ( `collection` LowCardinality(String), `posts` SimpleAggregateFunction(sum, UInt64), `users` AggregateFunction(uniq, String) ) ENGINE = AggregatingMergeTree ORDER BY collection |
Обратите внимание, что нам необходимо использовать AggregatingMergeTree и указать ключ сортировки как ключ группировки — результаты агрегации с одинаковыми значениями этого столбца будут объединяться.
Инкрементные результаты должны храниться в специальных типах столбцов SimpleAggregateFunction и AggregateFunction — для этого необходимо указать саму функцию и связанный с ней тип данных.
Ниже показано соответствующее материализованное представление, которое заполняет эту таблицу при вставке строк в таблицу gold. Обратите внимание, что используется суффикс — State, чтобы явно сгенерировать состояние агрегации:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
CREATE MATERIALIZED VIEW top_post_types_mv TO top_posts_types AS SELECT data.commit.collection AS collection, count() AS posts, uniqState(CAST(data.did, 'String')) AS users FROM bluesky WHERE kind = 'commit' GROUP BY ALL When querying this table, we use the -Merge suffix to merge aggregation states. SELECT collection, sum(posts) AS posts, uniqMerge(users) AS users FROM top_post_types GROUP BY collection ORDER BY posts DESC LIMIT 10 10 rows in set. Elapsed: 0.042 sec. |
Производительность запроса улучшилась более чем в 150 раз!
Ниже приведена финальная диаграмма архитектуры, показывающая все наши уровни:
Примеры запросов и визуализации на sql.clickhouse.com
Приведённый выше пример представляет собой очень простую демонстрацию. Эти данные доступны на сайте sql.clickhouse.com, где описанный выше рабочий процесс Medallion выполняется непрерывно. Мы также предоставили дополнительные материализованные представления в качестве примеров для эффективного выполнения запросов.
Например, чтобы определить, в какое время суток пользователи чаще всего ставят лайки, публикуют и репостят в Bluesky, можно выполнить следующий запрос:
|
1 2 3 4 5 6 7 |
SELECT event, hour_of_day, sum(count) as count FROM bluesky.events_per_hour_of_day WHERE event in ['post', 'repost', 'like'] GROUP BY event, hour_of_day ORDER BY hour_of_day; 72 rows in set. Elapsed: 0.007 sec. |
Запрос выполняется за 7 миллисекунд.
Вы можете запустить этот запрос в нашем playground, чтобы отобразить результат в виде графика.
Ниже приведено соответствующее материализованное представление и целевая таблица, которая заполняется по мере вставки строк в gold-таблицу:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
CREATE TABLE bluesky.events_per_hour_of_day ( event LowCardinality(String), hour_of_day UInt8, count SimpleAggregateFunction(sum, UInt64) ) ENGINE = AggregatingMergeTree ORDER BY (event, hour_of_day); CREATE MATERIALIZED VIEW bluesky.events_per_hour_of_day_mv TO bluesky.events_per_hour_of_day AS SELECT extract(data.commit.collection, '\\.([^.]+) |
Полный список запросов и соответствующих им представлений можно посмотреть здесь.
Кроме того, вы можете напрямую выполнять запросы к gold или silver таблицам!
Некоторые примеры, с которых можно начать:
- Общее количество событий (Total events)
- Когда пользователи чаще всего используют Bluesky (When do people use BlueSky)
- Самые популярные типы событий (Top event types)
- Топ типов событий по количеству (Top event types by count)
- Топ типов событий по уникальным пользователям (Top event types by unique users)
- Самые залайканные посты (Most liked posts)
- Самые залайканные посты о ClickHouse (Most liked posts about ClickHouse)
- Самые часто reposted посты (Most reposted posts)
- Самые используемые языки (Most used languages)
- Самые залайканные пользователи (Most liked users)
- Самые часто reposted пользователи (Most reposted users)
Заключительные мысли
В этом блоге мы продемонстрировали полностью реализованную архитектуру Medallion, построенную исключительно на ClickHouse, показав, как его мощные возможности позволяют преобразовывать “сырые”, полуструктурированные данные в качественные, готовые к запросам наборы данных.
Через уровни Bronze, Silver и Gold мы решили типичные проблемы, такие как повреждённые данные (malformed data), несогласованность структуры и значительное количество дубликатов.
Благодаря использованию типа данных JSON в ClickHouse, нам удалось эффективно обрабатывать по своей природе полуструктурированные и динамичные данные, при этом сохраняя высокую производительность.
Хотя эта архитектура обеспечивает надёжный и гибкий рабочий процесс, она всё же вносит определённые задержки по мере перемещения данных между слоями.
В нашем решении “окна дедупликации” (deduplication windows) помогли минимизировать эти задержки, однако остаётся компромисс между скоростью доставки данных в реальном времени и качеством данных.
Поэтому архитектура Medallion особенно хорошо подходит для наборов данных с высокой степенью дублирования и менее критичными требованиями к мгновенной доступности данных.
|
1 2 3 4 5 6 |
) AS event, toHour(bluesky_ts) as hour_of_day, count() AS count FROM bluesky.bluesky WHERE (kind = 'commit') GROUP BY event, hour_of_day; |
Полный список запросов и соответствующих им представлений можно посмотреть здесь.
Кроме того, вы можете напрямую выполнять запросы к gold или silver таблицам!
Некоторые примеры, с которых можно начать:
- Общее количество событий (Total events)
- Когда пользователи чаще всего используют Bluesky (When do people use BlueSky)
- Самые популярные типы событий (Top event types)
- Топ типов событий по количеству (Top event types by count)
- Топ типов событий по уникальным пользователям (Top event types by unique users)
- Самые залайканные посты (Most liked posts)
- Самые залайканные посты о ClickHouse (Most liked posts about ClickHouse)
- Самые часто reposted посты (Most reposted posts)
- Самые используемые языки (Most used languages)
- Самые залайканные пользователи (Most liked users)
- Самые часто reposted пользователи (Most reposted users)
Заключительные мысли
В этом блоге мы продемонстрировали полностью реализованную архитектуру Medallion, построенную исключительно на ClickHouse, показав, как его мощные возможности позволяют преобразовывать “сырые”, полуструктурированные данные в качественные, готовые к запросам наборы данных.
Через уровни Bronze, Silver и Gold мы решили типичные проблемы, такие как повреждённые данные (malformed data), несогласованность структуры и значительное количество дубликатов.
Благодаря использованию типа данных JSON в ClickHouse, нам удалось эффективно обрабатывать по своей природе полуструктурированные и динамичные данные, при этом сохраняя высокую производительность.
Хотя эта архитектура обеспечивает надёжный и гибкий рабочий процесс, она всё же вносит определённые задержки по мере перемещения данных между слоями.
В нашем решении “окна дедупликации” (deduplication windows) помогли минимизировать эти задержки, однако остаётся компромисс между скоростью доставки данных в реальном времени и качеством данных.
Поэтому архитектура Medallion особенно хорошо подходит для наборов данных с высокой степенью дублирования и менее критичными требованиями к мгновенной доступности данных.


















Leave a Reply