Перейти к основному содержимому
Версия: 2.0.x

Kafka connector для Selena

Загрузка данных с помощью Kafka connector

Selena предоставляет собственный connector под названием Apache Kafka® connector (Selena Connector for Apache Kafka®, сокращённо Kafka connector), который является sink connector'ом и непрерывно потребляет сообщения из Kafka, загружая их в Selena. Kafka connector гарантирует семантику at-least-once.

Kafka connector может легко интегрироваться с Kafka Connect, что позволяет Selena лучше интегрироваться с экосистемой Kafka. Это мудрый выбор, если вы хотите загружать данные в реальном времени в Selena. По сравнению с Routine Load, рекомендуется использовать Kafka connector в следующих сценариях:

  • В отличие от Routine Load, которая поддерживает загрузку данных только в форматах CSV, JSON и Avro, Kafka connector может загружать данные в большем количестве форматов, таких как Protobuf. Если данные можно преобразовать в форматы JSON и CSV с помощью конвертеров Kafka Connect, то данные можно загрузить в Selena через Kafka connector.
  • Настройка пользовательского преобразования данных, например, CDC-данных в формате Debezium.
  • Загрузка данных из нескольких топиков Kafka.
  • Загрузка данных из Confluent Cloud.
  • Более точный контроль над размерами пакетов загрузки, параллелизмом и другими параметрами для достижения баланса между скоростью загрузки и использованием ресурсов.

Подготовка

Требования к версиям

ConnectorKafkaSelenaJava
1.0.43.42.5 и выше8
1.0.33.42.5 и выше8

Настройка окружения Kafka

Поддерживаются как самоуправляемые кластеры Apache Kafka, так и Confluent Cloud.

  • Для самоуправляемого кластера Apache Kafka вы можете обратиться к Apache Kafka quickstart для быстрого развёртывания кластера Kafka. Kafka Connect уже интегрирован в Kafka.
  • Для Confluent Cloud убедитесь, что у вас есть учётная запись Confluent и вы создали кластер.

Загрузка Kafka connector

Добавьте Kafka connector в Kafka Connect:

  • Самоуправляемый кластер Kafka:

    Скачайте и распакуйте selena-kafka-connector-xxx.tar.gz.

  • Confluent Cloud:

    В настоящее время Kafka connector не загружен в Confluent Hub. Вам нужно скачать и распаковать selena-kafka-connector-xxx.tar.gz, упаковать его в ZIP-файл и загрузить ZIP-файл в Confluent Cloud.

Конфигурация сети

Убедитесь, что машина, на которой расположен Kafka, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_http_port (по умолчанию: 8040).

Использование

В этом разделе в качестве примера используется самоуправляемый кластер Kafka для объяснения того, как настроить Kafka connector и Kafka Connect, а затем запустить Kafka Connect для загрузки данных в Selena.

Подготовка набора данных

Предположим, что в топике test кластера Kafka существуют данные в формате JSON.

{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}

Создание таблицы

Создайте таблицу test_tbl в базе данных example_db в кластере Selena в соответствии с ключами данных в формате JSON.

CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);

Настройка Kafka connector и Kafka Connect, запуск Kafka Connect для загрузки данных

Запуск Kafka Connect в автономном режиме

  1. Настройте Kafka connector. В директории config в каталоге установки Kafka создайте файл конфигурации connect-Selena-sink.properties для Kafka connector и настройте следующие параметры. Больше параметров и их описаний см. в разделе Параметры.

    к сведению
    • В этом примере Kafka connector, предоставляемый Selena, является sink connector'ом, который может непрерывно потреблять данные из Kafka и загружать данные в Selena.
    • Если исходные данные являются CDC-данными, например, данными в формате Debezium, и таблица Selena является таблицей Primary Key, вам также необходимо настроить transform в файле конфигурации connect-Selena-sink.properties для Kafka connector, предоставляемого Selena, чтобы синхронизировать изменения исходных данных с таблицей Primary Key.
    name=selena-kafka-connector
    connector.class=com.selena.connector.kafka.SelenaSinkConnector
    topics=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false
    # HTTP URL узла FE в вашем кластере Selena. Порт по умолчанию - 8030.
    selena.http.url=192.168.xxx.xxx:8030
    # Если имя топика Kafka отличается от имени таблицы Selena, необходимо настроить соответствие между ними.
    selena.topic2table.map=test:test_tbl
    # Введите имя пользователя Selena.
    selena.username=user1
    # Введите пароль Selena.
    selena.password=123456
    selena.database.name=example_db
    sink.properties.strip_outer_array=true
  2. Настройте и запустите Kafka Connect.

    1. Настройте Kafka Connect. В файле конфигурации config/connect-standalone.properties в директории config настройте следующие параметры. Больше параметров и их описаний см. в разделе Running Kafka Connect. Обратите внимание, что в следующих примерах используется версия selena-kafka-connector 1.0.3. Если вы используете более новую версию, вам необходимо внести соответствующие изменения.

      # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
      # Обратите внимание, что в этом примере используется PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам необходимо настроить соответствующую информацию в этом файле.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Абсолютный путь к Kafka connector после распаковки. Например:
      plugin.path=/home/kafka-connect/selena-kafka-connector-1.0.3
    2. Запустите Kafka Connect.

      CLASSPATH=/home/kafka-connect/selena-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-selena-sink.properties

Запуск Kafka Connect в распределённом режиме

  1. Настройте и запустите Kafka Connect.

    1. Настройте Kafka Connect. В файле конфигурации config/connect-distributed.properties в директории config настройте следующие параметры. Больше параметров и их описаний см. в разделе Running Kafka Connect.

      # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
      # Обратите внимание, что в этом примере используется PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka. Если вы используете другой протокол безопасности для доступа к кластеру Kafka, вам необходимо настроить соответствующую информацию в этом файле.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Абсолютный путь к Kafka connector после распаковки. Например:
      plugin.path=/home/kafka-connect/selena-kafka-connector-1.0.3
    2. Запустите Kafka Connect.

      CLASSPATH=/home/kafka-connect/selena-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
  2. Настройте и создайте Kafka connector. Обратите внимание, что в распределённом режиме вам необходимо настроить и создать Kafka connector через REST API. Параметры и их описания см. в разделе Параметры.

    к сведению
    • В этом примере Kafka connector, предоставляемый Selena, является sink connector'ом, который может непрерывно потреблять данные из Kafka и загружать данные в Selena.
    • Если исходные данные являются CDC-данными, например, данными в формате Debezium, и таблица Selena является таблицей Primary Key, вам также необходимо настроить transform в файле конфигурации connect-Selena-sink.properties для Kafka connector, предоставляемого Selena, чтобы синхронизировать изменения исходных данных с таблицей Primary Key.
    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"selena-kafka-connector",
    "config":{
    "connector.class":"com.selena.connector.kafka.SelenaSinkConnector",
    "topics":"test",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"true",
    "value.converter.schemas.enable":"false",
    "selena.http.url":"192.168.xxx.xxx:8030",
    "selena.topic2table.map":"test:test_tbl",
    "selena.username":"user1",
    "selena.password":"123456",
    "selena.database.name":"example_db",
    "sink.properties.strip_outer_array":"true"
    }
    }'

Запрос таблицы Selena

Запросите целевую таблицу Selena test_tbl.

MySQL [example_db]> select * from test_tbl;

+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)

Данные успешно загружены, когда возвращается вышеуказанный результат.

Параметры

name

Обязательный: ДА
Значение по умолчанию:
Описание: Имя для этого Kafka connector. Оно должно быть глобально уникальным среди всех Kafka connector'ов в этом кластере Kafka Connect. Например, selena-kafka-connector.

connector.class

Обязательный: ДА
Значение по умолчанию:
Описание: Класс, используемый sink'ом этого Kafka connector. Установите значение com.selena.connector.kafka.SelenaSinkConnector.

topics

Обязательный:
Значение по умолчанию:
Описание: Один или несколько топиков для подписки, где каждый топик соответствует таблице Selena. По умолчанию Selena предполагает, что имя топика совпадает с именем таблицы Selena. Таким образом, Selena определяет целевую таблицу Selena, используя имя топика. Пожалуйста, выберите заполнение либо topics, либо topics.regex (ниже), но не оба одновременно. Однако, если имя таблицы Selena не совпадает с именем топика, используйте необязательный параметр selena.topic2table.map (ниже) для указания соответствия между именем топика и именем таблицы.

topics.regex

Обязательный:
Значение по умолчанию: Описание: Регулярное выражение для сопоставления одного или нескольких топиков для подписки. Более подробное описание см. в topics. Пожалуйста, выберите заполнение либо topics.regex, либо topics (выше), но не оба одновременно.

selena.topic2table.map

Обязательный: НЕТ
Значение по умолчанию:
Описание: Соответствие между именем таблицы Selena и именем топика, когда имя топика отличается от имени таблицы Selena. Формат: <topic-1>:<table-1>,<topic-2>:<table-2>,....

selena.http.url

Обязательный: ДА
Значение по умолчанию:
Описание: HTTP URL узла FE в вашем кластере Selena. Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,.... Несколько адресов разделяются запятыми (,). Например, 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030.

selena.database.name

Обязательный: ДА
Значение по умолчанию:
Описание: Имя базы данных Selena.

selena.username

Обязательный: ДА
Значение по умолчанию:
Описание: Имя пользователя вашей учётной записи кластера Selena. Пользователь должен иметь привилегию INSERT на таблицу Selena.

selena.password

Обязательный: ДА
Значение по умолчанию:
Описание: Пароль вашей учётной записи кластера Selena.

key.converter

Обязательный: НЕТ
Значение по умолчанию: Key converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает key converter для sink connector (Kafka-connector-selena), который используется для десериализации ключей данных Kafka. Key converter по умолчанию - тот, который используется кластером Kafka Connect.

value.converter

Обязательный: НЕТ
Значение по умолчанию: Value converter, используемый кластером Kafka Connect
Описание: Этот параметр указывает value converter для sink connector (Kafka-connector-selena), который используется для десериализации значений данных Kafka. Value converter по умолчанию - тот, который используется кластером Kafka Connect.

key.converter.schema.registry.url

Обязательный: НЕТ
Значение по умолчанию:
Описание: URL Schema registry для key converter.

value.converter.schema.registry.url

Обязательный: НЕТ
Значение по умолчанию:
Описание: URL Schema registry для value converter.

tasks.max

Обязательный: НЕТ
Значение по умолчанию: 1
Описание: Верхний лимит для количества потоков задач, которые может создать Kafka connector, который обычно совпадает с количеством ядер CPU на рабочих узлах в кластере Kafka Connect. Вы можете настроить этот параметр для управления производительностью загрузки.

bufferflush.maxbytes

Обязательный: НЕТ
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 MB до 10 GB. Имейте в виду, что буфер Stream Load SDK может создавать несколько заданий Stream Load для буферизации данных. Поэтому упомянутый здесь порог относится к общему размеру данных.

bufferflush.intervalms

Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Интервал для отправки пакета данных, который контролирует задержку загрузки. Диапазон: [1000, 3600000].

connect.timeoutms

Обязательный: НЕТ
Значение по умолчанию: 1000
Описание: Тайм-аут для подключения к HTTP URL. Диапазон: [100, 60000].

sink.properties.*

Обязательный:
Значение по умолчанию:
Описание: Параметры Stream Load для управления поведением загрузки. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в разделе STREAM LOAD.

sink.properties.format

Обязательный: НЕТ
Значение по умолчанию: json
Описание: Формат, используемый для Stream Load. Kafka connector будет преобразовывать каждый пакет данных в указанный формат перед отправкой их в Selena. Допустимые значения: csv и json. Для получения дополнительной информации см. Параметры CSV и Параметры JSON.

sink.properties.partial_update

Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, что означает отключение этой функции.

sink.properties.partial_update_mode

Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с большим количеством столбцов и небольшими пакетами.
  • Значение column означает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и большим количеством строк. В таких сценариях включение режима столбцов обеспечивает более высокую скорость обновления. Например, в таблице с 100 столбцами, если обновляется только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз выше.

Примечания по использованию

Политика сброса

Kafka connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Сброс будет запущен при выполнении любого из следующих условий:

  • Размер буферизованных строк в байтах достигает лимита bufferflush.maxbytes.
  • Время, прошедшее с момента последнего сброса, достигает лимита bufferflush.intervalms.
  • Достигается интервал, с которым connector пытается фиксировать смещения для задач. Интервал контролируется конфигурацией Kafka Connect offset.flush.interval.ms, и значение по умолчанию составляет 60000.

Для снижения задержки данных настройте эти конфигурации в настройках Kafka connector. Однако более частые сбросы увеличат использование CPU и I/O.

Ограничения

  • Не поддерживается преобразование одного сообщения из топика Kafka в несколько строк данных и загрузка в Selena.
  • Sink Kafka connector, предоставляемый Selena, гарантирует семантику at-least-once.

Лучшие практики

Загрузка CDC-данных в формате Debezium

Debezium - это популярный инструмент Change Data Capture (CDC), который поддерживает мониторинг изменений данных в различных системах баз данных и потоковую передачу этих изменений в Kafka. Следующий пример демонстрирует, как настроить и использовать Kafka connector для записи изменений PostgreSQL в таблицу Primary Key в Selena.

Шаг 1: Установка и запуск Kafka

ПРИМЕЧАНИЕ

Вы можете пропустить этот шаг, если у вас есть собственное окружение Kafka.

  1. Скачайте последний релиз Kafka с официального сайта и распакуйте пакет.

    tar -xzf kafka_2.13-3.7.0.tgz
    cd kafka_2.13-3.7.0
  2. Запустите окружение Kafka.

    Сгенерируйте UUID кластера Kafka.

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

    Отформатируйте директории логов.

    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

    Запустите сервер Kafka.

    bin/kafka-server-start.sh config/kraft/server.properties

Шаг 2: Настройка PostgreSQL

  1. Убедитесь, что пользователю PostgreSQL предоставлены привилегии REPLICATION.

  2. Настройте конфигурацию PostgreSQL.

    Установите wal_level в logical в postgresql.conf.

    wal_level = logical

    Перезапустите сервер PostgreSQL для применения изменений.

    pg_ctl restart
  3. Подготовьте набор данных.

    Создайте таблицу и вставьте тестовые данные.

    CREATE TABLE customers (
    id int primary key ,
    first_name varchar(65533) NULL,
    last_name varchar(65533) NULL ,
    email varchar(65533) NULL
    );

    INSERT INTO customers VALUES (1,'a','a','a@a.com');
  4. Проверьте сообщения CDC-логов в Kafka.

    {
    "schema": {
    "type": "struct",
    "fields": [
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "before"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "after"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "version"
    },
    {
    "type": "string",
    "optional": false,
    "field": "connector"
    },
    {
    "type": "string",
    "optional": false,
    "field": "name"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "ts_ms"
    },
    {
    "type": "string",
    "optional": true,
    "name": "io.debezium.data.Enum",
    "version": 1,
    "parameters": {
    "allowed": "true,last,false,incremental"
    },
    "default": "false",
    "field": "snapshot"
    },
    {
    "type": "string",
    "optional": false,
    "field": "db"
    },
    {
    "type": "string",
    "optional": true,
    "field": "sequence"
    },
    {
    "type": "string",
    "optional": false,
    "field": "schema"
    },
    {
    "type": "string",
    "optional": false,
    "field": "table"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "txId"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "lsn"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "xmin"
    }
    ],
    "optional": false,
    "name": "io.debezium.connector.postgresql.Source",
    "field": "source"
    },
    {
    "type": "string",
    "optional": false,
    "field": "op"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "ts_ms"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "id"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "total_order"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "data_collection_order"
    }
    ],
    "optional": true,
    "name": "event.block",
    "version": 1,
    "field": "transaction"
    }
    ],
    "optional": false,
    "name": "test.public.customers.Envelope",
    "version": 1
    },
    "payload": {
    "before": null,
    "after": {
    "id": 1,
    "first_name": "a",
    "last_name": "a",
    "email": "a@a.com"
    },
    "source": {
    "version": "2.5.3.Final",
    "connector": "postgresql",
    "name": "test",
    "ts_ms": 1714283798721,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22910216\",\"22910504\"]",
    "schema": "public",
    "table": "customers",
    "txId": 756,
    "lsn": 22910504,
    "xmin": null
    },
    "op": "c",
    "ts_ms": 1714283798790,
    "transaction": null
    }
    }

Шаг 3: Настройка Selena

Создайте таблицу Primary Key в Selena с той же схемой, что и исходная таблица в PostgreSQL.

CREATE TABLE `customers` (
`id` int(11) COMMENT "",
`first_name` varchar(65533) NULL COMMENT "",
`last_name` varchar(65533) NULL COMMENT "",
`email` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY hash(id) buckets 1
PROPERTIES (
"bucket_size" = "4294967296",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"fast_schema_evolution" = "true"
);

Шаг 4: Установка connector

  1. Скачайте connector'ы и распакуйте пакеты в директорию plugins.

    mkdir plugins
    tar -zxvf debezium-debezium-connector-postgresql-2.5.3.zip -C plugins
    tar -zxvf selena-kafka-connector-1.0.3.tar.gz -C plugins

    Эта директория является значением элемента конфигурации plugin.path в config/connect-standalone.properties.

    plugin.path=/path/to/kafka_2.13-3.7.0/plugins
  2. Настройте source connector для PostgreSQL в pg-source.properties.

    {
    "name": "inventory-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "",
    "database.dbname" : "postgres",
    "topic.prefix": "test"
    }
    }
  3. Настройте sink connector для Selena в sr-sink.properties.

    {
    "name": "selena-kafka-connector",
    "config": {
    "connector.class": "com.selena.connector.kafka.SelenaSinkConnector",
    "tasks.max": "1",
    "topics": "test.public.customers",
    "selena.http.url": "172.26.195.69:28030",
    "selena.database.name": "test",
    "selena.username": "root",
    "selena.password": "Selena@123",
    "sink.properties.strip_outer_array": "true",
    "connect.timeoutms": "3000",
    "selena.topic2table.map": "test.public.customers:customers",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.selena.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
    }
    }

    ПРИМЕЧАНИЕ

    • Если таблица Selena не является таблицей Primary Key, вам не нужно указывать трансформацию addfield.
    • Трансформация unwrap предоставляется Debezium и используется для распаковки сложной структуры данных Debezium на основе типа операции. Для получения дополнительной информации см. New Record State Extraction.
  4. Настройте Kafka Connect.

    Настройте следующие элементы конфигурации в файле конфигурации Kafka Connect config/connect-standalone.properties.

    # Адреса брокеров Kafka. Несколько адресов брокеров Kafka должны быть разделены запятыми (,).
    # Обратите внимание, что в этом примере используется PLAINTEXT в качестве протокола безопасности для доступа к кластеру Kafka.
    # Если вы используете другой протокол безопасности для доступа к кластеру Kafka, настройте соответствующую информацию в этой части.

    bootstrap.servers=<kafka_broker_ip>:9092
    offset.storage.file.filename=/tmp/connect.offsets
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false

    # Абсолютный путь к selena-kafka-connector после распаковки. Например:
    plugin.path=/home/kafka-connect/selena-kafka-connector-1.0.3

    # Параметры, которые управляют политикой сброса. Для получения дополнительной информации см. раздел Примечания по использованию.
    offset.flush.interval.ms=10000
    bufferflush.maxbytes = xxx
    bufferflush.intervalms = xxx

    Для описания дополнительных параметров см. Running Kafka Connect.

Шаг 5: Запуск Kafka Connect в автономном режиме

Запустите Kafka Connect в автономном режиме для инициализации connector'ов.

bin/connect-standalone.sh config/connect-standalone.properties config/pg-source.properties config/sr-sink.properties

Шаг 6: Проверка загрузки данных

Протестируйте следующие операции и убедитесь, что данные правильно загружены в Selena.

INSERT
  • В PostgreSQL:
postgres=# insert into customers values (2,'b','b','b@b.com');
INSERT 0 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | a@a.com
2 | b | b | b@b.com
(2 rows)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | a@a.com |
| 2 | b | b | b@b.com |
+------+------------+-----------+---------+
2 rows in set (0.01 sec)
UPDATE
  • В PostgreSQL:
postgres=# update customers set email='c@c.com';
UPDATE 2
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | c@c.com
2 | b | b | c@c.com
(2 rows)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | c@c.com |
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
2 rows in set (0.00 sec)
DELETE
  • В PostgreSQL:
postgres=# delete from customers where id=1;
DELETE 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
2 | b | b | c@c.com
(1 row)
  • В Selena:
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
1 row in set (0.00 sec)