Перейти к основному содержимому

AutoMQ Kafka

AutoMQ for Kafka — это облачная версия Kafka, переработанная для облачных сред. AutoMQ Kafka является открытым исходным кодом и полностью совместима с протоколом Kafka, максимально используя преимущества облака. По сравнению с самостоятельно управляемой Apache Kafka, AutoMQ Kafka с её облачной архитектурой предлагает такие функции, как автоматическое масштабирование емкости, самобалансировка сетевого трафика, перемещение разделов за секунды. Эти функции способствуют значительному снижению общей стоимости владения (TCO) для пользователей.

Эта статья проведет вас через импорт данных в AutoMQ Kafka с использованием Selena Routine Load. Для понимания основных принципов Routine Load обратитесь к разделу о основах Routine Load.

Подготовка окружения

Подготовка Selena и тестовых данных

Убедитесь, что у вас есть работающий кластер Selena.

Создание базы данных и таблицы с первичным ключом для тестирования:

create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"enable_persistent_index" = "true"
);
примечание

Если кластер Selena в промежуточной среде содержит только один BE, количество реплик может быть установлено в 1 в секции PROPERTIES, например PROPERTIES( "replication_num" = "1" ). Количество реплик по умолчанию равно 3, что также является рекомендуемым числом для производственных кластеров Selena. Если вы хотите использовать количество по умолчанию, вам не нужно настраивать параметр replication_num.

Подготовка AutoMQ Kafka и тестовых данных

Для подготовки вашего окружения AutoMQ Kafka и тестовых данных следуйте руководству AutoMQ Quick Start для развертывания вашего кластера AutoMQ Kafka. Убедитесь, что Selena может напрямую подключиться к вашему серверу AutoMQ Kafka.

Чтобы быстро создать топик с именем example_topic в AutoMQ Kafka и записать в него тестовые JSON данные, выполните следующие шаги:

Создание топика

Используйте инструменты командной строки Kafka для создания топика. Убедитесь, что у вас есть доступ к окружению Kafka и служба Kafka запущена. Вот команда для создания топика:

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

Примечание: Замените topic и bootstrap-server на адрес вашего сервера Kafka.

Чтобы проверить результат создания топика, используйте эту команду:

./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

Генерация тестовых данных

Сгенерируйте простые тестовые данные в формате JSON

{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

Запись тестовых данных

Используйте инструменты командной строки Kafka или программные методы для записи тестовых данных в example_topic. Вот пример использования инструментов командной строки:

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

Примечание: Замените topic и bootstrap-server на адрес вашего сервера Kafka.

Чтобы просмотреть недавно записанные данные топика, используйте следующую команду:

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Создание задачи Routine Load

В командной строке Selena создайте задачу Routine Load для непрерывного импорта данных из топика AutoMQ Kafka:

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Примечание: Замените kafka_broker_list на адрес вашего сервера Kafka.

Объяснение параметров

Формат данных

Укажите формат данных как JSON в "format" = "json" в разделе PROPERTIES.

Извлечение и преобразование данных

Чтобы указать отношения сопоставления и преобразования между исходными данными и целевой таблицей, настройте параметры COLUMNS и jsonpaths. Имена столбцов в COLUMNS соответствуют именам столбцов целевой таблицы, и их порядок соответствует порядку столбцов в исходных данных. Параметр jsonpaths используется для извлечения необходимых данных полей из JSON данных, аналогично вновь сгенерированным CSV данным. Затем параметр COLUMNS временно именует поля в jsonpaths по порядку. Для более подробных объяснений преобразования данных см. Преобразование данных во время импорта.

Примечание: Если каждый JSON объект в строке имеет имена ключей и количества (порядок не требуется), которые соответствуют столбцам целевой таблицы, нет необходимости настраивать COLUMNS.

Проверка импорта данных

Сначала мы проверяем задачу импорта Routine Load и подтверждаем, что статус задачи импорта Routine Load находится в состоянии RUNNING.

show routine load\G

Затем, запрашивая соответствующую таблицу в базе данных Selena, мы можем наблюдать, что данные были успешно импортированы.

StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | testuser | 2023-11-10T12:00:00 | active |
| 2 | testuser | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)