Перейти к основному содержимому
Версия: 2.0.x

Непрерывная загрузка данных из Apache Pulsar

Начиная с версии Selena 2.5, Routine Load поддерживает непрерывную загрузку данных из Apache Pulsar. Pulsar — это распределённая платформа обмена сообщениями и потоковой передачи с открытым исходным кодом, основанная на архитектуре разделения хранения и вычислений. Загрузка данных из Pulsar через Routine Load аналогична загрузке данных из Apache Kafka. В этой теме используются данные в формате CSV в качестве примера для демонстрации загрузки данных из Apache Pulsar через Routine Load.

Поддерживаемые форматы файлов данных

Routine Load поддерживает потребление данных в форматах CSV и JSON из cluster Pulsar.

ПРИМЕЧАНИЕ

Для данных в формате CSV Selena поддерживает строки в кодировке UTF-8 длиной до 50 байт в качестве разделителей столбцов. Часто используемые разделители столбцов включают запятую (,), табуляцию и вертикальную черту (|).

Концепции, связанные с Pulsar

Топик

Топики в Pulsar — это именованные каналы для передачи сообщений от производителей к потребителям. Топики в Pulsar делятся на партиционированные топики и непартиционированные топики.

  • Партиционированные топики — это особый тип топика, который обрабатывается несколькими брокерами, что обеспечивает более высокую пропускную способность. Партиционированный топик фактически реализован как N внутренних топиков, где N — количество партиций.
  • Непартиционированные топики — это обычный тип топика, который обслуживается только одним брокером, что ограничивает максимальную пропускную способность топика.

Message ID

Message ID сообщения назначается экземплярами BookKeeper сразу после постоянного сохранения сообщения. Message ID указывает конкретную позицию сообщения в реестре и уникален в пределах cluster Pulsar.

Pulsar поддерживает указание потребителями начальной позиции через consumer.seek(messageId). Но в отличие от смещения потребителя Kafka, которое является длинным целочисленным значением, Message ID состоит из четырёх частей: ledgerId:entryID:partition-index:batch-index.

Поэтому вы не можете получить Message ID напрямую из сообщения. В результате в настоящее время Routine Load не поддерживает указание начальной позиции при загрузке данных из Pulsar, а поддерживает только потребление данных с начала или конца партиции.

Подписка

Подписка — это именованное правило конфигурации, которое определяет, как сообщения доставляются потребителям. Pulsar также поддерживает одновременную подписку потребителей на несколько топиков. Топик может иметь несколько подписок.

Тип подписки определяется при подключении потребителя к ней, и тип можно изменить, перезапустив всех потребителей с другой конфигурацией. В Pulsar доступны четыре типа подписок:

  • exclusive (по умолчанию): Только один потребитель может быть подключён к подписке. Только одному клиенту разрешено потреблять сообщения.
  • shared: Несколько потребителей могут быть подключены к одной подписке. Сообщения доставляются по кругу между потребителями, и любое данное сообщение доставляется только одному потребителю.
  • failover: Несколько потребителей могут быть подключены к одной подписке. Главный потребитель выбирается для непартиционированного топика или каждой партиции партиционированного топика и получает сообщения. Когда главный потребитель отключается, все (неподтверждённые и последующие) сообщения доставляются следующему потребителю в очереди.
  • key_shared: Несколько потребителей могут быть подключены к одной подписке. Сообщения распределяются между потребителями, и сообщения с одинаковым ключом или одинаковым ключом упорядочивания доставляются только одному потребителю.

Примечание:

В настоящее время Routine Load использует тип exclusive.

Создание задания Routine Load

Следующие примеры описывают, как потреблять сообщения в формате CSV в Pulsar и загружать данные в Selena, создавая задание Routine Load. Для получения подробных инструкций и справки см. CREATE ROUTINE LOAD.

CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://localhost:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);

При создании Routine Load для потребления данных из Pulsar большинство входных параметров, кроме data_source_properties, такие же, как при потреблении данных из Kafka. Для описания параметров, кроме data_source_properties, см. CREATE ROUTINE LOAD.

Параметры, связанные с data_source_properties, и их описания приведены ниже:

ПараметрОбязательныйОписание
pulsar_service_urlДаURL, используемый для подключения к cluster Pulsar. Формат: "pulsar://ip:port" или "pulsar://service:port". Пример: "pulsar_service_url" = "pulsar://localhost:6650"
pulsar_topicДаТопик для подписки. Пример: "pulsar_topic" = "persistent://tenant/namespace/topic-name"
pulsar_subscriptionДаПодписка, настроенная для топика. Пример: "pulsar_subscription" = "my_subscription"
pulsar_partitions, pulsar_initial_positionsНетpulsar_partitions: Партиции для подписки в топике. pulsar_initial_positions: начальные позиции партиций, указанных в pulsar_partitions. Начальные позиции должны соответствовать партициям в pulsar_partitions. Допустимые значения: POSITION_EARLIEST (значение по умолчанию): Подписка начинается с самого раннего доступного сообщения в партиции. POSITION_LATEST: Подписка начинается с самого последнего доступного сообщения в партиции. Примечание: Если pulsar_partitions не указан, подписка оформляется на все партиции топика. Если указаны и pulsar_partitions, и property.pulsar_default_initial_position, значение pulsar_partitions переопределяет значение property.pulsar_default_initial_position. Если ни pulsar_partitions, ни property.pulsar_default_initial_position не указаны, подписка начинается с самого последнего доступного сообщения в партиции. Пример: "pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST"

Routine Load поддерживает следующие пользовательские параметры для Pulsar.

ПараметрОбязательныйОписание
property.pulsar_default_initial_positionНетНачальные позиции по умолчанию при подписке на партиции топика. Параметр действует, когда pulsar_initial_positions не указан. Его допустимые значения такие же, как у pulsar_initial_positions. Пример: "property.pulsar_default_initial_position" = "POSITION_EARLIEST"
property.auth.tokenНетЕсли Pulsar включает аутентификацию клиентов с использованием токенов безопасности, вам нужна строка токена для подтверждения вашей личности. Пример: "property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"

Проверка задания и задачи загрузки

Проверка задания загрузки

Выполните оператор SHOW ROUTINE LOAD, чтобы проверить статус задания загрузки routine_wiki_edit_1. Selena возвращает состояние выполнения State, статистическую информацию (включая общее количество потреблённых строк и общее количество загруженных строк) Statistics и прогресс задания загрузки progress.

При проверке задания Routine Load, которое потребляет данные из Pulsar, большинство возвращаемых параметров, кроме progress, такие же, как при потреблении данных из Kafka. progress относится к отставанию, то есть количеству неподтверждённых сообщений в партиции.

MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
Id: 10142
Name: routine_wiki_edit_1
CreateTime: 2022-06-29 14:52:55
PauseTime: 2022-06-29 17:33:53
EndTime: NULL
DbName: default_cluster:test_pulsar
TableName: test1
State: PAUSED
DataSourceType: PULSAR
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://localhost:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)

Проверка задачи загрузки

Выполните оператор SHOW ROUTINE LOAD TASK, чтобы проверить задачи загрузки для задания routine_wiki_edit_1, такие как количество выполняемых задач, партиции топика Kafka, которые потребляются, и прогресс потребления DataSourceProperties, а также соответствующий узел Coordinator BE BeId.

MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G

Изменение задания загрузки

Перед изменением задания загрузки необходимо приостановить его с помощью оператора PAUSE ROUTINE LOAD. Затем можно выполнить ALTER ROUTINE LOAD. После изменения можно выполнить оператор RESUME ROUTINE LOAD, чтобы возобновить его, и проверить его статус с помощью оператора SHOW ROUTINE LOAD.

При использовании Routine Load для потребления данных из Pulsar большинство возвращаемых параметров, кроме data_source_properties, такие же, как при потреблении данных из Kafka.

Обратите внимание на следующие моменты:

  • Среди параметров, связанных с data_source_properties, в настоящее время поддерживается изменение только pulsar_partitions, pulsar_initial_positions и пользовательских параметров Pulsar property.pulsar_default_initial_position и property.auth.token. Параметры pulsar_service_url, pulsar_topic и pulsar_subscription изменить нельзя.
  • Если вам нужно изменить партицию для потребления и соответствующую начальную позицию, вам нужно убедиться, что вы указали партицию с помощью pulsar_partitions при создании задания Routine Load, и можно изменить только начальную позицию pulsar_initial_positions указанной партиции.
  • Если вы указываете только топик pulsar_topic при создании задания Routine Load, но не партиции pulsar_partitions, вы можете изменить начальную позицию всех партиций топика через pulsar_default_initial_position.