Перейти к основному содержимому
Версия: 1.5.x

[Предварительная версия] Непрерывная загрузка данных из Apache® Pulsar™

Начиная с версии Selena 2.5, Routine Load поддерживает непрерывную загрузку данных из Apache® Pulsar™. Pulsar — это распределенная платформа обмена сообщениями и потоковой передачи данных с открытым исходным кодом, построенная по архитектуре разделения хранения и вычислений. Загрузка данных из Pulsar через Routine Load аналогична загрузке данных из Apache Kafka. В этой теме используются данные в формате CSV в качестве примера для демонстрации загрузки данных из Apache Pulsar через Routine Load.

Поддерживаемые форматы файлов данных

Routine Load поддерживает потребление данных в форматах CSV и JSON из кластера Pulsar.

ПРИМЕЧАНИЕ

Для данных в формате CSV Selena поддерживает строки в кодировке UTF-8 размером до 50 байт в качестве разделителей столбцов. Часто используемые разделители столбцов включают запятую (,), табуляцию и вертикальную черту (|).

Концепции, связанные с Pulsar

Topic

Topics в Pulsar — это именованные каналы для передачи сообщений от производителей к потребителям. Topics в Pulsar делятся на секционированные и несекционированные темы.

  • Partitioned topics — это специальный тип тем, которые обрабатываются несколькими брокерами, что обеспечивает более высокую пропускную способность. Секционированная тема фактически реализована как N внутренних тем, где N — количество разделов.
  • Non-partitioned topics — это обычный тип тем, которые обслуживаются только одним брокером, что ограничивает максимальную пропускную способность темы.

Message ID

Идентификатор сообщения назначается экземплярами BookKeeper сразу после постоянного сохранения сообщения. Message ID указывает конкретную позицию сообщения в реестре и является уникальным в пределах кластера Pulsar.

Pulsar поддерживает указание потребителями начальной позиции через consumer.seek(messageId). Но по сравнению с offset потребителя Kafka, который является длинным целым числом, message ID состоит из четырех частей: ledgerId:entryID:partition-index:batch-index.

Поэтому вы не можете получить Message ID напрямую из сообщения. В результате в настоящее время Routine Load не поддерживает указание начальной позиции при загрузке данных из Pulsar, а поддерживает только потребление данных с начала или конца раздела.

Subscription

Подписка — это именованное правило конфигурации, которое определяет, как сообщения доставляются потребителям. Pulsar также поддерживает одновременную подписку потребителей на несколько тем. У темы может быть несколько подписок.

Тип подписки определяется при подключении потребителя к ней, и тип можно изменить, перезапустив всех потребителей с другой конфигурацией. В Pulsar доступны четыре типа подписок:

  • exclusive (по умолчанию): К подписке может подключиться только один потребитель. Только один клиент может потреблять сообщения.
  • shared: К одной подписке могут подключаться несколько потребителей. Сообщения доставляются по кругу между потребителями, и любое данное сообщение доставляется только одному потребителю.
  • failover: К одной подписке могут подключаться несколько потребителей. Для несекционированной темы или каждого раздела секционированной темы выбирается главный потребитель, который получает сообщения. Когда главный потребитель отключается, все (неподтвержденные и последующие) сообщения доставляются следующему потребителю в очереди.
  • key_shared: К одной подписке могут подключаться несколько потребителей. Сообщения распределяются между потребителями, и сообщения с одинаковым ключом или одинаковым ключом упорядочивания доставляются только одному потребителю.

Примечание:

В настоящее время Routine Load использует тип exclusive.

Создание задания Routine Load

Следующие примеры описывают, как потреблять сообщения в формате CSV в Pulsar и загружать данные в Selena, создав задание Routine Load. Для подробных инструкций и справочной информации см. CREATE ROUTINE LOAD.

CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://localhost:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);

Когда Routine Load создается для потребления данных из Pulsar, большинство входных параметров, за исключением data_source_properties, такие же, как при потреблении данных из Kafka. Для описания параметров, кроме data_source_properties, см. CREATE ROUTINE LOAD.

Параметры, связанные с data_source_properties, и их описания следующие:

ПараметрОбязательныйОписание
pulsar_service_urlДаURL, используемый для подключения к кластеру Pulsar. Формат: "pulsar://ip:port" или "pulsar://service:port". Пример: "pulsar_service_url" = "pulsar://localhost:6650"
pulsar_topicДаПодписанная тема. Пример: "pulsar_topic" = "persistent://tenant/namespace/topic-name"
pulsar_subscriptionДаПодписка, настроенная для темы. Пример: "pulsar_subscription" = "my_subscription"
pulsar_partitions, pulsar_initial_positionsНетpulsar_partitions: Подписанные разделы в теме. pulsar_initial_positions: начальные позиции разделов, указанных в pulsar_partitions. Начальные позиции должны соответствовать разделам в pulsar_partitions. Допустимые значения: POSITION_EARLIEST (значение по умолчанию): Подписка начинается с самого раннего доступного сообщения в разделе. POSITION_LATEST: Подписка начинается с последнего доступного сообщения в разделе. Примечание: Если pulsar_partitions не указан, подписываются все разделы темы. Если указаны и pulsar_partitions, и property.pulsar_default_initial_position, значение pulsar_partitions переопределяет значение property.pulsar_default_initial_position. Если не указаны ни pulsar_partitions, ни property.pulsar_default_initial_position, подписка начинается с последнего доступного сообщения в разделе. Пример: "pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST"

Routine Load поддерживает следующие пользовательские параметры для Pulsar.

ПараметрОбязательныйОписание
property.pulsar_default_initial_positionНетНачальные позиции по умолчанию при подписке на разделы темы. Параметр действует, когда pulsar_initial_positions не указан. Его допустимые значения такие же, как у pulsar_initial_positions. Пример: "property.pulsar_default_initial_position" = "POSITION_EARLIEST"
property.auth.tokenНетЕсли Pulsar включает аутентификацию клиентов с использованием токенов безопасности, вам нужна строка токена для подтверждения вашей личности. Пример: "property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"

Проверка задания и задачи загрузки

Проверка задания загрузки

Выполните оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки routine_wiki_edit_1. Selena возвращает состояние выполнения State, статистическую информацию (включая общее количество потребленных и загруженных строк) Statistics и прогресс задания загрузки progress.

При проверке задания Routine Load, которое потребляет данные из Pulsar, большинство возвращаемых параметров, за исключением progress, такие же, как при потреблении данных из Kafka. progress относится к backlog, то есть количеству неподтвержденных сообщений в разделе.

MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
Id: 10142
Name: routine_wiki_edit_1
CreateTime: 2022-06-29 14:52:55
PauseTime: 2022-06-29 17:33:53
EndTime: NULL
DbName: default_cluster:test_pulsar
TableName: test1
State: PAUSED
DataSourceType: PULSAR
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://localhost:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)

Проверка задачи загрузки

Выполните оператор SHOW ROUTINE LOAD TASK для проверки задач загрузки задания routine_wiki_edit_1, например, сколько задач выполняется, какие разделы темы Kafka потребляются и прогресс потребления DataSourceProperties, а также соответствующий узел Coordinator BE BeId.

MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G

Изменение задания загрузки

Перед изменением задания загрузки необходимо приостановить его с помощью оператора PAUSE ROUTINE LOAD. Затем можно выполнить ALTER ROUTINE LOAD. После изменения можно выполнить оператор RESUME ROUTINE LOAD для возобновления и проверить его статус с помощью оператора SHOW ROUTINE LOAD.

Когда Routine Load используется для потребления данных из Pulsar, большинство возвращаемых параметров, за исключением data_source_properties, такие же, как при потреблении данных из Kafka.

Обратите внимание на следующие моменты:

  • Среди параметров, связанных с data_source_properties, в настоящее время поддерживается изменение только pulsar_partitions, pulsar_initial_positions и пользовательских параметров Pulsar property.pulsar_default_initial_position и property.auth.token. Параметры pulsar_service_url, pulsar_topic и pulsar_subscription изменить нельзя.
  • Если вам нужно изменить раздел для потребления и соответствующую начальную позицию, убедитесь, что вы указали раздел с помощью pulsar_partitions при создании задания Routine Load, и можно изменить только начальную позицию pulsar_initial_positions указанного раздела.
  • Если при создании задания Routine Load вы указали только Topic pulsar_topic, но не разделы pulsar_partitions, вы можете изменить начальную позицию всех разделов под темой через pulsar_default_initial_position.