Перейти к основному содержимому

Загрузка данных с помощью Routine Load

подсказка

Попробуйте Routine Load в этом Быстром старте

Эта тема знакомит с тем, как создать задание Routine Load для потоковой передачи сообщений Kafka (событий) в Selena, и знакомит с некоторыми основными концепциями Routine Load.

Для непрерывной загрузки сообщений потока в Selena вы можете сохранить поток сообщений в топике Kafka и создать задание Routine Load для потребления сообщений. Задание Routine Load сохраняется в Selena, генерирует серию задач загрузки для потребления сообщений во всех или части разделов топика и загружает сообщения в Selena.

Задание Routine Load поддерживает семантику доставки exactly-once для гарантии того, что данные, загруженные в Selena, не будут потеряны или дублированы.

Routine Load поддерживает преобразование данных при загрузке и поддерживает изменения данных, выполняемые операциями UPSERT и DELETE во время загрузки данных. Для получения дополнительной информации см. Преобразование данных при загрузке и Изменение данных через загрузку.

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Поддерживаемые форматы данных

Routine Load теперь поддерживает потребление данных в форматах CSV, JSON и Avro (поддерживается с версии v3.0.1) из кластера Kafka.

ПРИМЕЧАНИЕ

Для данных CSV обратите внимание на следующие моменты:

  • Вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве разделителя текста.
  • Нулевые значения обозначаются с помощью \N. Например, файл данных состоит из трех столбцов, и запись из этого файла данных содержит данные в первом и третьем столбцах, но не содержит данных во втором столбце. В этой ситуации вам нужно использовать \N во втором столбце для обозначения нулевого значения. Это означает, что запись должна быть скомпилирована как a,\N,b вместо a,,b. a,,b означает, что второй столбец записи содержит пустую строку.

Основные концепции

routine load

Терминология

  • Задание загрузки

    Задание Routine Load — это долго выполняющееся задание. Пока его статус RUNNING, задание загрузки непрерывно генерирует одну или несколько параллельных задач загрузки, которые потребляют сообщения в топике кластера Kafka и загружают данные в Selena.

  • Задача загрузки

    Задание загрузки разделяется на несколько задач загрузки по определенным правилам. Задача загрузки является основной единицей загрузки данных. Как отдельное событие, задача загрузки реализует механизм загрузки на основе Stream Load. Несколько задач загрузки одновременно потребляют сообщения из разных разделов топика и загружают данные в Selena.

Рабочий процесс

  1. Создание задания Routine Load. Для загрузки данных из Kafka вам нужно создать задание Routine Load, выполнив оператор CREATE ROUTINE LOAD. FE анализирует оператор и создает задание в соответствии с указанными вами свойствами.

  2. FE разделяет задание на несколько задач загрузки.

    FE разделяет задание на несколько задач загрузки на основе определенных правил. Каждая задача загрузки является отдельной транзакцией. Правила разделения следующие:

    • FE вычисляет фактическое количество параллельных задач загрузки в соответствии с желаемым количеством параллельных задач desired_concurrent_number, количеством разделов в топике Kafka и количеством живых узлов BE.
    • FE разделяет задание на задачи загрузки на основе вычисленного фактического количества параллельных задач и размещает задачи в очереди задач.

    Каждый топик Kafka состоит из нескольких разделов. Связь между разделом топика и задачей загрузки следующая:

    • Раздел уникально назначается задаче загрузки, и все сообщения из раздела потребляются задачей загрузки.
    • Задача загрузки может потреблять сообщения из одного или нескольких разделов.
    • Все разделы равномерно распределяются между задачами загрузки.
  3. Несколько задач загрузки выполняются параллельно для потребления сообщений из нескольких разделов топика Kafka и загрузки данных в Selena

    1. FE планирует и отправляет задачи загрузки: FE планирует задачи загрузки в очереди на своевременной основе и назначает их выбранным узлам Coordinator BE. Интервал между задачами загрузки определяется элементом конфигурации max_batch_interval. FE равномерно распределяет задачи загрузки по всем узлам BE. См. CREATE ROUTINE LOAD для получения дополнительной информации о max_batch_interval.

    2. Coordinator BE запускает задачу загрузки, потребляет сообщения в разделах, анализирует и фильтрует данные. Задача загрузки длится до тех пор, пока не будет потреблено предопределенное количество сообщений или не будет достигнут предопределенный временной лимит. Размер пакета сообщений и временной лимит определяются в конфигурациях FE max_routine_load_batch_size и routine_load_task_consume_second. Для получения подробной информации см. Конфигурация FE. Затем Coordinator BE распределяет сообщения по Executor BE. Executor BE записывают сообщения на диски.

      ПРИМЕЧАНИЕ

      Selena поддерживает доступ к Kafka через протоколы безопасности, включая SASL_SSL, SAS_PLAINTEXT, SSL и PLAINTEXT. В этой теме используется подключение к Kafka через PLAINTEXT в качестве примера. Если вам нужно подключиться к Kafka через другие протоколы безопасности, см. CREATE ROUTINE LOAD.

  4. FE генерирует новые задачи загрузки для непрерывной загрузки данных. После того как Executor BE записали данные на диски, Coordinator BE сообщает результат задачи загрузки в FE. На основе результата FE затем генерирует новые задачи загрузки для непрерывной загрузки данных. Или FE повторяет неудачные задачи, чтобы убедиться, что данные, загруженные в Selena, не потеряны и не дублированы.

Создание задания Routine Load

Следующие три примера описывают, как потреблять данные в формате CSV, JSON и Avro в Kafka и загружать данные в Selena, создав задание Routine Load. Для подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Загрузка данных в формате CSV

Этот раздел описывает, как создать задание Routine Load для потребления данных в формате CSV в кластере Kafka и загрузки данных в Selena.

Подготовка набора данных

Предположим, что в топике ordertest1 в кластере Kafka есть набор данных в формате CSV. Каждое сообщение в наборе данных включает шесть полей: ID заказа, дата платежа, имя клиента, национальность, пол и цена.

2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina",Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924

Создание таблицы

В соответствии с полями данных в формате CSV создайте таблицу example_tbl1 в базе данных example_db. Следующий пример создает таблицу с 5 полями, исключая поле пола клиента из данных в формате CSV.

CREATE TABLE example_db.example_tbl1 ( 
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price`double NULL COMMENT "Price"
)
ENGINE=OLAP
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);

УВЕДОМЛЕНИЕ

Начиная с версии 1.5.0, Selena может автоматически устанавливать количество корзин (BUCKETS) при создании таблицы или добавлении раздела. Вам больше не нужно вручную устанавливать количество корзин. Для получения подробной информации см. установка количества корзин.

Отправка задания Routine Load

Выполните следующий оператор для отправки задания Routine Load с именем example_tbl1_ordertest1 для потребления сообщений в топике ordertest1 и загрузки данных в таблицу example_tbl1. Задача загрузки потребляет сообщения с начального смещения в указанных разделах топика.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

После отправки задания загрузки вы можете выполнить оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки.

  • Имя задания загрузки

    На таблице может быть несколько заданий загрузки. Поэтому мы рекомендуем называть задание загрузки с соответствующим топиком Kafka и временем отправки задания загрузки. Это поможет вам различать задания загрузки на каждой таблице.

  • Разделитель столбцов

    Свойство COLUMN TERMINATED BY определяет разделитель столбцов данных в формате CSV. По умолчанию это \t.

  • Раздел и смещение топика Kafka

    Вы можете указать свойства kafka_partitions и kafka_offsets для указания разделов и смещений для потребления сообщений. Например, если вы хотите, чтобы задание загрузки потребляло сообщения из разделов Kafka "0,1,2,3,4" топика ordertest1, все с начальными смещениями, вы можете указать свойства следующим образом: Если вы хотите, чтобы задание загрузки потребляло сообщения из разделов Kafka "0,1,2,3,4" и вам нужно указать отдельное начальное смещение для каждого раздела, вы можете настроить следующим образом:

    "kafka_partitions" ="0,1,2,3,4",
    "kafka_offsets" = "OFFSET_BEGINNING, OFFSET_END, 1000, 2000, 3000"

    Вы также можете установить смещения по умолчанию для всех разделов с помощью свойства property.kafka_default_offsets.

    "kafka_partitions" ="0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

    Для получения подробной информации см. CREATE ROUTINE LOAD.

  • Сопоставление и преобразование данных

    Для указания отношения сопоставления и преобразования между данными в формате CSV и таблицей Selena вам нужно использовать параметр COLUMNS.

    Сопоставление данных:

    • Selena извлекает столбцы в данных формата CSV и сопоставляет их по порядку с полями, объявленными в параметре COLUMNS.

    • Selena извлекает поля, объявленные в параметре COLUMNS, и сопоставляет их по имени со столбцами таблицы Selena.

    Преобразование данных:

    И поскольку пример исключает столбец пола клиента из данных в формате CSV, поле temp_gender в параметре COLUMNS используется как заполнитель для этого поля. Другие поля сопоставляются со столбцами таблицы Selena example_tbl1 напрямую.

    Для получения дополнительной информации о преобразовании данных см. Преобразование данных при загрузке.

    ПРИМЕЧАНИЕ

    Вам не нужно указывать параметр COLUMNS, если имена, количество и порядок столбцов в данных формата CSV полностью соответствуют таковым в таблице Selena.

  • Параллелизм задач

    Когда есть много разделов топика Kafka и достаточно узлов BE, вы можете ускорить загрузку, увеличив параллелизм задач.

    Для увеличения фактического параллелизма задач загрузки вы можете увеличить желаемый параллелизм задач загрузки desired_concurrent_number при создании задания routine load. Вы также можете установить динамический элемент конфигурации FE max_routine_load_task_concurrent_num (максимальный параллелизм задач загрузки по умолчанию) на большее значение. Для получения дополнительной информации о max_routine_load_task_concurrent_num см. элементы конфигурации FE.

    Фактический параллелизм задач определяется минимальным значением среди количества живых узлов BE, количества предварительно указанных разделов топика Kafka и значений desired_concurrent_number и max_routine_load_task_concurrent_num.

    В примере количество живых узлов BE равно 5, количество предварительно указанных разделов топика Kafka равно 5, а значение max_routine_load_task_concurrent_num равно 5. Для увеличения фактического параллелизма задач загрузки вы можете увеличить desired_concurrent_number с значения по умолчанию 3 до 5.

    Для получения дополнительной информации о свойствах см. CREATE ROUTINE LOAD.

Загрузка данных в формате JSON

Этот раздел описывает, как создать задание Routine Load для потребления данных в формате JSON в кластере Kafka и загрузки данных в Selena.

Подготовка набора данных

Предположим, что в топике ordertest2 в кластере Kafka есть набор данных в формате JSON. Набор данных включает шесть ключей: ID товара, имя клиента, национальность, время платежа и цена. Кроме того, вы хотите преобразовать столбец времени платежа в тип DATE и загрузить его в столбец pay_dt в таблице Selena.

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}

ВНИМАНИЕ Каждый объект JSON в строке должен быть в одном сообщении Kafka, иначе возвращается ошибка анализа JSON.

Создание таблицы

В соответствии с ключами данных в формате JSON создайте таблицу example_tbl2 в базе данных example_db.

CREATE TABLE `example_tbl2` ( 
`commodity_id` varchar(26) NULL COMMENT "Commodity ID",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`country` varchar(26) NULL COMMENT "Country",
`pay_time` bigint(20) NULL COMMENT "Payment time",
`pay_dt` date NULL COMMENT "Payment date",
`price`double SUM NULL COMMENT "Price"
)
ENGINE=OLAP
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);

УВЕДОМЛЕНИЕ

Начиная с версии 1.5.0, Selena может автоматически устанавливать количество корзин (BUCKETS) при создании таблицы или добавлении раздела. Вам больше не нужно вручную устанавливать количество корзин. Для получения подробной информации см. установка количества корзин.

Отправка задания Routine Load

Выполните следующий оператор для отправки задания Routine Load с именем example_tbl2_ordertest2 для потребления сообщений в топике ordertest2 и загрузки данных в таблицу example_tbl2. Задача загрузки потребляет сообщения с начального смещения в указанных разделах топика.

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest2 ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

После отправки задания загрузки вы можете выполнить оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки.

  • Формат данных

    Вам нужно указать "format" = "json" в предложении PROPERTIES для определения того, что формат данных — JSON.

  • Сопоставление и преобразование данных

    Для указания отношения сопоставления и преобразования между данными в формате JSON и таблицей Selena вам нужно указать параметр COLUMNS и свойство jsonpaths. Порядок полей, указанных в параметре COLUMNS, должен соответствовать порядку данных в формате JSON, а имена полей должны соответствовать именам таблицы Selena. Свойство jsonpaths используется для извлечения необходимых полей из данных JSON. Эти поля затем именуются свойством COLUMNS.

    Поскольку пример требует преобразования поля времени платежа в тип данных DATE и загрузки данных в столбец pay_dt в таблице Selena, вам нужно использовать функцию from_unixtime. Другие поля сопоставляются с полями таблицы example_tbl2 напрямую.

    Сопоставление данных:

    • Selena извлекает ключи name и code данных в формате JSON и сопоставляет их с ключами, объявленными в свойстве jsonpaths.

    • Selena извлекает ключи, объявленные в свойстве jsonpaths, и сопоставляет их по порядку с полями, объявленными в параметре COLUMNS.

    • Selena извлекает поля, объявленные в параметре COLUMNS, и сопоставляет их по имени со столбцами таблицы Selena.

    Преобразование данных:

    • Поскольку пример требует преобразования ключа pay_time в тип данных DATE и загрузки данных в столбец pay_dt в таблице Selena, вам нужно использовать функцию from_unixtime в параметре COLUMNS. Другие поля сопоставляются с полями таблицы example_tbl2 напрямую.

    • И поскольку пример исключает столбец пола клиента из данных в формате JSON, поле temp_gender в параметре COLUMNS используется как заполнитель для этого поля. Другие поля сопоставляются со столбцами таблицы Selena example_tbl1 напрямую.

      Для получения дополнительной информации о преобразовании данных см. Преобразование данных при загрузке.

      ПРИМЕЧАНИЕ

      Вам не нужно указывать параметр COLUMNS, если имена и количество ключей в объекте JSON полностью соответствуют именам полей в таблице Selena.

Загрузка данных в формате Avro

Начиная с версии 1.5.0, Selena поддерживает загрузку данных Avro с помощью Routine Load.

Подготовка набора данных

Схема Avro
  1. Создайте следующий файл схемы Avro avro_schema.avsc:

    {
    "type": "record",
    "name": "sensor_log",
    "fields" : [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "checked", "type" : "boolean"},
    {"name": "data", "type": "double"},
    {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
    ]
    }
  2. Зарегистрируйте схему Avro в Schema Registry.

Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_0.

Создание таблицы

В соответствии с полями данных Avro создайте таблицу sensor_log в целевой базе данных example_db в кластере Selena. Имена столбцов таблицы должны соответствовать именам полей в данных Avro. Для сопоставления типов данных между столбцами таблицы и полями данных Avro см. [Сопоставление типов данных](#Data types mapping).

CREATE TABLE example_db.sensor_log ( 
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

УВЕДОМЛЕНИЕ

Начиная с версии 1.5.0, Selena может автоматически устанавливать количество корзин (BUCKETS) при создании таблицы или добавлении раздела. Вам больше не нужно вручную устанавливать количество корзин. Для получения подробной информации см. установка количества корзин.

Отправка задания Routine Load

Выполните следующий оператор для отправки задания Routine Load с именем sensor_log_load_job для потребления сообщений Avro в топике Kafka topic_0 и загрузки данных в таблицу sensor_log в базе данных sensor. Задание загрузки потребляет сообщения с начального смещения в указанных разделах топика.

CREATE ROUTINE LOAD example_db.sensor_log_load_job ON sensor_log  
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_0",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
  • Формат данных

    Вам нужно указать "format = "avro" в предложении PROPERTIES для определения того, что формат данных — Avro.

  • Schema Registry

    Вам нужно настроить confluent.schema.registry.url для указания URL Schema Registry, где зарегистрирована схема Avro. Selena получает схему Avro, используя этот URL. Формат следующий:

    confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname|ip address>[:<port>]
  • Сопоставление и преобразование данных

    Для указания отношения сопоставления и преобразования между данными в формате Avro и таблицей Selena вам нужно указать параметр COLUMNS и свойство jsonpaths. Порядок полей, указанных в параметре COLUMNS, должен соответствовать порядку полей в свойстве jsonpaths, а имена полей должны соответствовать именам таблицы Selena. Свойство jsonpaths используется для извлечения необходимых полей из данных Avro. Эти поля затем именуются свойством COLUMNS.

    Для получения дополнительной информации о преобразовании данных см. Преобразование данных при загрузке.

    ПРИМЕЧАНИЕ

    Вам не нужно указывать параметр COLUMNS, если имена и количество полей в записи Avro полностью соответствуют именам столбцов в таблице Selena.

После отправки задания загрузки вы можете выполнить оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки.

Сопоставление типов данных

Сопоставление типов данных между полями данных Avro, которые вы хотите загрузить, и столбцами таблицы Selena следующее:

Примитивные типы
AvroSelena
nulNULL
booleanBOOLEAN
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
bytesSTRING
stringSTRING
Сложные типы
AvroSelena
recordЗагрузить всю ЗАПИСЬ или ее подполя в Selena как JSON.
enumsSTRING
arraysARRAY
mapsJSON
union(T, null)NULLABLE(T)
fixedSTRING

Ограничения

  • В настоящее время Selena не поддерживает эволюцию схемы.
  • Каждое сообщение Kafka должно содержать только одну запись данных Avro.

Проверка задания и задачи загрузки

Проверка задания загрузки

Выполните оператор SHOW ROUTINE LOAD для проверки статуса задания загрузки example_tbl2_ordertest2. Selena возвращает состояние выполнения State, статистическую информацию (включая общее количество потребленных строк и общее количество загруженных строк) Statistics и прогресс задания загрузки progress.

Если состояние задания загрузки автоматически изменилось на PAUSED, возможно, это связано с тем, что количество строк с ошибками превысило пороговое значение. Для получения подробных инструкций по установке этого порогового значения см. CREATE ROUTINE LOAD. Вы можете проверить файлы ReasonOfStateChanged и ErrorLogUrls для выявления и устранения проблемы. Устранив проблему, вы можете затем выполнить оператор RESUME ROUTINE LOAD для возобновления ПРИОСТАНОВЛЕННОГО задания загрузки.

Если состояние задания загрузки CANCELLED, возможно, это связано с тем, что задание загрузки столкнулось с исключением (например, таблица была удалена). Вы можете проверить файлы ReasonOfStateChanged и ErrorLogUrls для выявления и устранения проблемы. Однако вы не можете возобновить ОТМЕНЕННОЕ задание загрузки.

MySQL [example_db]> SHOW ROUTINE LOAD FOR example_tbl2_ordertest2 \G
*************************** 1. row ***************************
Id: 63013
Name: example_tbl2_ordertest2
CreateTime: 2022-08-10 17:09:00
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:example_db
TableName: example_tbl2
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 3
JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"commodity_id,customer_name,country,pay_time,pay_dt=from_unixtime(`pay_time`, '%Y%m%d'),price","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"3","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"ordertest2","currentKafkaPartitions":"0,1,2,3,4","brokerList":"<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING"}
Statistic: {"receivedBytes":230,"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":522}
Progress: {"0":"1","1":"OFFSET_ZERO","2":"OFFSET_ZERO","3":"OFFSET_ZERO","4":"OFFSET_ZERO"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:

ВНИМАНИЕ

Вы не можете проверить задание загрузки, которое остановлено или еще не запущено.

Проверка задачи загрузки

Выполните оператор SHOW ROUTINE LOAD TASK для проверки задач загрузки задания загрузки example_tbl2_ordertest2, например, сколько задач в настоящее время выполняется, разделы топика Kafka, которые потребляются, и прогресс потребления DataSourceProperties, а также соответствующий узел Coordinator BE BeId.

MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "example_tbl2_ordertest2" \G
*************************** 1. row ***************************
TaskId: 18c3a823-d73e-4a64-b9cb-b9eced026753
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"1":0,"4":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 2. row ***************************
TaskId: f76c97ac-26aa-4b41-8194-a8ba2063eb00
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:26
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"2":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 3. row ***************************
TaskId: 1a327a34-99f4-4f8d-8014-3cd38db99ec6
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:26
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"0":2,"3":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again

Приостановка задания загрузки

Вы можете выполнить оператор PAUSE ROUTINE LOAD для приостановки задания загрузки. Состояние задания загрузки будет PAUSED после выполнения оператора. Однако оно не остановлено. Вы можете выполнить оператор RESUME ROUTINE LOAD для его возобновления. Вы также можете проверить его статус с помощью оператора SHOW ROUTINE LOAD.

Следующий пример приостанавливает задание загрузки example_tbl2_ordertest2:

PAUSE ROUTINE LOAD FOR example_tbl2_ordertest2;

Возобновление задания загрузки

Вы можете выполнить оператор RESUME ROUTINE LOAD для возобновления приостановленного задания загрузки. Состояние задания загрузки будет временно NEED_SCHEDULE (поскольку задание загрузки перепланируется), а затем станет RUNNING. Вы можете проверить его статус с помощью оператора SHOW ROUTINE LOAD.

Следующий пример возобновляет приостановленное задание загрузки example_tbl2_ordertest2:

RESUME ROUTINE LOAD FOR example_tbl2_ordertest2;

Изменение задания загрузки

Перед изменением задания загрузки вы должны приостановить его с помощью оператора PAUSE ROUTINE LOAD. Затем вы можете выполнить ALTER ROUTINE LOAD. После изменения вы можете выполнить оператор RESUME ROUTINE LOAD для его возобновления и проверить его статус с помощью оператора SHOW ROUTINE LOAD.

Предположим, что количество живых узлов BE увеличивается до 6, а разделы топика Kafka для потребления — "0,1,2,3,4,5,6,7". Если вы хотите увеличить фактический параллелизм задач загрузки, вы можете выполнить следующий оператор для увеличения количества желаемого параллелизма задач desired_concurrent_number до 6 (больше или равно количеству живых узлов BE) и указать разделы топика Kafka и начальные смещения.

ПРИМЕЧАНИЕ

Поскольку фактический параллелизм задач определяется минимальным значением нескольких параметров, вы должны убедиться, что значение динамического параметра FE max_routine_load_task_concurrent_num больше или равно 6.

ALTER ROUTINE LOAD FOR example_tbl2_ordertest2
PROPERTIES
(
"desired_concurrent_number" = "6"
)
FROM kafka
(
"kafka_partitions" = "0,1,2,3,4,5,6,7",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_END,OFFSET_END,OFFSET_END,OFFSET_END"
);

Остановка задания загрузки

Вы можете выполнить оператор STOP ROUTINE LOAD для остановки задания загрузки. Состояние задания загрузки будет STOPPED после выполнения оператора, и вы не можете возобновить остановленное задание загрузки. Вы не можете проверить статус остановленного задания загрузки с помощью оператора SHOW ROUTINE LOAD.

Следующий пример останавливает задание загрузки example_tbl2_ordertest2:

STOP ROUTINE LOAD FOR example_tbl2_ordertest2;