Kafka connector для Selena
Загрузка данных с помощью Kafka connector
Selena предоставляет собственный connector под названием Apache Kafka® connector (Selena Connector for Apache Kafka®, сокращённо Kafka connector), который является sink connector'ом и непрерывно потребляет сообщения из Kafka, загружая их в Selena. Kafka connector гарантирует семантику at-least-once.
Kafka connector может легко интегрироваться с Kafka Connect, что позволяет Selena лучше интегрироваться с экосистемой Kafka. Это мудрый выбор, если вы хотите загружать данные в реальном времени в Selena. По сравнению с Routine Load, рекомендуется использовать Kafka connector в следующих сценариях:
- В отличие от Routine Load, которая поддерживает загрузку данных только в форматах CSV, JSON и Avro, Kafka connector может загружать данные в большем количестве форматов, таких как Protobuf. Если данные можно преобразовать в форматы JSON и CSV с помощью конвертеров Kafka Connect, то данные можно загрузить в Selena через Kafka connector.
- Настройка пользовательского преобразования данных, например, CDC-данных в формате Debezium.
- Загрузка данных из нескольких топиков Kafka.
- Загрузка данных из Confluent Cloud.
- Более точный контроль над размерами пакетов загрузки, параллелизмом и другими параметрами для достижения баланса между скоростью загрузки и использованием ресурсов.
Подготовка
Требования к версиям
| Connector | Kafka | Selena | Java |
|---|---|---|---|
| 1.0.4 | 3.4 | 2.5 и выше | 8 |
| 1.0.3 | 3.4 | 2.5 и выше | 8 |
Настройка окружения Kafka
Поддерживаются как самоуправляемые кластеры Apache Kafka, так и Confluent Cloud.
- Для самоуправляемого кластера Apache Kafka вы можете обратиться к Apache Kafka quickstart для быстрого развёртывания кластера Kafka. Kafka Connect уже интегрирован в Kafka.
- Для Confluent Cloud убедитесь, что у вас есть учётная запись Confluent и вы создали кластер.
Загрузка Kafka connector
Добавьте Kafka connector в Kafka Connect:
-
Самоуправляемый кластер Kafka:
Скачайте и распакуйте selena-kafka-connector-xxx.tar.gz.
-
Confluent Cloud:
В настоящее время Kafka connector не загружен в Confluent Hub. Вам нужно скачать и распаковать selena-kafka-connector-xxx.tar.gz, упаковать его в ZIP-файл и загрузить ZIP-файл в Confluent Cloud.
Конфигурация сети
Убедитесь, что машина, на которой расположен Kafka, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_http_port (по умолчанию: 8040).
Использование
В этом разделе в качестве примера используется самоуправляемый кластер Kafka для объяснения того, как настроить Kafka connector и Kafka Connect, а затем запустить Kafka Connect для загрузки данных в Selena.
Подготовка набора данных
Предположим, что в топике test кластера Kafka существуют данные в фо рмате JSON.
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
Создание таблицы
Создайте таблицу test_tbl в базе данных example_db в кластере Selena в соответствии с ключами данных в формате JSON.
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
Настройка Kafka connector и Kafka Connect, запуск Kafka Connect для загрузки данных
Запуск Kafka Connect в автономном режиме
-
Настройте Kafka connector. В директории config в каталоге установки Kafka создайте файл конфигурации connect-Selena-sink.properties для Kafka connector и настройте следующие параметры. Больше параметров и их описаний см. в разделе Параметры.
к сведению- В этом примере Kafka connector, предоставляемый Selena, является sink connector'ом, который может непрерывно потреблять данные из Kafka и загружать данные в Selena.
- Если исходные данные являются CDC-данными, например, данными в формате Debezium, и таблица Selena является таблицей Primary Key, вам также необходимо настроить
transformв файле конфигурации connect-Selena-sink.properties для Kafka connector, предоставляемого Selena, чтобы синхронизировать изменения исходных данных с таблицей Primary Key.
name=selena-kafka-connector
connector.class=com.selena.connector.kafka.SelenaSinkConnector
topics=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# HTTP URL узла FE в вашем кластере Selena. Порт по умолчанию - 8030.
selena.http.url=192.168.xxx.xxx:8030
# Если имя топика Kafka отличается от имени таблицы Selena, необходимо настроить соответствие между ними.
selena.topic2table.map=test:test_tbl
# Введите имя пользователя Selena.
selena.username=user1
# Введите пароль Selena.
selena.password=123456
selena.database.name=example_db
sink.properties.strip_outer_array=true -
Настройте и запустите Kafka Connect.
-
Настройте Kafka Connect. В файле конфигурации config/connect-standalone.properties в директории config настройте следующие параметры. Больше параметров и их описаний см. в разделе Running Kafka Connect. Обратите внимание, что в следующих примерах используетс я версия selena-kafka-connector
1.0.3. Если вы используете более новую версию, вам необходимо внести соответствующие изменения.# Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
# Обратите внимание, что в этом примере используется PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам необходимо настроить соответствующую информацию в этом файле.
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# Абсолютный путь к Kafka connector после распаковки. Например:
plugin.path=/home/kafka-connect/selena-kafka-connector-1.0.3 -
Запустите Kafka Connect.
CLASSPATH=/home/kafka-connect/selena-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-selena-sink.properties
-
Запуск Kafka Connect в распределённом режиме
-
Настройте и запустите Kafka Connect.
-
Настройте Kafka Connect. В файле конфигурации
config/connect-distributed.propertiesв директории config настройте следующие параметры. Больше параметров и их описаний см. в разделе Running Kafka Connect.# Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
# Обратите внимание, что в этом примере используется PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам необходимо настроить соответствующую информацию в этом файле.
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# Абсолютный путь к Kafka connector после распаковки. Например:
plugin.path=/home/kafka-connect/selena-kafka-connector-1.0.3 -
Запустите Kafka Connect.
CLASSPATH=/home/kafka-connect/selena-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
-
-
Настройте и создайте Kafka connector. Обратите внимание, что в распределённом режиме вам необходимо настроить и создать Kafka connector через REST API. Параметры и их описания см. в разделе Параметры.
к сведению- В этом примере Kafka connector, предоставляемый Selena, является sink connector'ом, который может непрерывно потреблять данные из Kafka и загружать данные в Selena.
- Если исходные данные являются CDC-данными, например, данными в формате Debezium, и таблица Selena является таблицей Primary Key, вам также необходимо настроить
transformв файле конфигурации connect-Selena-sink.properties для Kafka connector, предоставляемого Selena, чтобы синхронизировать изменения исходных данных с таблицей Primary Key.
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"selena-kafka-connector",
"config":{
"connector.class":"com.selena.connector.kafka.SelenaSinkConnector",
"topics":"test",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"false",
"selena.http.url":"192.168.xxx.xxx:8030",
"selena.topic2table.map":"test:test_tbl",
"selena.username":"user1",
"selena.password":"123456",
"selena.database.name":"example_db",
"sink.properties.strip_outer_array":"true"
}
}'
Запрос таблицы Selena
Запросите целевую таблицу Selena test_tbl.
MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)
Данные успешно загружены, когда возвращается вышеуказанный результат.
Параметры
name
Обязательный: ДА
Значение по умолчанию:
Описание: Имя для этого Kafka connector. Оно должно быть глобально уникальным среди всех Kafka connector'ов в этом кластере Kafka Connect. Например, selena-kafka-connector.
connector.class
Обязательный: ДА
Значение по умолчанию:
Описание: Класс, используемый sink'ом этого Kafka connector. Установите значение com.selena.connector.kafka.SelenaSinkConnector.
topics
Обязательный:
Значение по умолчанию:
Описание: Один или несколько топиков для подписки, где каждый топик соответствует таблице Selena. По умолчанию Selena предполагает, что имя топика совпадает с именем таблицы Selena. Таким образом, Selena определяет целевую таблицу Selena, используя имя топика. Пожалуйста, выберите заполнение либо topics, либо topics.regex (ниже), но не оба одновременно. Однако, если имя таблицы Selena не совпадает с именем топика, используйте необязательный параметр selena.topic2table.map (ниже) для указания соответствия между именем топика и именем таблицы.
topics.regex
Обязательный:
Значение по умолчанию:
Описание: Регулярное выражение для сопоставления одного или нескольких топиков для подписки. Более подробное описание см. в topics. Пожалуйста, выберите заполнение либо topics.regex, либо topics (выше), но не оба одновременно.
selena.topic2table.map
Обязательный: НЕТ
Значение по умолчанию:
Описание: Соответствие между именем таблицы Selena и именем топика, когда имя топика отличается от имени таблицы Selena. Формат: <topic-1>:<table-1>,<topic-2>:<table-2>,....
selena.http.url
Обязательный: ДА
Значение по умолчанию:
Описание: HTTP URL узла FE в вашем кластере Selena. Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,.... Несколько адресов разделяются запятыми (,). Например, 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030.
selena.database.name
Обязательный: ДА
Значение по умолчанию:
Описание: Имя базы данных Selena.
selena.username
Обязательный: ДА
Значение по умолчанию:
Описание: Имя пользователя вашей учётной записи кластера Selena. Пользователь должен иметь привилегию INSERT на таблицу Selena.
selena.password
Обязательный: ДА
Значение по умолчанию:
Описание: Пароль вашей учётной записи кластера Selena.
key.converter
Обязательный: НЕТ
Значение по умолчанию: Key converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает key converter для sink connector (Kafka-connector-selena), который используется для десериализации ключей данных Kafka. Key converter по умолчанию - тот, который используется кластером Kafka Connect.
value.converter
Обязательный: НЕТ
Значение по умолчанию: Value converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает value converter для sink connector (Kafka-connector-selena), который используется для десериализации значений данных Kafka. Value converter по умолчанию - тот, который используется кластером Kafka Connect.
key.converter.schema.registry.url
Обязательный: НЕТ
Значение по умолчанию:
Описани е: URL Schema registry для key converter.
value.converter.schema.registry.url
Обязательный: НЕТ
Значение по умолчанию:
Описание: URL Schema registry для value converter.
tasks.max
Обязательный: НЕТ
Значение по умолчанию: 1
Описание: Верхний лимит для количества потоков задач, которые может создать Kafka connector, который обычно совпадает с количеством ядер CPU на рабочих узлах в кластере Kafka Connect. Вы можете настроить этот параметр для управления производительностью загрузки.
bufferflush.maxbytes
Обязательный: НЕТ
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 MB до 10 GB. Имейте в виду, что буфер Stream Load SDK может создавать несколько заданий Stream Load для буферизации данных. Поэтому упомянутый здесь порог относится к общему размеру данных.
bufferflush.intervalms
Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Интервал для отправки пакета данных, который контролирует задержку загрузки. Диапазон: [1000, 3600000].
connect.timeoutms
Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Тайм-аут для подключения к HTTP URL. Диапазон: [100, 60000].
sink.properties.*
Обязательный:
Значение по умолчанию:
О писание: Параметры Stream Load для управления поведением загрузки. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в разделе STREAM LOAD.
sink.properties.format
Обязательный: НЕТ
Значение по умолчанию: json
Описание: Формат, используемый для Stream Load. Kafka connector будет преобразовывать каждый пакет данных в указанный формат перед отправкой их в Selena. Допустимые значения: csv и json. Для получения дополнительной информации см. Параметры CSV и Параметры JSON.
sink.properties.partial_update
Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, что означает отключение этой функции.
sink.properties.partial_update_mode
Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.
- Значение
row(по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с большим количеством столбцов и небольшими пакетами. - Значение
columnозначает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и большим количеством строк. В таких сценариях включение режима столбцов обеспечивает более высокую скорость обновления. Например, в таблице с 100 столбцами, если обновляется только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз выше.
Примечания по использованию
Политика сброса
Kafka connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Сброс будет запущен при выполнении любого из следующих условий:
- Размер буферизованных строк в байтах достигает лимита
bufferflush.maxbytes. - Время, прошедшее с момента последнего сброса, достигает лимита
bufferflush.intervalms. - Достигается интервал, с которым connector пытается фиксировать смещения для задач. Интервал контролируется конфигурацией Kafka Connect
offset.flush.interval.ms, и значение по умолчанию составляет60000.
Для снижения задержки данных настройте эти конфигурации в настройках Kafka connector. Однако более частые сбросы увеличат использование CPU и I/O.
Ограничения
- Не поддерживается преобразование одного сообщения из топика Kafka в несколько строк данных и загрузка в Selena.
- Sink Kafka connector, предоставляемый Selena, гарантирует семантику at-least-once.
Лучшие практики
Загрузка CDC-данных в формате Debezium
Debezium - это популярный инструмент Change Data Capture (CDC), который поддерживает мониторинг изменений данных в различных системах баз данных и потоковую передачу этих изменений в Kafka. Следующий пример демонстрирует, как настроить и использовать Kafka connector для записи изменений PostgreSQL в таблицу Primary Key в Selena.
Шаг 1: Установка и запуск Kafka
ПРИМЕЧАНИЕ
Вы можете пропустить этот шаг, если у вас есть собственное окружение Kafka.
-
Скачайте последний релиз Kafka с официального сайта и распакуйте пакет.
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0