Перейти к основному содержимому
Версия: 2.0.x

AutoMQ Kafka

AutoMQ for Kafka — это облачная версия Kafka, переработанная для облачных сред. AutoMQ Kafka является открытым исходным кодом и полностью совместима с протоколом Kafka, полностью используя преимущества облака. По сравнению с самостоятельно управляемым Apache Kafka, AutoMQ Kafka с её облачной архитектурой предлагает такие функции, как автоматическое масштабирование ёмкости, самобалансировка сетевого трафика, перемещение партиции за секунды. Эти функции способствуют значительно более низкой совокупной стоимости владения (TCO) для пользователей.

Эта статья поможет вам импортировать данные в AutoMQ Kafka с помощью Selena Routine Load. Для понимания основных принципов Routine Load обратитесь к разделу об основах Routine Load.

Подготовка окружения

Подготовка Selena и тестовых данных

Убедитесь, что у вас есть работающий cluster Selena.

Создание базы данных и таблицы с первичным ключом для тестирования:

create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"enable_persistent_index" = "true"
);
примечание

Если cluster Selena в тестовой среде содержит только один BE, количество replica можно установить равным 1 в предложении PROPERTIES, например PROPERTIES( "replication_num" = "1" ). Количество replica по умолчанию равно 3, что также является рекомендуемым значением для production-кластеров Selena. Если вы хотите использовать значение по умолчанию, настраивать параметр replication_num не нужно.

Подготовка AutoMQ Kafka и тестовых данных

Для подготовки вашего окружения AutoMQ Kafka и тестовых данных следуйте руководству Быстрый старт AutoMQ для развёртывания вашего cluster AutoMQ Kafka. Убедитесь, что Selena может напрямую подключаться к вашему серверу AutoMQ Kafka.

Чтобы быстро создать топик с именем example_topic в AutoMQ Kafka и записать в него тестовые JSON-данные, выполните следующие шаги:

Создание топика

Используйте инструменты командной строки Kafka для создания топика. Убедитесь, что у вас есть доступ к окружению Kafka и служба Kafka запущена. Вот команда для создания топика:

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

Примечание: Замените topic и bootstrap-server на адрес вашего сервера Kafka.

Чтобы проверить результат создания топика, используйте эту команду:

./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

Генерация тестовых данных

Сгенерируйте простые тестовые данные в формате JSON

{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

Запись тестовых данных

Используйте инструменты командной строки Kafka или программные методы для записи тестовых данных в example_topic. Вот пример с использованием инструментов командной строки:

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

Примечание: Замените topic и bootstrap-server на адрес вашего сервера Kafka.

Чтобы просмотреть недавно записанные данные топика, используйте следующую команду:

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Создание задания Routine Load

В командной строке Selena создайте задание Routine Load для непрерывного импорта данных из топика AutoMQ Kafka:

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Примечание: Замените kafka_broker_list на адрес вашего сервера Kafka.

Описание параметров

Формат данных

Укажите формат данных как JSON в "format" = "json" в предложении PROPERTIES.

Извлечение и преобразование данных

Чтобы указать отношение сопоставления и преобразования между исходными данными и целевой таблицей, настройте параметры COLUMNS и jsonpaths. Имена столбцов в COLUMNS соответствуют именам столбцов целевой таблицы, а их порядок соответствует порядку столбцов в исходных данных. Параметр jsonpaths используется для извлечения необходимых данных полей из JSON-данных, подобно вновь сгенерированным CSV-данным. Затем параметр COLUMNS временно именует поля в jsonpaths по порядку. Для получения дополнительной информации о преобразовании данных см. Преобразование данных при импорте.

Примечание: Если каждый JSON-объект на строке имеет имена ключей и количество (порядок не требуется), соответствующие столбцам целевой таблицы, нет необходимости настраивать COLUMNS.

Проверка импорта данных

Сначала мы проверяем задание импорта Routine Load и убеждаемся, что статус задания импорта Routine Load находится в состоянии RUNNING.

show routine load\G

Затем, выполняя запрос к соответствующей таблице в базе данных Selena, мы можем наблюдать, что данные были успешно импортированы.

Selena > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | testuser | 2023-11-10T12:00:00 | active |
| 2 | testuser | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)