[Предварительная версия] Непрерывная загрузка данных из Apache® Pulsar™
Начиная с версии Selena 2.5, Routine Load поддерживает непрерывную загрузку данных из Apache® Pulsar™. Pulsar — это распределенная платформа обмена сообщениями и потоковой передачи данных с открытым исходным кодом, построенная по архитектуре разделения хранения и вычислений. Загрузка данных из Pulsar через Routine Load аналогична загрузке данных из Apache Kafka. В этой теме используются данные в формате CSV в качестве примера для демонстрации загрузки данных из Apache Pulsar через Routine Load.
Поддерживаемые форматы файлов данных
Routine Load поддерживает потребление данных в форматах CSV и JSON из кластера Pulsar.
ПРИМЕЧАНИЕ
Для данных в формате CSV Selena поддерживает строки в кодировке UTF-8 размером до 50 байт в качестве разделителей столбцов. Часто используемые разделители столбцов включают запятую (,), табуляцию и вертикальную черту (|).
Концепции, связанные с Pulsar
Topics в Pulsar — это именованные каналы для передачи сообщений от производителей к потребителям. Topics в Pulsar делятся на секционированные и несекционированные темы.
- Partitioned topics — это специальн ый тип тем, которые обрабатываются несколькими брокерами, что обеспечивает более высокую пропускную способность. Секционированная тема фактически реализована как N внутренних тем, где N — количество разделов.
- Non-partitioned topics — это обычный тип тем, которые обслуживаются только одним брокером, что ограничивает максимальную пропускную способность темы.
Идентификатор сообщения назначается экземплярами BookKeeper сразу после постоянного сохранения сообщения. Message ID указывает конкретную позицию сообщения в реестре и является уникальным в пределах кластера Pulsar.
Pulsar поддерживает указание потребителями начальной позиции через consumer.seek(messageId). Но по сравнению с offset потребителя Kafka, который является длинным целым числом, message ID состоит из четырех частей: ledgerId:entryID:partition-index:batch-index.
Поэтому вы не можете получить Message ID напрямую из сообщения. В результате в настоящее время Routine Load не поддерживает указание нача льной позиции при загрузке данных из Pulsar, а поддерживает только потребление данных с начала или конца раздела.
Подписка — это именованное правило конфигурации, которое определяет, как сообщения доставляются потребителям. Pulsar также поддерживает одновременную подписку потребителей на несколько тем. У темы может быть несколько подписок.
Тип подписки определяется при подключении потребителя к ней, и тип можно изменить, перезапустив всех потребителей с другой конфигурацией. В Pulsar доступны четыре типа подписок:
exclusive(по умолчанию): К подписке может подключиться только один потребитель. Только один клиент может потреблять сообщения.shared: К одной подписке могут подключаться несколько потребителей. Сообщения доставляются по кругу между потребителями, и любое данное сообщение доставляется только одному потребителю.failover: К одной подписке могут подключаться несколько потребителей. Для несекционированной темы или каждого раздела секционированной темы выбирает ся главный потребитель, который получает сообщения. Когда главный потребитель отключается, все (неподтвержденные и последующие) сообщения доставляются следующему потребителю в очереди.key_shared: К одной подписке могут подключаться несколько потребителей. Сообщения распределяются между потребителями, и сообщения с одинаковым ключом или одинаковым ключом упорядочивания доставляются только одному потребителю.
Примечание:
В настоящее время Routine Load использует тип exclusive.
Создание задания Routine Load
Следующие примеры описывают, как потреблять сообщения в формате CSV в Pulsar и загружать данные в Selena, создав задание Routine Load. Для подробных инструкций и справочной информации см. CREATE ROUTINE LOAD.
CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://localhost:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);
Когда Routine Load создается для потребления данных из Pulsar, большинство входных параметров, за исключением data_source_properties, такие же, как при потреблении данных из Kafka. Для описания параметров, кроме data_source_properties, см. CREATE ROUTINE LOAD.
Параметры, связанные с data_source_properties, и их описания следующие:
| Параметр | Обязательный | Описание |
|---|---|---|
| pulsar_service_url | Да | URL, используемый для подключения к кластеру Pulsar. Формат: "pulsar://ip:port" или "pulsar://service:port". Пример: "pulsar_service_url" = "pulsar://localhost:6650" |
| pulsar_topic | Да | Подписанная тема. Пример: "pulsar_topic" = "persistent://tenant/namespace/topic-name" |
| pulsar_subscription | Да | Подписка, настроенная для темы. Пример: "pulsar_subscription" = "my_subscription" |
| pulsar_partitions, pulsar_initial_positions | Нет | pulsar_partitions: Подписанные разделы в теме. pulsar_initial_positions: начальные позиции разделов, указанных в pulsar_partitions. Начальные позиции должны соответствовать разделам в pulsar_partitions. Допустимые значения: POSITION_EARLIEST (значение по умолчанию): Подписка начинается с самого раннего доступного сообщения в разделе. POSITION_LATEST: Подписка начинается с последнего доступного сообщения в разделе. Примечание: Если pulsar_partitions не указан, подписываются все разделы темы. Если указаны и pulsar_partitions, и property.pulsar_default_initial_position, значение pulsar_partitions переопределяет значение property.pulsar_default_initial_position. Если не указаны ни pulsar_partitions, ни property.pulsar_default_initial_position, подписка начинается с последнего доступного сообщения в разделе. Пример: "pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST" |
Routine Load поддерживает следующие пользовательские параметры для Pulsar.
| Параметр | Обязательный | Описание |
|---|---|---|
| property.pulsar_default_initial_position | Н ет | Начальные позиции по умолчанию при подписке на разделы темы. Параметр действует, когда pulsar_initial_positions не указан. Его допустимые значения такие же, как у pulsar_initial_positions. Пример: "property.pulsar_default_initial_position" = "POSITION_EARLIEST" |
| property.auth.token | Нет | Если Pulsar включает аутентификацию клиентов с использованием токенов безопасности, вам нужна строка токена для подтверждения вашей личности. Пример: "property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD" |
Проверка задания и задачи загрузки
Проверка задания загрузки
Выполните оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки routine_wiki_edit_1. Selena возвращает состояние выполнения State, статистическую информацию (включая общее количество потребленных и загруженных строк) Statistics и прогресс задания загрузки progress.
При проверке задания Routine Load, которое потребляет данные из Pulsar, большинство возвращаемых параметров, за исключением progress, такие же, как при потреблении данных из Kafka. progress относится к backlog, то есть количеству неподтвержденных сообщений в разделе.
MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
Id: 10142
Name: routine_wiki_edit_1
CreateTime: 2022-06-29 14:52:55
PauseTime: 2022-06-29 17:33:53
EndTime: NULL
DbName: default_cluster:test_pulsar
TableName: test1
State: PAUSED
DataSourceType: PULSAR
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://localhost:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)