Загрузка данных с использованием Kafka connector
Selena предоставляет собственный коннектор под названием Apache Kafka® connector (Selena Connector for Apache Kafka®), который непрерывно потребляет сообщения из Kafka и загружает их в Selena. Kafka connector гарантирует семантику at-least-once.
Kafka connector может легко интегрироваться с Kafka Connect, что позволяет Selena лучше интегрироваться с экосистемой Kafka. Это мудрый выбор, если вы хотите загружать данные в реальном времени в Selena. По сравнению с Routine Load, рекомендуется использовать Kafka connector в следующих сценариях:
- По сравнению с Routine Load, который поддерживает загрузку данных только в форматах CSV, JSON и Avro, Kafka connector может загружать данные в большем количестве форматов, таких как Protobuf. Пока данные могут быть преобразованы в форматы JSON и CSV с использованием конвертеров Kafka Connect, данные могут быть загружены в Selena через Kafka connector.
- Настройка преобразования данных, например данные CDC в формате Debezium.
- Загрузка данных из нескольких топиков Kafka.
- Загрузка данных из Confluent Cloud.
- Необходимость более точного контроля над размерами пакетов загрузки, параллелизмом и другими параметрами для достижения баланса между скоростью загрузки и использованием ресурсов.
Подготовка
Требования к версиям
| Connector | Kafka | Selena | Java |
|---|---|---|---|
| 1.0.4 | 3.4 | 2.5 и позже | 8 |
| 1.0.3 | 3.4 | 2.5 и позже | 8 |
Настройка среды Kafka
Поддерживаются как самоуправляемые кластеры Apache Kafka, так и Confluent Cloud.
- Для самоуправляемого кластера Apache Kafka вы мож ете обратиться к Apache Kafka quickstart для быстрого развертывания кластера Kafka. Kafka Connect уже интегрирован в Kafka.
- Для Confluent Cloud убедитесь, что у вас есть учетная запись Confluent и вы создали кластер.
Загрузка Kafka connector
Отправьте Kafka connector в Kafka Connect:
-
Самоуправляемый кластер Kafka:
Загрузите и извлеките starrocks-kafka-connector-xxx.tar.gz.
-
Confluent Cloud:
В настоящее время Kafka connector не загружен в Confluent Hub. Вам нужно загрузить и извлечь starrocks-kafka-connector-xxx.tar.gz, упаковать его в ZIP-файл и загрузить ZIP-файл в Confluent Cloud.
Конфигурация сети
Убедитесь, что машина, на которой расположен Kafka, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_http_port (по умолчанию: 8040).
Использование
В этом разделе используется самоуправляемый кластер Kafka в качестве примера для объяснения того, как настроить Kafka connector и Kafka Connect, а затем запустить Kafka Connect для загрузки данных в Selena.
Подготовка набора данных
Предположим, что данные в формате JSON существуют в топике test в кластере Kafka.
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
Создание таблицы
Создайте таблицу test_tbl в базе данных example_db в кластере Selena в соответствии с ключами данных в формате JSON.
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
Настройка Kafka connector и Kafka Connect, а затем запуск Kafka Connect для загрузки данных
Запуск Kafka Connect в автономном режиме
-
Настройте Kafka connector. В каталоге config в каталоге установки Kafka создайте файл конфигурации connect-StarRocks-sink.properties для Kafka connector и настройте следующие параметры. Для получения дополнительных параметров и описаний см. Параметры.
примечаниеKafka connector является sink connector.
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# HTTP URL FE в вашем кластере Selena. Порт по умолчанию 8030.
starrocks.http.url=192.168.xxx.xxx:8030
# Если имя топика Kafka отличается от имени таблицы Selena, вам нужно настроить отображение между ними.
starrocks.topic2table.map=test:test_tbl
# Введите имя пользователя Selena.
starrocks.username=user1
# Введите пароль Selena.
starrocks.password=123456
starrocks.database.name=example_db
sink.properties.strip_outer_array=trueУВЕДОМЛЕНИЕ
Если исходные данные являются данными CDC, такими как данные в формате Debezium, а таблица Selena является таблицей Primary Key, вам также необходимо настроить
transformдля синхронизации изменений исходных данных с таблицей Primary Key. -
Настройте и запустите Kafka Connect.
-
Настройте Kafka Connect. В файле конфигурации config/connect-standalone.properties в каталоге config настройте следующие параметры. Для получения дополнительных параметров и описаний см. Running Kafka Connect. Обратите внимание, что следующие примеры используют starrocks-kafka-connector версии
1.0.3. Если вы используете более новую версию, вам нужно внести соответствующие изменения.# Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
# Обратите внимание, что этот пример использует PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам нужно настроить соответствующую информацию в этом файле.
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# Абсолютный путь к starrocks-kafka-connector после извлечения. Например:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Запустите Kafka Connect.
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties
-
Запуск Kafka Connect в распределенном режиме
-
Настройте и запустите Kafka Connect.
-
Настройте Kafka Connect. В файле конфигурации
config/connect-distributed.propertiesв каталоге config настройте следующие параметры. Для получения дополнительных параметров и описаний обратитесь к Running Kafka Connect.# Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
# Обратите внимание, что этот пример использует PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам нужно настроить соответствующую информацию в этом файле.
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# Абсолютный путь к starrocks-kafka-connector после извлечения. Например:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Запустите Kafka Connect.
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
-
-
Настройте и создайте Kafka connector. Обратите внимание, что в распределенном режиме вам нужно настроить и создать Kafka connector через REST API. Для параметров и описаний см. Параметры.
примечаниеKafka connector является sink connector.
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"starrocks-kafka-connector",
"config":{
"connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics":"test",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"false",
"starrocks.http.url":"192.168.xxx.xxx:8030",
"starrocks.topic2table.map":"test:test_tbl",
"starrocks.username":"user1",
"starrocks.password":"123456",
"starrocks.database.name":"example_db",
"sink.properties.strip_outer_array":"true"
}
}'к сведениюЕсли исходные данные являются данными CDC, такими как данные в формате Debezium, а таблица Selena является таблицей Primary Key, вам также необходимо настроить
transformдля синхронизации изменений исходных данных с таблицей Primary Key.
Запрос таблицы Selena
Запросите целевую таблицу Selena test_tbl.
MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)
Данные успешно загружены, когда возвращается указанный выше результат.
Параметры
name
Обязательный: ДА
Значение по умолчанию:
Описание: Имя для этого Kafka connector. Оно должно быть глобально уникальным среди всех Kafka connector в этом кластере Kafka Connect. Например, starrocks-kafka-connector.
connector.class
Обязательный: ДА
Значение по умолчанию:
Описание: Класс, используемый sink этого Kafka connector. Установите значение com.starrocks.connector.kafka.StarRocksSinkConnector.
topics
Обязательный: ДА
Значение по умолчанию:
Описание: Один или несколько топиков для подписки, где каждый топик соответствует таблице Selena. По умолчанию Selena предполагает, что имя топика совпадает с именем таблицы Selena. Поэтому Selena определяет целевую таблицу Selena, используя имя топика. Пожалуйста, выберите заполнение либо topics, либо topics.regex (ниже), но не оба. Однако, если имя таблицы Selena не совпадает с именем топика, используйте необязательный параметр starrocks.topic2table.map (ниже) для указания отображения от имени топика к имени таблицы.
topics.regex
Обязательный:
Значение по умолчанию: Регулярное выражение для сопоставления одного или нескольких топиков для подписки. Для получения дополнительного описания см. topics. Пожалуйста, выберите заполнение либо topics.regex, либо topics (выше), но не оба.
Описание:
starrocks.topic2table.map
Обязательный: НЕТ
Значение по умолчанию:
Описание: Отображение имени таблицы Selena и имени топика, когда имя топика отличается от имени таблицы Selena. Формат: <topic-1>:<table-1>,<topic-2>:<table-2>,....
starrocks.http.url
Обязательный: ДА
Значение по умолчанию:
Описание: HTTP URL FE в вашем кластере Selena. Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,.... Несколько адресов разделяются запятыми (,). Например, 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030.
starrocks.database.name
Обязательный: ДА
Значение по умолчанию:
Описание: Имя базы данных Selena.
starrocks.username
Обязательный: ДА
Значение по умолчанию:
Описание: Имя пользователя учетной записи вашего кластера Selena. Пользователю нужна привилегия INSERT на таблицу Selena.
starrocks.password
Обязательный: ДА
Значение по умолчанию:
Описание: Пароль учетной записи вашего кластера Selena.
key.converter
Обязательный: НЕТ
Значение по умолчанию: Key converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает key converter для sink connector (Kafka-connector-selena), который используется для десериализации ключей данных Kafka. Key converter по умолчанию - тот, который используется кластером Kafka Connect.
value.converter
Обязательный: НЕТ
Значение по умолчанию: Value converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает value converter для sink connector (Kafka-connector-selena), который используется для десериализации значений данных Kafka. Value converter по умолчанию - тот, который используется кластером Kafka Connect.
key.converter.schema.registry.url
Обязательный: НЕТ
Значение по умолчанию:
Описание: URL реестра схем для key converter.
value.converter.schema.registry.url
Обязательный: НЕТ
Значение по умолчанию:
Описание: URL реестра схем для value converter.
tasks.max
Обязательный: НЕТ
Значение по умолчанию: 1
Описание: Верхний лимит для количества потоков задач, которые может создать Kafka connector, что обычно равно количеству ядер CPU на рабочих узлах в кластере Kafka Connect. Вы можете настроить этот параметр для контроля производительности загрузки.
bufferflush.maxbytes
Обязательный: НЕТ
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Имейте в виду, что буфер Stream Load SDK может создавать несколько заданий Stream Load для буферизации данных. Поэтому упомянутый здесь порог относится к общему размеру данных.
bufferflush.intervalms
Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Интервал для отправки пакета данных, который контролирует задержку загрузки. Диапазон: [1000, 3600000].
connect.timeoutms
Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Тайм-аут для подключения к HTTP URL. Диапазон: [100, 60000].
sink.properties.*
Обязательный:
Значение по умолчанию:
Описание: Параметры Stream Load для контроля поведения загрузки. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Для списка поддерживаемых параметров и их описаний см. STREAM LOAD.
sink.properties.format
Обязательный: НЕТ
Значение по умолчанию: json
Описание: Формат, используемый для Stream Load. Kafka connector преобразует каждый пакет данных в формат перед отправкой их в Selena. Допустимые значения: csv и json. Для получения дополнительной информации см. Параметры CSV и Параметры JSON.
sink.properties.partial_update
Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
sink.properties.partial_update_mode
Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.
- Значение
row(по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими пакетами. - Значение
columnозначает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быстрые скорости обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления режима столбцов в 10 раз быстрее.
Примечания по использованию
Политика сброса
Kafka connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Сброс будет запущен при выполнении любого из следующих условий:
- Байты буферизованных строк достигают лимита
bufferflush.maxbytes. - Прошедшее время с последнего сброса достигает лимита
bufferflush.intervalms. - Достигается интервал, с которым коннектор пытается зафиксировать смещения для задач. Интервал контролируется конфигурацией Kafka Connect
offset.flush.interval.ms, и значение по умолчанию составляет60000.
Для меньшей задержки данных настройте эти конфигурации в настройках Kafka connector. Однако более частые сбросы увеличат использование CPU и I/O.
Ограничения
- Не поддерживается разворачивание одного сообщения из топика Kafka в несколько строк данных и загрузка в Selena.
- Sink Kafka connector гарантирует семантику at-least-once.
Лучшие практики
Загрузка данных CDC в формате Debezium
Debezium - это популярный инструмент Change Data Capture (CDC), который поддерживает мониторинг изменений данных в различных системах баз данных и потоковую передачу этих изменений в Kafka. Следующий пример демонстрирует, как настроить и использовать Kafka connector для записи изменений PostgreSQL в таблицу Primary Key в Selena.
Шаг 1: Установка и запуск Kafka
ПРИМЕЧАНИЕ
Вы можете пропустить этот шаг, если у вас есть собственная среда Kafka.
-
Загрузите последний релиз Kafka с официального сайта и извлеките пакет.
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0 -
Запустите среду Kafka.
Сгенерируйте UUID кластера Kafka.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"Отформатируйте каталоги логов.
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesЗапустите сервер Kafka.
bin/kafka-server-start.sh config/kraft/server.properties
Шаг 2: Настройка PostgreSQL
-
Убедитесь, что пользователю PostgreSQL предоставлены привилегии
REPLICATION. -
Настройте конфигурацию PostgreSQL.
Установите
wal_levelвlogicalв postgresql.conf.wal_level = logicalПерезапустите сервер PostgreSQL для применения изменений.
pg_ctl restart -
Подготовьте набор данных.
Создайте таблицу и вставьте тестовые данные.
CREATE TABLE customers (
id int primary key ,
first_name varchar(65533) NULL,
last_name varchar(65533) NULL ,
email varchar(65533) NULL
);
INSERT INTO customers VALUES (1,'a','a','a@a.com'); -
Проверьте сообщения лога CDC в Kafka.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
}
],
"optional": true,
"name": "test.public.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
}
],
"optional": true,
"name": "test.public.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "test.public.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1,
"first_name": "a",
"last_name": "a",
"email": "a@a.com"
},
"source": {
"version": "2.5.3.Final",
"connector": "postgresql",
"name": "test",
"ts_ms": 1714283798721,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"22910216\",\"22910504\"]",
"schema": "public",
"table": "customers",
"txId": 756,
"lsn": 22910504,
"xmin": null
},
"op": "c",
"ts_ms": 1714283798790,
"transaction": null
}
}
Шаг 3: Настройка Selena
Создайте таблицу Primary Key в Selena с той же схемой, что и исходная таблица в PostgreSQL.
CREATE TABLE `customers` (
`id` int(11) COMMENT "",
`first_name` varchar(65533) NULL COMMENT "",
`last_name` varchar(65533) NULL COMMENT "",
`email` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY hash(id) buckets 1
PROPERTIES (
"bucket_size" = "4294967296",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"fast_schema_evolution" = "true"
);