Учебное руководство по коннектору Kafka#

Введение#

Коннектор Kafka для Trino предоставляет доступ к live-данным топиков Apache Kafka с помощью Trino. В этом руководстве показано, как настроить топики и создать файлы описания топиков, лежащие в основе таблиц Trino.

Установка#

В этом руководстве предполагается знакомство с Trino и наличие рабочей локальной установки Trino (см. Развертывание Trino). Основное внимание уделяется настройке Apache Kafka и интеграции с Trino.

Шаг 1. Установка Apache Kafka#

Скачайте и распакуйте Apache Kafka.

Note

Это руководство было протестировано с Apache Kafka 0.8.1. Оно должно работать с любой версией Apache Kafka 0.8.x.

Запустите ZooKeeper и сервер Kafka:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Это запускает Zookeeper на порту 2181, а Kafka — на порту 9092.

Шаг 2. Загрузка данных#

Скачайте загрузчик tpch-kafka из Maven Central:

$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
$ chmod 755 kafka-tpch

Теперь запустите программу kafka-tpch, чтобы предварительно загрузить несколько топиков данными tpch:

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
2014-07-28T17:17:07.594-0700     INFO    main    io.airlift.log.Logging    Logging to stderr
2014-07-28T17:17:07.623-0700     INFO    main    de.softwareforge.kafka.LoadCommand    Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2014-07-28T17:17:07.981-0700     INFO    pool-1-thread-1    de.softwareforge.kafka.LoadCommand    Loading table 'customer' into topic 'tpch.customer'...
2014-07-28T17:17:07.981-0700     INFO    pool-1-thread-2    de.softwareforge.kafka.LoadCommand    Loading table 'orders' into topic 'tpch.orders'...
2014-07-28T17:17:07.981-0700     INFO    pool-1-thread-3    de.softwareforge.kafka.LoadCommand    Loading table 'lineitem' into topic 'tpch.lineitem'...
2014-07-28T17:17:07.982-0700     INFO    pool-1-thread-4    de.softwareforge.kafka.LoadCommand    Loading table 'part' into topic 'tpch.part'...
2014-07-28T17:17:07.982-0700     INFO    pool-1-thread-5    de.softwareforge.kafka.LoadCommand    Loading table 'partsupp' into topic 'tpch.partsupp'...
2014-07-28T17:17:07.982-0700     INFO    pool-1-thread-6    de.softwareforge.kafka.LoadCommand    Loading table 'supplier' into topic 'tpch.supplier'...
2014-07-28T17:17:07.982-0700     INFO    pool-1-thread-7    de.softwareforge.kafka.LoadCommand    Loading table 'nation' into topic 'tpch.nation'...
2014-07-28T17:17:07.982-0700     INFO    pool-1-thread-8    de.softwareforge.kafka.LoadCommand    Loading table 'region' into topic 'tpch.region'...
2014-07-28T17:17:10.612-0700    ERROR    pool-1-thread-8    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
2014-07-28T17:17:10.781-0700     INFO    pool-1-thread-8    de.softwareforge.kafka.LoadCommand    Generated 5 rows for table 'region'.
2014-07-28T17:17:10.797-0700    ERROR    pool-1-thread-3    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
2014-07-28T17:17:10.932-0700    ERROR    pool-1-thread-1    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
2014-07-28T17:17:11.068-0700    ERROR    pool-1-thread-2    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
2014-07-28T17:17:11.200-0700    ERROR    pool-1-thread-6    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
2014-07-28T17:17:11.319-0700     INFO    pool-1-thread-6    de.softwareforge.kafka.LoadCommand    Generated 100 rows for table 'supplier'.
2014-07-28T17:17:11.333-0700    ERROR    pool-1-thread-4    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
2014-07-28T17:17:11.466-0700    ERROR    pool-1-thread-5    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
2014-07-28T17:17:11.597-0700    ERROR    pool-1-thread-7    kafka.producer.async.DefaultEventHandler    Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
2014-07-28T17:17:11.706-0700     INFO    pool-1-thread-7    de.softwareforge.kafka.LoadCommand    Generated 25 rows for table 'nation'.
2014-07-28T17:17:12.180-0700     INFO    pool-1-thread-1    de.softwareforge.kafka.LoadCommand    Generated 1500 rows for table 'customer'.
2014-07-28T17:17:12.251-0700     INFO    pool-1-thread-4    de.softwareforge.kafka.LoadCommand    Generated 2000 rows for table 'part'.
2014-07-28T17:17:12.905-0700     INFO    pool-1-thread-2    de.softwareforge.kafka.LoadCommand    Generated 15000 rows for table 'orders'.
2014-07-28T17:17:12.919-0700     INFO    pool-1-thread-5    de.softwareforge.kafka.LoadCommand    Generated 8000 rows for table 'partsupp'.
2014-07-28T17:17:13.877-0700     INFO    pool-1-thread-3    de.softwareforge.kafka.LoadCommand    Generated 60175 rows for table 'lineitem'.

Теперь в Kafka есть несколько топиков, предварительно загруженных данными для запросов.

Шаг 3. Сделайте топики Kafka известными Trino#

В установке Trino добавьте файл свойств каталога etc/catalog/kafka.properties для коннектора Kafka. В этом файле перечислены узлы Kafka и топики:

connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false

Теперь запустите Trino:

$ bin/launcher start

Поскольку все таблицы Kafka имеют префикс tpch. в конфигурации, таблицы находятся в схеме tpch. Коннектор смонтирован в каталог kafka, поскольку файл свойств называется kafka.properties.

Запустите Trino CLI:

$ ./trino --catalog kafka --schema tpch

Выведите список таблиц, чтобы проверить, что все работает:

trino:tpch> SHOW TABLES;
  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier
(8 rows)

Шаг 4. Базовые запросы к данным#

Данные Kafka неструктурированы и не имеют метаданных, описывающих формат сообщений. Без дополнительной конфигурации коннектор Kafka может получить доступ к данным и сопоставить их в сыром виде. Однако, кроме встроенных, в таблице нет фактических столбцов:

trino:tpch> DESCRIBE customer;
      Column       |  Type      | Extra |                   Comment
-------------------+------------+-------+---------------------------------------------
 _partition_id     | bigint     |       | Partition Id
 _partition_offset | bigint     |       | Offset for the message within the partition
 _key              | varchar    |       | Key text
 _key_corrupt      | boolean    |       | Key data is corrupt
 _key_length       | bigint     |       | Total number of key bytes
 _message          | varchar    |       | Message text
 _message_corrupt  | boolean    |       | Message data is corrupt
 _message_length   | bigint     |       | Total number of message bytes
 _timestamp        | timestamp  |       | Message timestamp
(11 rows)

trino:tpch> SELECT count(*) FROM customer;
 _col0
-------
  1500

trino:tpch> SELECT _message FROM customer LIMIT 5;
                                                                                                                                                 _message
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
 {"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
 {"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
 {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
 {"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
(5 rows)

trino:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS DOUBLE)) FROM customer LIMIT 10;
   _col0
------------
 6681865.59
(1 row)

Данные из Kafka можно запрашивать с помощью Trino, но они еще не представлены в форме настоящей таблицы. Сырые данные доступны через столбцы _message и _key, но не декодированы в столбцы. Поскольку пример данных находится в формате JSON, для извлечения частей данных можно использовать встроенные в Trino Функции и операторы JSON.

Шаг 5. Добавление файла описания топика#

Коннектор Kafka поддерживает файлы описания топиков, чтобы преобразовывать сырые данные в табличный формат. Эти файлы находятся в папке etc/kafka в установке Trino и должны заканчиваться на .json. Рекомендуется, чтобы имя файла совпадало с именем таблицы, но это не обязательно.

Добавьте следующий файл как etc/kafka/tpch.customer.json и перезапустите Trino:

{
    "tableName": "customer",
    "schemaName": "tpch",
    "topicName": "tpch.customer",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    }
}

Теперь в таблице customer есть дополнительный столбец: kafka_key.

trino:tpch> DESCRIBE customer;
      Column       |  Type      | Extra |                   Comment
-------------------+------------+-------+---------------------------------------------
 kafka_key         | bigint     |       |
 _partition_id     | bigint     |       | Partition Id
 _partition_offset | bigint     |       | Offset for the message within the partition
 _key              | varchar    |       | Key text
 _key_corrupt      | boolean    |       | Key data is corrupt
 _key_length       | bigint     |       | Total number of key bytes
 _message          | varchar    |       | Message text
 _message_corrupt  | boolean    |       | Message data is corrupt
 _message_length   | bigint     |       | Total number of message bytes
 _timestamp        | timestamp  |       | Message timestamp
(12 rows)

trino:tpch> SELECT kafka_key FROM customer ORDER BY kafka_key LIMIT 10;
 kafka_key
-----------
         0
         1
         2
         3
         4
         5
         6
         7
         8
         9
(10 rows)

Файл определения топика сопоставляет внутренний ключ Kafka, который является сырым long в восьми байтах, со столбцом Trino BIGINT.

Шаг 6. Сопоставление всех значений из сообщения топика со столбцами#

Обновите файл etc/kafka/tpch.customer.json, добавив поля для сообщения, и перезапустите Trino. Поскольку поля в сообщении являются JSON, используется формат данных JSON. Это пример, где для ключа и сообщения используются разные форматы данных.

{
    "tableName": "customer",
    "schemaName": "tpch",
    "topicName": "tpch.customer",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "row_number",
                "mapping": "rowNumber",
                "type": "BIGINT"
            },
            {
                "name": "customer_key",
                "mapping": "customerKey",
                "type": "BIGINT"
            },
            {
                "name": "name",
                "mapping": "name",
                "type": "VARCHAR"
            },
            {
                "name": "address",
                "mapping": "address",
                "type": "VARCHAR"
            },
            {
                "name": "nation_key",
                "mapping": "nationKey",
                "type": "BIGINT"
            },
            {
                "name": "phone",
                "mapping": "phone",
                "type": "VARCHAR"
            },
            {
                "name": "account_balance",
                "mapping": "accountBalance",
                "type": "DOUBLE"
            },
            {
                "name": "market_segment",
                "mapping": "marketSegment",
                "type": "VARCHAR"
            },
            {
                "name": "comment",
                "mapping": "comment",
                "type": "VARCHAR"
            }
        ]
    }
}

Теперь для всех полей в JSON сообщения определены столбцы, и предыдущий запрос с суммой может работать непосредственно со столбцом account_balance:

trino:tpch> DESCRIBE customer;
      Column       |  Type      | Extra |                   Comment
-------------------+------------+-------+---------------------------------------------
 kafka_key         | bigint     |       |
 row_number        | bigint     |       |
 customer_key      | bigint     |       |
 name              | varchar    |       |
 address           | varchar    |       |
 nation_key        | bigint     |       |
 phone             | varchar    |       |
 account_balance   | double     |       |
 market_segment    | varchar    |       |
 comment           | varchar    |       |
 _partition_id     | bigint     |       | Partition Id
 _partition_offset | bigint     |       | Offset for the message within the partition
 _key              | varchar    |       | Key text
 _key_corrupt      | boolean    |       | Key data is corrupt
 _key_length       | bigint     |       | Total number of key bytes
 _message          | varchar    |       | Message text
 _message_corrupt  | boolean    |       | Message data is corrupt
 _message_length   | bigint     |       | Total number of message bytes
 _timestamp        | timestamp  |       | Message timestamp
(21 rows)

trino:tpch> SELECT * FROM customer LIMIT 5;
 kafka_key | row_number | customer_key |        name        |                address                | nation_key |      phone      | account_balance | market_segment |                                                      comment
-----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
         1 |          2 |            2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak        |         13 | 23-768-687-3665 |          121.65 | AUTOMOBILE     | l accounts. blithely ironic theodolites integrate boldly: caref
         3 |          4 |            4 | Customer#000000004 | XxVSJsLAGtn                           |          4 | 14-128-190-5944 |         2866.83 | MACHINERY      |  requests. final, regular ideas sleep final accou
         5 |          6 |            6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn  |         20 | 30-114-968-4951 |         7638.57 | AUTOMOBILE     | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
         7 |          8 |            8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 |         17 | 27-147-574-9335 |         6819.74 | BUILDING       | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
         9 |         10 |           10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2    |          5 | 15-741-346-9870 |         2753.54 | HOUSEHOLD      | es regular deposits haggle. fur
(5 rows)

trino:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
   _col0
------------
 6681865.59
(1 row)

Теперь все поля из сообщений топика customer доступны как столбцы таблицы Trino.

Шаг 7. Использование live-данных#

Trino может запрашивать live-данные в Kafka по мере их поступления. Чтобы сымитировать live-поток данных, в этом руководстве настраивается поток live tweet в Kafka.

Настройка live-потока Twitter#

  • Скачайте инструмент twistr

$ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
$ chmod 755 twistr
  • Создайте учетную запись разработчика на https://dev.twitter.com/ и настройте access token и consumer token.

  • Создайте файл twistr.properties и поместите в него access/consumer key и секреты:

twistr.access-token-key=...
twistr.access-token-secret=...
twistr.consumer-key=...
twistr.consumer-secret=...
twistr.kafka.brokers=localhost:9092

Создание таблицы tweets в Trino#

Добавьте таблицу tweets в файл etc/catalog/kafka.properties:

connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
kafka.hide-internal-columns=false

Добавьте файл определения топика для потока Twitter как etc/kafka/tweets.json:

{
    "tableName": "tweets",
    "topicName": "twitter_feed",
    "dataFormat": "json",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat":"json",
        "fields": [
            {
                "name": "text",
                "mapping": "text",
                "type": "VARCHAR"
            },
            {
                "name": "user_name",
                "mapping": "user/screen_name",
                "type": "VARCHAR"
            },
            {
                "name": "lang",
                "mapping": "lang",
                "type": "VARCHAR"
            },
            {
                "name": "created_at",
                "mapping": "created_at",
                "type": "TIMESTAMP",
                "dataFormat": "rfc2822"
            },
            {
                "name": "favorite_count",
                "mapping": "favorite_count",
                "type": "BIGINT"
            },
            {
                "name": "retweet_count",
                "mapping": "retweet_count",
                "type": "BIGINT"
            },
            {
                "name": "favorited",
                "mapping": "favorited",
                    "type": "BOOLEAN"
            },
            {
                "name": "id",
                "mapping": "id_str",
                "type": "VARCHAR"
            },
            {
                "name": "in_reply_to_screen_name",
                "mapping": "in_reply_to_screen_name",
                "type": "VARCHAR"
            },
            {
                "name": "place_name",
                "mapping": "place/full_name",
                "type": "VARCHAR"
            }
        ]
    }
}

Поскольку у этой таблицы нет явного имени схемы, она помещается в схему default.

Передача live-данных#

Запустите инструмент twistr:

$ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr

twistr подключается к Twitter API и передает поток “sample tweet” в топик Kafka с именем twitter_feed.

Теперь выполните запросы к live-данным:

$ ./trino --catalog kafka --schema default

trino:default> SELECT count(*) FROM tweets;
 _col0
-------
  4467
(1 row)

trino:default> SELECT count(*) FROM tweets;
 _col0
-------
  4517
(1 row)

trino:default> SELECT count(*) FROM tweets;
 _col0
-------
  4572
(1 row)

trino:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
     kafka_key      |    user_name    | lang |       created_at
--------------------+-----------------+------+-------------------------
 494227746231685121 | burncaniff      | en   | 2014-07-29 14:07:31.000
 494227746214535169 | gu8tn           | ja   | 2014-07-29 14:07:31.000
 494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
 494227746201931777 | josnyS          | ht   | 2014-07-29 14:07:31.000
 494227746219110401 | Cafe510         | en   | 2014-07-29 14:07:31.000
 494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
 494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
 494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
 494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
 494227750388256769 | jmolas          | es   | 2014-07-29 14:07:32.000
(10 rows)

Теперь в Kafka есть live-поток, который можно запрашивать с помощью Trino.

Эпилог. Timestamp#

Поток tweets, настроенный на предыдущем шаге, содержит timestamp в формате RFC 2822 как атрибут created_at в каждом tweet.

trino:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at')) AS raw_date
             -> FROM tweets LIMIT 5;
            raw_date
--------------------------------
 Tue Jul 29 21:07:31 +0000 2014
 Tue Jul 29 21:07:32 +0000 2014
 Tue Jul 29 21:07:33 +0000 2014
 Tue Jul 29 21:07:34 +0000 2014
 Tue Jul 29 21:07:35 +0000 2014
(5 rows)

Файл определения топика для таблицы tweets содержит сопоставление с timestamp с помощью преобразователя rfc2822:

...
{
    "name": "created_at",
    "mapping": "created_at",
    "type": "TIMESTAMP",
    "dataFormat": "rfc2822"
},
...

Это позволяет сопоставить сырые данные со столбцом Trino TIMESTAMP:

trino:default> SELECT created_at, raw_date FROM (
             ->   SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
             ->   FROM tweets)
             -> GROUP BY 1, 2 LIMIT 5;
       created_at        |            raw_date
-------------------------+--------------------------------
 2014-07-29 14:07:20.000 | Tue Jul 29 21:07:20 +0000 2014
 2014-07-29 14:07:21.000 | Tue Jul 29 21:07:21 +0000 2014
 2014-07-29 14:07:22.000 | Tue Jul 29 21:07:22 +0000 2014
 2014-07-29 14:07:23.000 | Tue Jul 29 21:07:23 +0000 2014
 2014-07-29 14:07:24.000 | Tue Jul 29 21:07:24 +0000 2014
(5 rows)

Коннектор Kafka содержит преобразователи для текстовых форматов ISO 8601, RFC 2822 и timestamp на основе чисел с использованием секунд или миллисекунд с начала эпохи. Также есть универсальный текстовый форматтер, который использует строки формата Joda-Time для разбора текстовых столбцов.