Перейти к основному содержимому

Загрузка данных с использованием Kafka connector

Selena предоставляет собственный коннектор под названием Apache Kafka® connector (Selena Connector for Apache Kafka®), который непрерывно потребляет сообщения из Kafka и загружает их в Selena. Kafka connector гарантирует семантику at-least-once.

Kafka connector может легко интегрироваться с Kafka Connect, что позволяет Selena лучше интегрироваться с экосистемой Kafka. Это мудрый выбор, если вы хотите загружать данные в реальном времени в Selena. По сравнению с Routine Load, рекомендуется использовать Kafka connector в следующих сценариях:

  • По сравнению с Routine Load, который поддерживает загрузку данных только в форматах CSV, JSON и Avro, Kafka connector может загружать данные в большем количестве форматов, таких как Protobuf. Пока данные могут быть преобразованы в форматы JSON и CSV с использованием конвертеров Kafka Connect, данные могут быть загружены в Selena через Kafka connector.
  • Настройка преобразования данных, например данные CDC в формате Debezium.
  • Загрузка данных из нескольких топиков Kafka.
  • Загрузка данных из Confluent Cloud.
  • Необходимость более точного контроля над размерами пакетов загрузки, параллелизмом и другими параметрами для достижения баланса между скоростью загрузки и использованием ресурсов.

Подготовка

Требования к версиям

ConnectorKafkaSelenaJava
1.0.43.42.5 и позже8
1.0.33.42.5 и позже8

Настройка среды Kafka

Поддерживаются как самоуправляемые кластеры Apache Kafka, так и Confluent Cloud.

  • Для самоуправляемого кластера Apache Kafka вы можете обратиться к Apache Kafka quickstart для быстрого развертывания кластера Kafka. Kafka Connect уже интегрирован в Kafka.
  • Для Confluent Cloud убедитесь, что у вас есть учетная запись Confluent и вы создали кластер.

Загрузка Kafka connector

Отправьте Kafka connector в Kafka Connect:

  • Самоуправляемый кластер Kafka:

    Загрузите и извлеките starrocks-kafka-connector-xxx.tar.gz.

  • Confluent Cloud:

    В настоящее время Kafka connector не загружен в Confluent Hub. Вам нужно загрузить и извлечь starrocks-kafka-connector-xxx.tar.gz, упаковать его в ZIP-файл и загрузить ZIP-файл в Confluent Cloud.

Конфигурация сети

Убедитесь, что машина, на которой расположен Kafka, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_http_port (по умолчанию: 8040).

Использование

В этом разделе используется самоуправляемый кластер Kafka в качестве примера для объяснения того, как настроить Kafka connector и Kafka Connect, а затем запустить Kafka Connect для загрузки данных в Selena.

Подготовка набора данных

Предположим, что данные в формате JSON существуют в топике test в кластере Kafka.

{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}

Создание таблицы

Создайте таблицу test_tbl в базе данных example_db в кластере Selena в соответствии с ключами данных в формате JSON.

CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);

Настройка Kafka connector и Kafka Connect, а затем запуск Kafka Connect для загрузки данных

Запуск Kafka Connect в автономном режиме

  1. Настройте Kafka connector. В каталоге config в каталоге установки Kafka создайте файл конфигурации connect-StarRocks-sink.properties для Kafka connector и настройте следующие параметры. Для получения дополнительных параметров и описаний см. Параметры.

    примечание

    Kafka connector является sink connector.

    name=starrocks-kafka-connector
    connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
    topics=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false
    # HTTP URL FE в вашем кластере Selena. Порт по умолчанию 8030.
    starrocks.http.url=192.168.xxx.xxx:8030
    # Если имя топика Kafka отличается от имени таблицы Selena, вам нужно настроить отображение между ними.
    starrocks.topic2table.map=test:test_tbl
    # Введите имя пользователя Selena.
    starrocks.username=user1
    # Введите пароль Selena.
    starrocks.password=123456
    starrocks.database.name=example_db
    sink.properties.strip_outer_array=true

    УВЕДОМЛЕНИЕ

    Если исходные данные являются данными CDC, такими как данные в формате Debezium, а таблица Selena является таблицей Primary Key, вам также необходимо настроить transform для синхронизации изменений исходных данных с таблицей Primary Key.

  2. Настройте и запустите Kafka Connect.

    1. Настройте Kafka Connect. В файле конфигурации config/connect-standalone.properties в каталоге config настройте следующие параметры. Для получения дополнительных параметров и описаний см. Running Kafka Connect. Обратите внимание, что следующие примеры используют starrocks-kafka-connector версии 1.0.3. Если вы используете более новую версию, вам нужно внести соответствующие изменения.

      # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
      # Обратите внимание, что этот пример использует PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам нужно настроить соответствующую информацию в этом файле.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Абсолютный путь к starrocks-kafka-connector после извлечения. Например:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. Запустите Kafka Connect.

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties

Запуск Kafka Connect в распределенном режиме

  1. Настройте и запустите Kafka Connect.

    1. Настройте Kafka Connect. В файле конфигурации config/connect-distributed.properties в каталоге config настройте следующие параметры. Для получения дополнительных параметров и описаний обратитесь к Running Kafka Connect.

      # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
      # Обратите внимание, что этот пример использует PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам нужно настроить соответствующую информацию в этом файле.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Абсолютный путь к starrocks-kafka-connector после извлечения. Например:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. Запустите Kafka Connect.

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
  2. Настройте и создайте Kafka connector. Обратите внимание, что в распределенном режиме вам нужно настроить и создать Kafka connector через REST API. Для параметров и описаний см. Параметры.

    примечание

    Kafka connector является sink connector.

    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"starrocks-kafka-connector",
    "config":{
    "connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics":"test",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"true",
    "value.converter.schemas.enable":"false",
    "starrocks.http.url":"192.168.xxx.xxx:8030",
    "starrocks.topic2table.map":"test:test_tbl",
    "starrocks.username":"user1",
    "starrocks.password":"123456",
    "starrocks.database.name":"example_db",
    "sink.properties.strip_outer_array":"true"
    }
    }'
    к сведению

    Если исходные данные являются данными CDC, такими как данные в формате Debezium, а таблица Selena является таблицей Primary Key, вам также необходимо настроить transform для синхронизации изменений исходных данных с таблицей Primary Key.

Запрос таблицы Selena

Запросите целевую таблицу Selena test_tbl.

MySQL [example_db]> select * from test_tbl;

+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)

Данные успешно загружены, когда возвращается указанный выше результат.

Параметры

name

Обязательный: ДА
Значение по умолчанию:
Описание: Имя для этого Kafka connector. Оно должно быть глобально уникальным среди всех Kafka connector в этом кластере Kafka Connect. Например, starrocks-kafka-connector.

connector.class

Обязательный: ДА
Значение по умолчанию:
Описание: Класс, используемый sink этого Kafka connector. Установите значение com.starrocks.connector.kafka.StarRocksSinkConnector.

topics

Обязательный: ДА
Значение по умолчанию:
Описание: Один или несколько топиков для подписки, где каждый топик соответствует таблице Selena. По умолчанию Selena предполагает, что имя топика совпадает с именем таблицы Selena. Поэтому Selena определяет целевую таблицу Selena, используя имя топика. Пожалуйста, выберите заполнение либо topics, либо topics.regex (ниже), но не оба. Однако, если имя таблицы Selena не совпадает с именем топика, используйте необязательный параметр starrocks.topic2table.map (ниже) для указания отображения от имени топика к имени таблицы.

topics.regex

Обязательный:
Значение по умолчанию: Регулярное выражение для сопоставления одного или нескольких топиков для подписки. Для получения дополнительного описания см. topics. Пожалуйста, выберите заполнение либо topics.regex, либо topics (выше), но не оба.
Описание:

starrocks.topic2table.map

Обязательный: НЕТ
Значение по умолчанию:
Описание: Отображение имени таблицы Selena и имени топика, когда имя топика отличается от имени таблицы Selena. Формат: <topic-1>:<table-1>,<topic-2>:<table-2>,....

starrocks.http.url

Обязательный: ДА
Значение по умолчанию:
Описание: HTTP URL FE в вашем кластере Selena. Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,.... Несколько адресов разделяются запятыми (,). Например, 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030.

starrocks.database.name

Обязательный: ДА
Значение по умолчанию:
Описание: Имя базы данных Selena.

starrocks.username

Обязательный: ДА
Значение по умолчанию:
Описание: Имя пользователя учетной записи вашего кластера Selena. Пользователю нужна привилегия INSERT на таблицу Selena.

starrocks.password

Обязательный: ДА
Значение по умолчанию:
Описание: Пароль учетной записи вашего кластера Selena.

key.converter

Обязательный: НЕТ
Значение по умолчанию: Key converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает key converter для sink connector (Kafka-connector-selena), который используется для десериализации ключей данных Kafka. Key converter по умолчанию - тот, который используется кластером Kafka Connect.

value.converter

Обязательный: НЕТ
Значение по умолчанию: Value converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает value converter для sink connector (Kafka-connector-selena), который используется для десериализации значений данных Kafka. Value converter по умолчанию - тот, который используется кластером Kafka Connect.

key.converter.schema.registry.url

Обязательный: НЕТ
Значение по умолчанию:
Описание: URL реестра схем для key converter.

value.converter.schema.registry.url

Обязательный: НЕТ
Значение по умолчанию:
Описание: URL реестра схем для value converter.

tasks.max

Обязательный: НЕТ
Значение по умолчанию: 1
Описание: Верхний лимит для количества потоков задач, которые может создать Kafka connector, что обычно равно количеству ядер CPU на рабочих узлах в кластере Kafka Connect. Вы можете настроить этот параметр для контроля производительности загрузки.

bufferflush.maxbytes

Обязательный: НЕТ
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Имейте в виду, что буфер Stream Load SDK может создавать несколько заданий Stream Load для буферизации данных. Поэтому упомянутый здесь порог относится к общему размеру данных.

bufferflush.intervalms

Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Интервал для отправки пакета данных, который контролирует задержку загрузки. Диапазон: [1000, 3600000].

connect.timeoutms

Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Тайм-аут для подключения к HTTP URL. Диапазон: [100, 60000].

sink.properties.*

Обязательный:
Значение по умолчанию:
Описание: Параметры Stream Load для контроля поведения загрузки. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Для списка поддерживаемых параметров и их описаний см. STREAM LOAD.

sink.properties.format

Обязательный: НЕТ
Значение по умолчанию: json
Описание: Формат, используемый для Stream Load. Kafka connector преобразует каждый пакет данных в формат перед отправкой их в Selena. Допустимые значения: csv и json. Для получения дополнительной информации см. Параметры CSV и Параметры JSON.

sink.properties.partial_update

Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.

sink.properties.partial_update_mode

Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими пакетами.
  • Значение column означает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быстрые скорости обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления режима столбцов в 10 раз быстрее.

Примечания по использованию

Политика сброса

Kafka connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Сброс будет запущен при выполнении любого из следующих условий:

  • Байты буферизованных строк достигают лимита bufferflush.maxbytes.
  • Прошедшее время с последнего сброса достигает лимита bufferflush.intervalms.
  • Достигается интервал, с которым коннектор пытается зафиксировать смещения для задач. Интервал контролируется конфигурацией Kafka Connect offset.flush.interval.ms, и значение по умолчанию составляет 60000.

Для меньшей задержки данных настройте эти конфигурации в настройках Kafka connector. Однако более частые сбросы увеличат использование CPU и I/O.

Ограничения

  • Не поддерживается разворачивание одного сообщения из топика Kafka в несколько строк данных и загрузка в Selena.
  • Sink Kafka connector гарантирует семантику at-least-once.

Лучшие практики

Загрузка данных CDC в формате Debezium

Debezium - это популярный инструмент Change Data Capture (CDC), который поддерживает мониторинг изменений данных в различных системах баз данных и потоковую передачу этих изменений в Kafka. Следующий пример демонстрирует, как настроить и использовать Kafka connector для записи изменений PostgreSQL в таблицу Primary Key в Selena.

Шаг 1: Установка и запуск Kafka

ПРИМЕЧАНИЕ

Вы можете пропустить этот шаг, если у вас есть собственная среда Kafka.

  1. Загрузите последний релиз Kafka с официального сайта и извлеките пакет.

    tar -xzf kafka_2.13-3.7.0.tgz
    cd kafka_2.13-3.7.0
  2. Запустите среду Kafka.

    Сгенерируйте UUID кластера Kafka.

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

    Отформатируйте каталоги логов.

    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

    Запустите сервер Kafka.

    bin/kafka-server-start.sh config/kraft/server.properties

Шаг 2: Настройка PostgreSQL

  1. Убедитесь, что пользователю PostgreSQL предоставлены привилегии REPLICATION.

  2. Настройте конфигурацию PostgreSQL.

    Установите wal_level в logical в postgresql.conf.

    wal_level = logical

    Перезапустите сервер PostgreSQL для применения изменений.

    pg_ctl restart
  3. Подготовьте набор данных.

    Создайте таблицу и вставьте тестовые данные.

    CREATE TABLE customers (
    id int primary key ,
    first_name varchar(65533) NULL,
    last_name varchar(65533) NULL ,
    email varchar(65533) NULL
    );

    INSERT INTO customers VALUES (1,'a','a','a@a.com');
  4. Проверьте сообщения лога CDC в Kafka.

    {
    "schema": {
    "type": "struct",
    "fields": [
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "before"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "after"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "version"
    },
    {
    "type": "string",
    "optional": false,
    "field": "connector"
    },
    {
    "type": "string",
    "optional": false,
    "field": "name"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "ts_ms"
    },
    {
    "type": "string",
    "optional": true,
    "name": "io.debezium.data.Enum",
    "version": 1,
    "parameters": {
    "allowed": "true,last,false,incremental"
    },
    "default": "false",
    "field": "snapshot"
    },
    {
    "type": "string",
    "optional": false,
    "field": "db"
    },
    {
    "type": "string",
    "optional": true,
    "field": "sequence"
    },
    {
    "type": "string",
    "optional": false,
    "field": "schema"
    },
    {
    "type": "string",
    "optional": false,
    "field": "table"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "txId"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "lsn"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "xmin"
    }
    ],
    "optional": false,
    "name": "io.debezium.connector.postgresql.Source",
    "field": "source"
    },
    {
    "type": "string",
    "optional": false,
    "field": "op"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "ts_ms"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "id"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "total_order"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "data_collection_order"
    }
    ],
    "optional": true,
    "name": "event.block",
    "version": 1,
    "field": "transaction"
    }
    ],
    "optional": false,
    "name": "test.public.customers.Envelope",
    "version": 1
    },
    "payload": {
    "before": null,
    "after": {
    "id": 1,
    "first_name": "a",
    "last_name": "a",
    "email": "a@a.com"
    },
    "source": {
    "version": "2.5.3.Final",
    "connector": "postgresql",
    "name": "test",
    "ts_ms": 1714283798721,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22910216\",\"22910504\"]",
    "schema": "public",
    "table": "customers",
    "txId": 756,
    "lsn": 22910504,
    "xmin": null
    },
    "op": "c",
    "ts_ms": 1714283798790,
    "transaction": null
    }
    }

Шаг 3: Настройка Selena

Создайте таблицу Primary Key в Selena с той же схемой, что и исходная таблица в PostgreSQL.

CREATE TABLE `customers` (
`id` int(11) COMMENT "",
`first_name` varchar(65533) NULL COMMENT "",
`last_name` varchar(65533) NULL COMMENT "",
`email` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY hash(id) buckets 1
PROPERTIES (
"bucket_size" = "4294967296",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"fast_schema_evolution" = "true"
);

Шаг 4: Установка коннектора

  1. Загрузите коннекторы и извлеките пакеты в каталог plugins.

    mkdir plugins
    tar -zxvf debezium-debezium-connector-postgresql-2.5.3.zip -C plugins
    tar -zxvf starrocks-kafka-connector-1.0.3.tar.gz -C plugins

    Этот каталог является значением элемента конфигурации plugin.path в config/connect-standalone.properties.

    plugin.path=/path/to/kafka_2.13-3.7.0/plugins
  2. Настройте коннектор источника PostgreSQL в pg-source.properties.

    {
    "name": "inventory-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "",
    "database.dbname" : "postgres",
    "topic.prefix": "test"
    }
    }
  3. Настройте sink коннектор Selena в sr-sink.properties.

    {
    "name": "starrocks-kafka-connector",
    "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "tasks.max": "1",
    "topics": "test.public.customers",
    "starrocks.http.url": "172.26.195.69:28030",
    "starrocks.database.name": "test",
    "starrocks.username": "root",
    "starrocks.password": "StarRocks@123",
    "sink.properties.strip_outer_array": "true",
    "connect.timeoutms": "3000",
    "starrocks.topic2table.map": "test.public.customers:customers",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
    }
    }

    ПРИМЕЧАНИЕ

    • Если таблица Selena не является таблицей Primary Key, вам не нужно указывать преобразование addfield.
    • Преобразование unwrap предоставляется Debezium и используется для разворачивания сложной структуры данных Debezium на основе типа операции. Для получения дополнительной информации см. New Record State Extraction.
  4. Настройте Kafka Connect.

    Настройте следующие элементы конфигурации в файле конфигурации Kafka Connect config/connect-standalone.properties.

    # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
    # Обратите внимание, что этот пример использует PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka.
    # Если вы используете другой протокол безопасности для доступа к кластеру Kafka, настройте соответствующую информацию в этой части.

    bootstrap.servers=<kafka_broker_ip>:9092
    offset.storage.file.filename=/tmp/connect.offsets
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false

    # Абсолютный путь к starrocks-kafka-connector после извлечения. Например:
    plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3

    # Параметры, которые контролируют политику сброса. Для получения дополнительной информации см. раздел Примечания по использованию.
    offset.flush.interval.ms=10000
    bufferflush.maxbytes = xxx
    bufferflush.intervalms = xxx

    Для описаний дополнительных параметров см. Running Kafka Connect.

Шаг 5: Запуск Kafka Connect в автономном режиме

Запустите Kafka Connect в автономном режиме для инициализации коннекторов.

bin/connect-standalone.sh config/connect-standalone.properties config/pg-source.properties config/sr-sink.properties 

Шаг 6: Проверка приема данных

Протестируйте следующие операции и убедитесь, что данные правильно поступают в Selena.

INSERT
  • В PostgreSQL:
postgres=# insert into customers values (2,'b','b','b@b.com');
INSERT 0 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | a@a.com
2 | b | b | b@b.com
(2 rows)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | a@a.com |
| 2 | b | b | b@b.com |
+------+------------+-----------+---------+
2 rows in set (0.01 sec)
UPDATE
  • В PostgreSQL:
postgres=# update customers set email='c@c.com';
UPDATE 2
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | c@c.com
2 | b | b | c@c.com
(2 rows)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | c@c.com |
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
2 rows in set (0.00 sec)
DELETE
  • В PostgreSQL:
postgres=# delete from customers where id=1;
DELETE 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
2 | b | b | c@c.com
(1 row)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
1 row in set (0.00 sec)