Перейти к основному содержимому
Версия: 2.0.x

Routine Load

Как улучшить производительность загрузки?

Метод 1: Увеличить фактический параллелизм задач загрузки, разделив задание загрузки на максимально возможное количество параллельных задач загрузки.

ПРИМЕЧАНИЕ

Этот метод может потреблять больше ресурсов CPU и вызывать слишком много версий tablet.

Фактический параллелизм задач загрузки определяется следующей формулой, состоящей из нескольких параметров, с верхним пределом в виде количества работающих узлов BE или количества партиций для потребления.

min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)

Описание параметров:

  • alive_be_number: количество работающих узлов BE.
  • partition_number: количество партиций для потребления.
  • desired_concurrent_number: желаемый параллелизм задач загрузки для задания Routine Load. Значение по умолчанию — 3. Вы можете установить более высокое значение для этого параметра, чтобы увеличить фактический параллелизм задач загрузки.
    • Если вы ещё не создали задание Routine Load, вам нужно установить этот параметр при использовании CREATE ROUTINE LOAD для создания задания Routine Load.
    • Если вы уже создали задание Routine Load, вам нужно использовать ALTER ROUTINE LOAD для изменения этого параметра.
  • max_routine_load_task_concurrent_num: максимальный параллелизм задач по умолчанию для задания Routine Load. Значение по умолчанию — 5. Этот параметр является динамическим параметром FE. Для получения дополнительной информации и метода конфигурации см. Конфигурация параметров.

Следовательно, когда количество партиций для потребления и количество работающих узлов BE больше двух других параметров, вы можете увеличить значения параметров desired_concurrent_number и max_routine_load_task_concurrent_num для увеличения фактического параллелизма задач загрузки.

Например, количество партиций для потребления — 7, количество работающих узлов BE — 5, а max_routine_load_task_concurrent_num имеет значение по умолчанию 5. В это время, если вам нужно увеличить параллелизм задач загрузки до верхнего предела, вам нужно установить desired_concurrent_number на 5 (значение по умолчанию — 3). Тогда фактический параллелизм задач min(5,7,5,5) вычисляется как 5.

Для получения дополнительных описаний параметров см. CREATE ROUTINE LOAD.

Метод 2: Увеличить объём данных, потребляемых задачей Routine Load из одной или нескольких партиций.

ПРИМЕЧАНИЕ

Этот метод может вызвать задержку в загрузке данных.

Верхний предел количества сообщений, которые может потребить задача Routine Load, определяется либо параметром max_routine_load_batch_size, который означает максимальное количество сообщений, которые может потребить задача загрузки, либо параметром routine_load_task_consume_second, который означает максимальную продолжительность потребления сообщений. Как только задача загрузки потребит достаточно данных, соответствующих любому из требований, потребление завершено. Эти два параметра являются динамическими параметрами FE. Для получения дополнительной информации и метода конфигурации см. Конфигурация параметров.

Вы можете проанализировать, какой параметр определяет верхний предел объёма данных, потребляемых задачей загрузки, просмотрев журнал в be/log/be.INFO. Увеличивая этот параметр, вы можете увеличить объём данных, потребляемых задачей загрузки.

I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855

Обычно поле left_bytes в журнале больше или равно 0, что указывает на то, что объём данных, потребляемых задачей загрузки, не превысил max_routine_load_batch_size в течение routine_load_task_consume_second. Это означает, что пакет запланированных задач загрузки может потреблять все данные из Kafka без задержки в потреблении. В этом сценарии вы можете установить большее значение для routine_load_task_consume_second, чтобы увеличить объём данных, потребляемых задачей загрузки из одной или нескольких партиций.

Если поле left_bytes меньше 0, это означает, что объём данных, потребляемых задачей загрузки, достиг max_routine_load_batch_size в течение routine_load_task_consume_second. Каждый раз данные из Kafka заполняют пакет запланированных задач загрузки. Следовательно, весьма вероятно, что в Kafka остались данные, которые не были потреблены, что вызывает задержку в потреблении. В этом случае вы можете установить большее значение для max_routine_load_batch_size.

Что делать, если результат SHOW ROUTINE LOAD показывает, что задание загрузки находится в состоянии PAUSED?

  • Проверьте поле ReasonOfStateChanged, и оно сообщает об ошибке Broker: Offset out of range.

    Анализ причины: Смещение потребителя задания загрузки не существует в партиции Kafka.

    Решение: Вы можете выполнить SHOW ROUTINE LOAD и проверить последнее смещение потребителя задания загрузки в параметре Progress. Затем вы можете проверить, существует ли соответствующее сообщение в партиции Kafka. Если оно не существует, это может быть потому, что

    • Смещение потребителя, указанное при создании задания загрузки, является смещением в будущем.
    • Сообщение с указанным смещением потребителя в партиции Kafka было удалено до того, как оно было потреблено заданием загрузки. Рекомендуется установить разумную политику очистки журналов Kafka и параметры, такие как log.retention.hours и log.retention.bytes, в зависимости от скорости загрузки.
  • Проверьте поле ReasonOfStateChanged, и оно не сообщает об ошибке Broker: Offset out of range.

    Анализ причины: Количество строк с ошибками в задаче загрузки превышает порог max_error_number.

    Решение: Вы можете устранить неполадки и исправить проблему, используя сообщения об ошибках в полях ReasonOfStateChanged и ErrorLogUrls.

    • Если это вызвано неправильным форматом данных в источнике данных, вам нужно проверить формат данных и исправить проблему. После успешного исправления проблемы вы можете использовать RESUME ROUTINE LOAD для возобновления приостановленного задания загрузки.

    • Если это потому, что Selena не может разобрать формат данных в источнике данных, вам нужно скорректировать порог max_error_number. Сначала вы можете выполнить SHOW ROUTINE LOAD, чтобы просмотреть значение max_error_number, а затем использовать ALTER ROUTINE LOAD для увеличения порога. После изменения порога вы можете использовать RESUME ROUTINE LOAD для возобновления приостановленного задания загрузки.

Что делать, если результат SHOW ROUTINE LOAD показывает, что задание загрузки находится в состоянии CANCELLED?

Анализ причины: Задание загрузки столкнулось с исключением во время загрузки, например, таблица была удалена.

Решение: При устранении неполадок и исправлении проблемы вы можете обратиться к сообщениям об ошибках в полях ReasonOfStateChanged и ErrorLogUrls. Однако после исправления проблемы вы не можете возобновить отменённое задание загрузки.

Может ли Routine Load гарантировать семантику согласованности при потреблении из Kafka и записи в Selena?

Routine Load гарантирует семантику exactly-once.

Каждая задача загрузки является отдельной транзакцией. Если во время выполнения транзакции возникает ошибка, транзакция прерывается, и FE не обновляет прогресс потребления соответствующих партиций задач загрузки. Когда FE планирует задачи загрузки из очереди задач в следующий раз, задачи загрузки отправляют запрос на потребление с последней сохранённой позиции потребления партиций, тем самым обеспечивая семантику exactly-once.

Что делать, если Routine Load возвращает ошибку SSL Authentication?

Сообщение об ошибке: routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (install ca-certificates package) (after 273ms in state SSL_HANDSHAKE)

Анализ причины: Домен в сертификате отличается от домена Kafka Broker. См. подробности.

Решение: Добавьте свойство "property.ssl.endpoint.identification.algorithm"= "none" к заданию Routine Load.

Почему Routine Load сообщает "JSON data is an array.strip_outer_array must be set true"?

Ваши входные данные представляют собой JSON-массив ([{},{}]). Установите свойство strip_outer_array в true, чтобы развернуть его.

Почему я получаю "There are more than 100 routine load jobs running" при создании задания Routine Load?

Увеличьте значение конфигурации FE max_routine_load_job_num.

Почему создание задания Routine Load завершается ошибкой "failed to get partition meta" даже после настройки SASL?

Фактическая причина может заключаться в неправильной конфигурации SASL.

Как обработать ошибку Routine Load "Create replicas failed …"?

Скорректируйте следующие конфигурации FE:

admin set frontend config ("tablet_create_timeout_second"="5");
admin set frontend config ("max_create_table_timeout_second"="600");

Вы также можете установить их в файле конфигурации, чтобы сохранить изменения.

Почему Routine Load сообщает "Bad message format" при потреблении Kafka?

Kafka использует имя хоста для связи. Добавьте разрешение имени хоста для узлов Kafka в /etc/hosts на всех серверах, где размещены узлы Selena.

Что вызывает сбой Routine Load с ошибкой "failed to send task: failed to submit task. error code: TOO MANY TASKS"?

Это потому, что общий параллелизм Routine Load превышает возможности cluster (который равен routine_load_thread_pool_size × количество активных BE).

Решения:

  • Уменьшите QPS загрузки (рекомендуемый QPS cluster < 10). Рассчитайте QPS cluster на основе cluster routine_load_task_num / routine_load_task_consume_second.

  • Увеличьте размер пакета на задачу (> 1 ГБ), скорректировав max_routine_load_batch_size и routine_load_task_timeout_second.

  • Убедитесь, что routine_load_thread_pool_size меньше половины ядер CPU BE.

Параллелизм задания определяется минимумом из следующих значений:

  • kafka_partition_num
  • desired_concurrent_number
  • alive_be_num
  • max_routine_load_task_concurrent_num

Вы можете начать корректировку параллелизма с уменьшения max_routine_load_task_concurrent_num.