Перейти к основному содержимому

CREATE ROUTINE LOAD

подсказка

Попробуйте Routine Load в этом Быстром старте

Routine Load может непрерывно потреблять сообщения из Apache Kafka® и загружать данные в Selena. Routine Load может потреблять данные в форматах CSV, JSON и Avro (поддерживается с версии v3.0.1) из кластера Kafka и получать доступ к Kafka через несколько протоколов безопасности, включая plaintext, ssl, sasl_plaintext и sasl_ssl.

В этой теме описывается синтаксис, параметры и примеры оператора CREATE ROUTINE LOAD.

ПРИМЕЧАНИЕ

  • Для получения информации о сценариях применения, принципах и основных операциях Routine Load см. Загрузка данных с помощью Routine Load.
  • Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT для этих таблиц Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT для предоставления привилегии INSERT пользователю, которого вы используете для подключения к кластеру Selena.

Синтаксис

CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

Параметры

database_name, job_name, table_name

database_name

Необязательный. Имя базы данных Selena.

job_name

Обязательный. Имя задания Routine Load. Таблица может получать данные от нескольких заданий Routine Load. Мы рекомендуем установить осмысленное имя задания Routine Load, используя идентифицируемую информацию, например, имя топика Kafka и приблизительное время создания задания, чтобы различать несколько заданий Routine Load. Имя задания Routine Load должно быть уникальным в пределах одной базы данных.

table_name

Обязательный. Имя таблицы Selena, в которую загружаются данные.

load_properties

Необязательный. Свойства данных. Синтаксис:

[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]

COLUMNS TERMINATED BY

Разделитель столбцов для данных в формате CSV. Разделитель столбцов по умолчанию — \t (табуляция). Например, вы можете использовать COLUMNS TERMINATED BY "," для указания запятой в качестве разделителя столбцов.

Примечание

  • Убедитесь, что разделитель столбцов, указанный здесь, совпадает с разделителем столбцов в данных, которые будут загружены.
  • Вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве текстового разделителя.
  • Нулевые значения обозначаются с помощью \N. Например, запись данных состоит из трех столбцов, и запись содержит данные в первом и третьем столбцах, но не содержит данных во втором столбце. В этой ситуации вам нужно использовать \N во втором столбце для обозначения нулевого значения. Это означает, что запись должна быть составлена как a,\N,b вместо a,,b. a,,b означает, что второй столбец записи содержит пустую строку.

ROWS TERMINATED BY

Разделитель строк для данных в формате CSV. Разделитель строк по умолчанию — \n.

COLUMNS

Сопоставление между столбцами в исходных данных и столбцами в таблице Selena. Для получения дополнительной информации см. Сопоставление столбцов в этой теме.

  • column_name: Если столбец исходных данных может быть сопоставлен со столбцом таблицы Selena без каких-либо вычислений, вам нужно указать только имя столбца. Эти столбцы можно назвать сопоставленными столбцами.
  • column_assignment: Если столбец исходных данных не может быть напрямую сопоставлен со столбцом таблицы Selena, и значения столбца должны быть вычислены с использованием функций перед загрузкой данных, вы должны указать функцию вычисления в expr. Эти столбцы можно назвать производными столбцами. Рекомендуется размещать производные столбцы после сопоставленных столбцов, поскольку Selena сначала анализирует сопоставленные столбцы.

WHERE

Условие фильтрации. Только данные, соответствующие условию фильтрации, могут быть загружены в Selena. Например, если вы хотите загрузить только строки, где значение col1 больше 100, а значение col2 равно 1000, вы можете использовать WHERE col1 > 100 and col2 = 1000.

ПРИМЕЧАНИЕ

Столбцы, указанные в условии фильтрации, могут быть исходными столбцами или производными столбцами.

PARTITION

Если таблица Selena распределена по разделам p0, p1, p2 и p3, и вы хотите загрузить данные только в p1, p2 и p3 в Selena и отфильтровать данные, которые будут храниться в p0, то вы можете указать PARTITION(p1, p2, p3) в качестве условия фильтрации. По умолчанию, если вы не указываете этот параметр, данные будут загружены во все разделы. Пример:

PARTITION (p1, p2, p3)

TEMPORARY PARTITION

Имя временного раздела, в который вы хотите загрузить данные. Вы можете указать несколько временных разделов, которые должны быть разделены запятыми (,).

job_properties

Обязательный. Свойства задания загрузки. Синтаксис:

PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
СвойствоОбязательныйОписание
desired_concurrent_numberНетОжидаемый параллелизм задач одного задания Routine Load. Значение по умолчанию: 3. Фактический параллелизм задач определяется минимальным значением из нескольких параметров: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num).
  • alive_be_number: количество активных узлов BE.
  • partition_number: количество разделов для потребления.
  • desired_concurrent_number: ожидаемый параллелизм задач одного задания Routine Load. Значение по умолчанию: 3.
  • max_routine_load_task_concurrent_num: максимальный параллелизм задач задания Routine Load по умолчанию, который составляет 5. См. Динамический параметр FE.
Максимальный фактический параллелизм задач определяется либо количеством активных узлов BE, либо количеством разделов для потребления.
max_batch_intervalНетИнтервал планирования для задачи, то есть как часто выполняется задача. Единица: секунды. Диапазон значений: 5 ~ 60. Значение по умолчанию: 10. Рекомендуется установить значение больше 10. Если планирование короче 10 секунд, создается слишком много версий планшетов из-за чрезмерно высокой частоты загрузки.
max_batch_rowsНетЭто свойство используется только для определения окна обнаружения ошибок. Окно — это количество строк данных, потребляемых одной задачей Routine Load. Значение составляет 10 * max_batch_rows. Значение по умолчанию — 10 * 200000 = 2000000. Задача Routine Load обнаруживает ошибочные данные в окне обнаружения ошибок. Ошибочные данные — это данные, которые Selena не может проанализировать, например, недопустимые данные в формате JSON.
max_error_numberНетМаксимальное количество строк ошибочных данных, разрешенное в окне обнаружения ошибок. Если количество строк ошибочных данных превышает это значение, задание загрузки приостанавливается. Вы можете выполнить SHOW ROUTINE LOAD и просмотреть журналы ошибок, используя ErrorLogUrls. После этого вы можете исправить ошибку в Kafka согласно журналам ошибок. Значение по умолчанию — 0, что означает, что строки с ошибками не разрешены.
ПРИМЕЧАНИЕ
  • Последний пакет задач успешно завершится до приостановки задания загрузки при наличии слишком большого количества строк ошибочных данных. То есть квалифицированные данные будут загружены, а неквалифицированные данные будут отфильтрованы. Если вы не хотите фильтровать слишком много неквалифицированных строк данных, установите параметр max_filter_ratio.
  • Строки ошибочных данных не включают строки данных, которые отфильтрованы предложением WHERE.
  • Этот параметр вместе со следующим параметром max_filter_ratio контролирует максимальное количество записей ошибочных данных. Когда max_filter_ratio не установлен, действует значение этого параметра. Когда max_filter_ratio установлен, задание загрузки приостанавливается, как только количество записей ошибочных данных достигает либо порога, установленного этим параметром, либо параметром max_filter_ratio.
max_filter_ratioНетМаксимальная допустимость ошибок задания загрузки. Допустимость ошибок — это максимальный процент записей данных, которые могут быть отфильтрованы из-за неадекватного качества данных во всех записях данных, запрошенных заданием загрузки. Допустимые значения: от 0 до 1. Значение по умолчанию: 1 (это означает, что фактически не будет действовать).
Мы рекомендуем установить его в 0. Таким образом, если обнаружены неквалифицированные записи данных, задание загрузки приостанавливается, тем самым обеспечивая правильность данных.
Если вы хотите игнорировать неквалифицированные записи данных, вы можете установить этот параметр в значение больше 0. Таким образом, задание загрузки может быть успешным, даже если файл данных содержит неквалифицированные записи данных.
ПРИМЕЧАНИЕ
  • Последний пакет задач завершится неудачей, когда будет слишком много строк ошибочных данных больше max_filter_ratio. Это немного отличается от эффекта max_error_number.
  • Неквалифицированные записи данных не включают записи данных, которые отфильтрованы предложением WHERE.
  • Этот параметр вместе с последним параметром max_error_number контролирует максимальное количество записей ошибочных данных. Когда этот параметр не установлен (он работает так же, как установка max_filter_ratio = 1), действует значение параметра max_error_number. Когда этот параметр установлен, задание загрузки приостанавливается, как только количество записей ошибочных данных достигает либо порога, установленного этим параметром, либо параметром max_error_number.
strict_modeНетУказывает, включить ли строгий режим. Допустимые значения: true и false. Значение по умолчанию: false. Когда строгий режим включен, если значение для столбца в загружаемых данных равно NULL, но целевая таблица не разрешает значение NULL для этого столбца, строка данных будет отфильтрована.
log_rejected_record_numНетУказывает максимальное количество неквалифицированных строк данных, которые могут быть зарегистрированы. Этот параметр поддерживается с версии v3.1. Допустимые значения: 0, -1 и любое ненулевое положительное целое число. Значение по умолчанию: 0.
  • Значение 0 указывает, что отфильтрованные строки данных не будут зарегистрированы.
  • Значение -1 указывает, что все отфильтрованные строки данных будут зарегистрированы.
  • Ненулевое положительное целое число, такое как n, указывает, что до n отфильтрованных строк данных могут быть зарегистрированы на каждом BE.
Вы можете получить доступ ко всем неквалифицированным строкам данных, которые отфильтрованы в задании загрузки, перейдя по пути, возвращенному в поле REJECTED_RECORD_PATH из запроса к представлению information_schema.loads.
timezoneНетЧасовой пояс, используемый заданием загрузки. Значение по умолчанию: Asia/Shanghai. Значение этого параметра влияет на результаты, возвращаемые такими функциями, как strftime(), alignment_timestamp() и from_unixtime(). Часовой пояс, указанный этим параметром, является часовым поясом уровня сеанса. Для получения дополнительной информации см. Настройка часового пояса.
partial_updateНетИспользовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
merge_conditionНетУказывает имя столбца, который вы хотите использовать в качестве условия для определения необходимости обновления данных. Данные будут обновлены только тогда, когда значение данных, загружаемых в этот столбец, больше или равно текущему значению этого столбца. ПРИМЕЧАНИЕ
Только таблицы Primary Key поддерживают условные обновления. Столбец, который вы указываете, не может быть столбцом первичного ключа.
formatНетФормат загружаемых данных. Допустимые значения: CSV, JSON и Avro (поддерживается с версии v3.0.1). Значение по умолчанию: CSV.
trim_spaceНетУказывает, удалять ли пробелы, предшествующие и следующие за разделителями столбцов из файла данных, когда файл данных находится в формате CSV. Тип: BOOLEAN. Значение по умолчанию: false.
Для некоторых баз данных пробелы добавляются к разделителям столбцов при экспорте данных в виде файла данных в формате CSV. Такие пробелы называются ведущими пробелами или завершающими пробелами в зависимости от их расположения. Установив параметр trim_space, вы можете включить Selena для удаления таких ненужных пробелов во время загрузки данных.
Обратите внимание, что Selena не удаляет пробелы (включая ведущие пробелы и завершающие пробелы) внутри поля, заключенного в пару символов, указанных enclose. Например, следующие значения полей используют вертикальную черту (|) в качестве разделителя столбцов и двойные кавычки (") в качестве символа, указанного enclose: | "Love Selena" |. Если вы установите trim_space в true, Selena обработает предыдущие значения полей как |"Love Selena"|.
encloseНетУказывает символ, который используется для обертывания значений полей в файле данных согласно RFC4180, когда файл данных находится в формате CSV. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенными символами являются одинарная кавычка (') и двойная кавычка (").
Все специальные символы (включая разделители строк и столбцов), обернутые с использованием символа, указанного enclose, считаются обычными символами. Selena может делать больше, чем RFC4180, поскольку позволяет указать любой однобайтовый символ в качестве символа, указанного enclose.
Если значение поля содержит символ, указанный enclose, вы можете использовать тот же символ для экранирования этого символа, указанного enclose. Например, вы устанавливаете enclose в ", а значение поля — a "quoted" c. В этом случае вы можете ввести значение поля как "a ""quoted"" c" в файл данных.
escapeНетУказывает символ, который используется для экранирования различных специальных символов, таких как разделители строк, разделители столбцов, символы экранирования и символы, указанные enclose, которые затем рассматриваются Selena как обычные символы и анализируются как часть значений полей, в которых они находятся. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенным символом является косая черта (\), которая должна быть записана как двойная косая черта (\\) в операторах SQL.
ПРИМЕЧАНИЕ
Символ, указанный escape, применяется как внутри, так и снаружи каждой пары символов, указанных enclose.
Два примера следующие:
  • Когда вы устанавливаете enclose в " и escape в \, Selena анализирует "say \"Hello world\"" в say "Hello world".
  • Предположим, что разделитель столбцов — запятая (,). Когда вы устанавливаете escape в \, Selena анализирует a, b\, c в два отдельных значения полей: a и b, c.
strip_outer_arrayНетУказывает, удалять ли самую внешнюю структуру массива данных в формате JSON. Допустимые значения: true и false. Значение по умолчанию: false. В реальных бизнес-сценариях данные в формате JSON могут иметь самую внешнюю структуру массива, обозначенную парой квадратных скобок []. В этой ситуации мы рекомендуем установить этот параметр в true, чтобы Selena удалила самые внешние квадратные скобки [] и загрузила каждый внутренний массив как отдельную запись данных. Если вы установите этот параметр в false, Selena анализирует все данные в формате JSON в один массив и загружает массив как одну запись данных. Используйте данные в формате JSON [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] в качестве примера. Если вы установите этот параметр в true, {"category" : 1, "author" : 2} и {"category" : 3, "author" : 4} анализируются как две отдельные записи данных и загружаются в две строки данных Selena.
jsonpathsНетИмена полей, которые вы хотите загрузить из данных в формате JSON. Значение этого параметра является допустимым выражением JsonPath. Для получения дополнительной информации см. Таблица Selena содержит производные столбцы, значения которых генерируются с использованием выражений) в этой теме.
json_rootНетКорневой элемент данных в формате JSON для загрузки. Selena извлекает элементы корневого узла через json_root для анализа. По умолчанию значение этого параметра пустое, указывая, что все данные в формате JSON будут загружены. Для получения дополнительной информации см. Указание корневого элемента данных в формате JSON для загрузки в этой теме.
task_consume_secondНетМаксимальное время для каждой задачи Routine Load в рамках указанного задания Routine Load для потребления данных. Единица: секунда. В отличие от динамических параметров FE routine_load_task_consume_second (который применяется ко всем заданиям Routine Load в кластере), этот параметр специфичен для отдельного задания Routine Load, что более гибко. Этот параметр поддерживается с версии v3.1.0.
  • Когда task_consume_second и task_timeout_second не настроены, Selena использует динамические параметры FE routine_load_task_consume_second и routine_load_task_timeout_second для управления поведением загрузки.
  • Когда настроен только task_consume_second, значение по умолчанию для task_timeout_second вычисляется как task_consume_second * 4.
  • Когда настроен только task_timeout_second, значение по умолчанию для task_consume_second вычисляется как task_timeout_second/4.
task_timeout_secondНетВремя ожидания для каждой задачи Routine Load в рамках указанного задания Routine Load. Единица: секунда. В отличие от динамического параметра FE routine_load_task_timeout_second (который применяется ко всем заданиям Routine Load в кластере), этот параметр специфичен для отдельного задания Routine Load, что более гибко. Этот параметр поддерживается с версии v3.1.0.
  • Когда task_consume_second и task_timeout_second не настроены, Selena использует динамические параметры FE routine_load_task_consume_second и routine_load_task_timeout_second для управления поведением загрузки.
  • Когда настроен только task_timeout_second, значение по умолчанию для task_consume_second вычисляется как task_timeout_second/4.
  • Когда настроен только task_consume_second, значение по умолчанию для task_timeout_second вычисляется как task_consume_second * 4.
pause_on_fatal_parse_errorНЕТУказывает, автоматически ли приостанавливать задание при обнаружении неустранимых ошибок анализа данных. Допустимые значения: true и false. Значение по умолчанию: false. Этот параметр поддерживается с версии v3.3.12/v3.4.2.
Такие ошибки анализа обычно вызваны недопустимыми форматами данных, такими как:
  • Импорт массива JSON без установки strip_outer_array.
  • Импорт данных JSON, но сообщение Kafka содержит недопустимый JSON, например abcd.

data_source, data_source_properties

Обязательный. Источник данных и соответствующие свойства.

FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

data_source

Обязательный. Источник данных, которые вы хотите загрузить. Допустимое значение: KAFKA.

data_source_properties

Свойства источника данных.

СвойствоОбязательныйОписание
kafka_broker_listДаИнформация о подключении брокера Kafka. Формат: <kafka_broker_ip>:<broker_ port>. Несколько брокеров разделяются запятыми (,). Порт по умолчанию, используемый брокерами Kafka, — 9092. Пример:"kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092".
kafka_topicДаТопик Kafka для потребления. Задание Routine Load может потреблять сообщения только из одного топика.
kafka_partitionsНетРазделы Kafka для потребления, например, "kafka_partitions" = "0, 1, 2, 3". Если это свойство не указано, все разделы потребляются по умолчанию.
kafka_offsetsНетНачальное смещение, с которого начинается потребление данных в разделе Kafka, как указано в kafka_partitions. Если это свойство не указано, задание Routine Load потребляет данные, начиная с последних смещений в kafka_partitions. Допустимые значения:
  • Конкретное смещение: потребляет данные, начиная с конкретного смещения.
  • OFFSET_BEGINNING: потребляет данные, начиная с самого раннего возможного смещения.
  • OFFSET_END: потребляет данные, начиная с последнего смещения.
Несколько начальных смещений разделяются запятыми (,), например, "kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000".
property.kafka_default_offsetsНетНачальное смещение по умолчанию для всех разделов потребителя. Поддерживаемые значения для этого свойства такие же, как для свойства kafka_offsets.
confluent.schema.registry.urlНетURL реестра схем, где зарегистрирована схема Avro. Selena получает схему Avro, используя этот URL. Формат следующий:
confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>]

Дополнительные свойства, связанные с источником данных

Вы можете указать дополнительные свойства, связанные с источником данных (Kafka), которые эквивалентны использованию командной строки Kafka --property. Для получения дополнительных поддерживаемых свойств см. свойства для клиента-потребителя Kafka в свойствах конфигурации librdkafka.

ПРИМЕЧАНИЕ

Если значение свойства является именем файла, добавьте ключевое слово FILE: перед именем файла. Для получения информации о том, как создать файл, см. CREATE FILE.

  • Указание начального смещения по умолчанию для всех разделов, которые будут потребляться
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
  • Указание ID группы потребителей, используемой заданием Routine Load
"property.group.id" = "group_id_0"

Если property.group.id не указан, Selena генерирует случайное значение на основе имени задания Routine Load в формате {job_name}_{random uuid}, например simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8.

  • Указание протокола безопасности и соответствующих параметров, используемых BE для доступа к Kafka

    Протокол безопасности может быть указан как plaintext (по умолчанию), ssl, sasl_plaintext или sasl_ssl. И вам нужно настроить связанные параметры в соответствии с указанным протоколом безопасности.

    Когда протокол безопасности установлен в sasl_plaintext или sasl_ssl, поддерживаются следующие механизмы аутентификации SASL:

    • PLAIN
    • SCRAM-SHA-256 и SCRAM-SHA-512
    • OAUTHBEARER
    • GSSAPI (Kerberos)

    Примеры:

    • Доступ к Kafka с использованием протокола безопасности SSL:

      "property.security.protocol" = "ssl", -- Указать протокол безопасности как SSL.
      "property.ssl.ca.location" = "FILE:ca-cert", -- Путь к файлу или каталогу с сертификатом CA для проверки ключа брокера kafka.
      -- Если сервер Kafka включает аутентификацию клиента, также требуются следующие три параметра:
      "property.ssl.certificate.location" = "FILE:client.pem", -- Путь к открытому ключу клиента, используемому для аутентификации.
      "property.ssl.key.location" = "FILE:client.key", -- Путь к закрытому ключу клиента, используемому для аутентификации.
      "property.ssl.key.password" = "xxxxxx" -- Пароль для закрытого ключа клиента.
    • Доступ к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/PLAIN:

      "property.security.protocol" = "SASL_PLAINTEXT", -- Указать протокол безопасности как SASL_PLAINTEXT.
      "property.sasl.mechanism" = "PLAIN", -- Указать механизм SASL как PLAIN, который является простым механизмом аутентификации имя пользователя/пароль.
      "property.sasl.username" = "admin", -- Имя пользователя SASL.
      "property.sasl.password" = "xxxxxx" -- Пароль SASL.
    • Доступ к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/GSSAPI (Kerberos):

      "property.security.protocol" = "SASL_PLAINTEXT", -- Указать протокол безопасности как SASL_PLAINTEXT.
      "property.sasl.mechanism" = "GSSAPI", -- Указать механизм аутентификации SASL как GSSAPI. Значение по умолчанию — GSSAPI.
      "property.sasl.kerberos.service.name" = "kafka", -- Имя службы брокера. Значение по умолчанию — kafka.
      "property.sasl.kerberos.keytab" = "/home/starrocks/starrocks.keytab", -- Расположение keytab клиента.
      "property.sasl.kerberos.principal" = "starrocks@YOUR.COM" -- Принципал Kerberos.
      примечание
      • С версии Selena v3.1.4 поддерживается аутентификация SASL/GSSAPI (Kerberos).

      • Модули, связанные с SASL, должны быть установлены на машине BE.

        # Debian/Ubuntu:
        sudo apt-get install libsasl2-modules-gssapi-mit libsasl2-dev
        # CentOS/Redhat:
        sudo yum install cyrus-sasl-gssapi cyrus-sasl-devel

Элементы конфигурации FE и BE

Для элементов конфигурации FE и BE, связанных с Routine Load, см. элементы конфигурации.

Сопоставление столбцов

Настройка сопоставления столбцов для загрузки данных в формате CSV

Если столбцы данных в формате CSV могут быть сопоставлены один к одному в последовательности со столбцами таблицы Selena, вам не нужно настраивать сопоставление столбцов между данными и таблицей Selena.

Если столбцы данных в формате CSV не могут быть сопоставлены один к одному в последовательности со столбцами таблицы Selena, вам нужно использовать параметр columns для настройки сопоставления столбцов между файлом данных и таблицей Selena. Это включает следующие два случая использования:

  • Одинаковое количество столбцов, но разная последовательность столбцов. Также данные из файла данных не нужно вычислять функциями перед загрузкой в соответствующие столбцы таблицы Selena.

    • В параметре columns вам нужно указать имена столбцов таблицы Selena в той же последовательности, в которой расположены столбцы файла данных.

    • Например, таблица Selena состоит из трех столбцов, которые являются col1, col2 и col3 в последовательности, и файл данных также состоит из трех столбцов, которые могут быть сопоставлены со столбцами таблицы Selena col3, col2 и col1 в последовательности. В этом случае вам нужно указать "columns: col3, col2, col1".

  • Разное количество столбцов и разная последовательность столбцов. Также данные из файла данных нужно вычислить функциями перед загрузкой в соответствующие столбцы таблицы Selena.

    В параметре columns вам нужно указать имена столбцов таблицы Selena в той же последовательности, в которой расположены столбцы файла данных, и указать функции, которые вы хотите использовать для вычисления данных. Два примера следующие:

    • Таблица Selena состоит из трех столбцов, которые являются col1, col2 и col3 в последовательности. Файл данных состоит из четырех столбцов, среди которых первые три столбца могут быть сопоставлены в последовательности со столбцами таблицы Selena col1, col2 и col3, а четвертый столбец не может быть сопоставлен ни с одним из столбцов таблицы Selena. В этом случае вам нужно временно указать имя для четвертого столбца файла данных, и временное имя должно отличаться от любого из имен столбцов таблицы Selena. Например, вы можете указать "columns: col1, col2, col3, temp", в котором четвертый столбец файла данных временно назван temp.
    • Таблица Selena состоит из трех столбцов, которые являются year, month и day в последовательности. Файл данных состоит только из одного столбца, который содержит значения даты и времени в формате yyyy-mm-dd hh:mm:ss. В этом случае вы можете указать "columns: col, year = year(col), month=month(col), day=day(col)", в котором col — временное имя столбца файла данных, а функции year = year(col), month=month(col) и day=day(col) используются для извлечения данных из столбца файла данных col и загрузки данных в соответствующие столбцы таблицы Selena. Например, year = year(col) используется для извлечения данных yyyy из столбца файла данных col и загрузки данных в столбец таблицы Selena year.

Для получения дополнительных примеров см. Настройка сопоставления столбцов.

Настройка сопоставления столбцов для загрузки данных в формате JSON или Avro

ПРИМЕЧАНИЕ

С версии v3.0.1 Selena поддерживает загрузку данных Avro с помощью Routine Load. При загрузке данных JSON или Avro конфигурация для сопоставления столбцов и преобразования одинакова. Поэтому в этом разделе данные JSON используются в качестве примера для введения конфигурации.

Если ключи данных в формате JSON имеют те же имена, что и столбцы таблицы Selena, вы можете загрузить данные в формате JSON, используя простой режим. В простом режиме вам не нужно указывать параметр jsonpaths. Этот режим требует, чтобы данные в формате JSON были объектом, обозначенным фигурными скобками {}, например {"category": 1, "author": 2, "price": "3"}. В этом примере category, author и price — имена ключей, и эти ключи могут быть сопоставлены один к одному по имени со столбцами category, author и price таблицы Selena. Для примеров см. простой режим.

Если ключи данных в формате JSON имеют разные имена от столбцов таблицы Selena, вы можете загрузить данные в формате JSON, используя согласованный режим. В согласованном режиме вам нужно использовать параметры jsonpaths и COLUMNS для указания сопоставления столбцов между данными в формате JSON и таблицей Selena:

  • В параметре jsonpaths укажите ключи JSON в последовательности, как они расположены в данных в формате JSON.
  • В параметре COLUMNS укажите сопоставление между ключами JSON и столбцами таблицы Selena:
    • Имена столбцов, указанные в параметре COLUMNS, сопоставляются один к одному в последовательности с данными в формате JSON.
    • Имена столбцов, указанные в параметре COLUMNS, сопоставляются один к одному по имени со столбцами таблицы Selena.

Для примеров см. Таблица Selena содержит производные столбцы, значения которых генерируются с использованием выражений.

Примеры

Загрузка данных в формате CSV

В этом разделе используются данные в формате CSV в качестве примера для описания того, как вы можете использовать различные настройки параметров и комбинации для удовлетворения ваших разнообразных требований к загрузке.

Подготовка набора данных

Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем ordertest1. Каждое сообщение в наборе данных включает шесть столбцов: ID заказа, дата платежа, имя клиента, национальность, пол и цена.

2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924

Создание таблицы

Согласно столбцам данных в формате CSV создайте таблицу с именем example_tbl1 в базе данных example_db.

CREATE TABLE example_db.example_tbl1 ( 
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);

Потребление данных, начиная с указанных смещений для указанных разделов

Если задание Routine Load должно потреблять данные, начиная с указанных разделов и смещений, вам нужно настроить параметры kafka_partitions и kafka_offsets.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4", -- разделы для потребления
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" -- соответствующие начальные смещения
);

Улучшение производительности загрузки за счет увеличения параллелизма задач

Для улучшения производительности загрузки и избежания накопительного потребления вы можете увеличить параллелизм задач, увеличив значение desired_concurrent_number при создании задания Routine Load. Параллелизм задач позволяет разделить одно задание Routine Load на как можно больше параллельных задач.

Обратите внимание, что фактический параллелизм задач определяется минимальным значением среди следующих нескольких параметров:

min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)

Примечание

Максимальный фактический параллелизм задач — это либо количество активных узлов BE, либо количество разделов для потребления.

Поэтому, когда количество активных узлов BE и количество разделов для потребления больше значений других двух параметров max_routine_load_task_concurrent_num и desired_concurrent_number, вы можете увеличить значения других двух параметров для увеличения фактического параллелизма задач.

Предположим, что количество разделов для потребления равно 7, количество активных узлов BE равно 5, а max_routine_load_task_concurrent_num имеет значение по умолчанию 5. Если вы хотите увеличить фактический параллелизм задач, вы можете установить desired_concurrent_number в 5 (значение по умолчанию — 3). В этом случае фактический параллелизм задач min(5,7,5,5) настроен на 5.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5" -- установить значение desired_concurrent_number в 5
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Настройка сопоставления столбцов

Если последовательность столбцов в данных в формате CSV не соответствует столбцам в целевой таблице, предполагая, что пятый столбец в данных в формате CSV не нужно импортировать в целевую таблицу, вам нужно указать сопоставление столбцов между данными в формате CSV и целевой таблицей через параметр COLUMNS.

Целевая база данных и таблица

Создайте целевую таблицу example_tbl2 в целевой базе данных example_db согласно столбцам в данных в формате CSV. В этом сценарии вам нужно создать пять столбцов, соответствующих пяти столбцам в данных в формате CSV, за исключением пятого столбца, который хранит пол.

CREATE TABLE example_db.example_tbl2 ( 
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price` double NULL COMMENT "Price"
)
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(order_id);

Задание Routine Load

В этом примере, поскольку пятый столбец в данных в формате CSV не нужно загружать в целевую таблицу, пятый столбец временно назван temp_gender в COLUMNS, а другие столбцы напрямую сопоставляются с таблицей example_tbl2.

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Установка условий фильтрации

Если вы хотите загрузить только данные, соответствующие определенным условиям, вы можете установить условия фильтрации в предложении WHERE, например, price > 100.

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price),
WHERE price > 100 -- установить условие фильтрации
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Включение строгого режима для фильтрации строк со значениями NULL

В PROPERTIES вы можете установить "strict_mode" = "true", что означает, что задание Routine Load находится в строгом режиме. Если в исходном столбце есть значение NULL, но столбец целевой таблицы Selena не разрешает значения NULL, строка, содержащая значение NULL в исходном столбце, отфильтровывается.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"strict_mode" = "true" -- включить строгий режим
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Установка допустимости ошибок

Если ваш бизнес-сценарий имеет низкую толерантность к неквалифицированным данным, вам нужно установить окно обнаружения ошибок и максимальное количество строк ошибочных данных, настроив параметры max_batch_rows и max_error_number. Когда количество строк ошибочных данных в окне обнаружения ошибок превышает значение max_error_number, задание Routine Load приостанавливается.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"max_batch_rows" = "100000",-- Значение max_batch_rows, умноженное на 10, равно окну обнаружения ошибок.
"max_error_number" = "100" -- Максимальное количество строк ошибочных данных, разрешенное в окне обнаружения ошибок.
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Указание протокола безопасности как SSL и настройка связанных параметров

Если вам нужно указать протокол безопасности как SSL, используемый BE для доступа к Kafka, вам нужно настроить "property.security.protocol" = "ssl" и связанные параметры.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
-- Указать протокол безопасности как SSL.
"property.security.protocol" = "ssl",
-- Расположение сертификата CA.
"property.ssl.ca.location" = "FILE:ca-cert",
-- Если аутентификация включена для клиентов Kafka, вам нужно настроить следующие свойства:
-- Расположение открытого ключа клиента Kafka.
"property.ssl.certificate.location" = "FILE:client.pem",
-- Расположение закрытого ключа клиента Kafka.
"property.ssl.key.location" = "FILE:client.key",
-- Пароль к закрытому ключу клиента Kafka.
"property.ssl.key.password" = "abcdefg"
);

Настройка trim_space, enclose и escape

Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем test_csv. Каждое сообщение в наборе данных включает шесть столбцов: ID заказа, дата платежа, имя клиента, национальность, пол и цена.

 "2020050802" , "2020-05-08" , "Johann Georg Faust" , "Deutschland" , "male" , "895"
"2020050802" , "2020-05-08" , "Julien Sorel" , "France" , "male" , "893"
"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"
"2020050901" , "2020-05-09" , "Anna Karenina" , "Russia" , "female" , "175"
"2020051001" , "2020-05-10" , "Tess Durbeyfield" , "US" , "female" , "986"
"2020051101" , "2020-05-11" , "Edogawa Conan" , "japan" , "male" , "8924"

Если вы хотите загрузить все данные из топика Kafka test_csv в example_tbl1, с намерением удалить пробелы, предшествующие и следующие за разделителями столбцов, и установить enclose в " и escape в \, выполните следующую команду:

CREATE ROUTINE LOAD example_db.example_tbl1_test_csv ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"trim_space"="true",
"enclose"="\"",
"escape"="\\",
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic"="test_csv",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);

Загрузка данных в формате JSON

Имена столбцов таблицы Selena соответствуют именам ключей JSON

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике Kafka ordertest2.

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}

Примечание Каждый объект JSON должен быть в одном сообщении Kafka. В противном случае возникает ошибка, указывающая на сбой в анализе данных в формате JSON.

Целевая база данных и таблица

Создайте таблицу example_tbl3 в целевой базе данных example_db в кластере Selena. Имена столбцов соответствуют именам ключей в данных в формате JSON.

CREATE TABLE example_db.example_tbl3 ( 
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL COMMENT "Price")
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
DISTRIBUTED BY HASH(commodity_id);

Задание Routine Load

Вы можете использовать простой режим для задания Routine Load. То есть вам не нужно указывать параметры jsonpaths и COLUMNS при создании задания Routine Load. Selena извлекает ключи данных в формате JSON в топике ordertest2 кластера Kafka согласно именам столбцов целевой таблицы example_tbl3 и загружает данные в формате JSON в целевую таблицу.

CREATE ROUTINE LOAD example_db.example_tbl3_ordertest2 ON example_tbl3
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);

Примечание

  • Если самый внешний слой данных в формате JSON является структурой массива, вам нужно установить "strip_outer_array"="true" в PROPERTIES для удаления самой внешней структуры массива. Кроме того, когда вам нужно указать jsonpaths, корневой элемент всех данных в формате JSON является сплющенным объектом JSON, поскольку самая внешняя структура массива данных в формате JSON удалена.
  • Вы можете использовать json_root для указания корневого элемента данных в формате JSON.

Таблица Selena содержит производные столбцы, значения которых генерируются с использованием выражений

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике ordertest2 кластера Kafka.

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}

Целевая база данных и таблица

Создайте таблицу с именем example_tbl4 в базе данных example_db в кластере Selena. Столбец pay_dt является производным столбцом, значения которого генерируются путем вычисления значений ключа pay_time в данных в формате JSON.

CREATE TABLE example_db.example_tbl4 ( 
`commodity_id` varchar(26) NULL,
`customer_name` varchar(26) NULL,
`country` varchar(26) NULL,
`pay_time` bigint(20) NULL,
`pay_dt` date NULL,
`price` double SUM NULL)
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);

Задание Routine Load

Вы можете использовать согласованный режим для задания Routine Load. То есть вам нужно указать параметры jsonpaths и COLUMNS при создании задания Routine Load.

Вам нужно указать ключи данных в формате JSON и расположить их в последовательности в параметре jsonpaths.

И поскольку значения в ключе pay_time данных в формате JSON нужно преобразовать в тип DATE перед тем, как значения будут сохранены в столбце pay_dt таблицы example_tbl4, вам нужно указать вычисление, используя pay_dt=from_unixtime(pay_time,'%Y%m%d') в COLUMNS. Значения других ключей в данных в формате JSON могут быть напрямую сопоставлены с таблицей example_tbl4.

CREATE ROUTINE LOAD example_db.example_tbl4_ordertest2 ON example_tbl4
COLUMNS(commodity_id, customer_name, country, pay_time, pay_dt=from_unixtime(pay_time, '%Y%m%d'), price)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);

Примечание

  • Если самый внешний слой данных JSON является структурой массива, вам нужно установить "strip_outer_array"="true" в PROPERTIES для удаления самой внешней структуры массива. Кроме того, когда вам нужно указать jsonpaths, корневой элемент всех данных JSON является сплющенным объектом JSON, поскольку самая внешняя структура массива данных JSON удалена.
  • Вы можете использовать json_root для указания корневого элемента данных в формате JSON.

Таблица Selena содержит производный столбец, значения которого генерируются с использованием выражения CASE

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике Kafka topic-expr-test.

{"key1":1, "key2": 21}
{"key1":12, "key2": 22}
{"key1":13, "key2": 23}
{"key1":14, "key2": 24}

Целевая база данных и таблица

Создайте таблицу с именем tbl_expr_test в базе данных example_db в кластере Selena. Целевая таблица tbl_expr_test содержит два столбца, где значения столбца col2 нужно вычислить, используя выражение case для данных JSON.

CREATE TABLE tbl_expr_test (
col1 string, col2 string)
DISTRIBUTED BY HASH (col1);

Задание Routine Load

Поскольку значения в столбце col2 в целевой таблице генерируются с использованием выражения CASE, вам нужно указать соответствующее выражение в параметре COLUMNS для задания Routine load.

CREATE ROUTINE LOAD rl_expr_test ON tbl_expr_test
COLUMNS (
key1,
key2,
col1 = key1,
col2 = CASE WHEN key1 = "1" THEN "key1=1"
WHEN key1 = "12" THEN "key1=12"
ELSE "nothing" END)
PROPERTIES ("format" = "json")
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "topic-expr-test"
);

Запрос таблицы Selena

Запросите таблицу Selena. Результат показывает, что значения в столбце col2 являются выводом выражения CASE.

MySQL [example_db]> SELECT * FROM tbl_expr_test;
+------+---------+
| col1 | col2 |
+------+---------+
| 1 | key1=1 |
| 12 | key1=12 |
| 13 | nothing |
| 14 | nothing |
+------+---------+
4 rows in set (0.015 sec)

Указание корневого элемента данных в формате JSON для загрузки

Вам нужно использовать json_root для указания корневого элемента данных в формате JSON для загрузки, и значение должно быть допустимым выражением JsonPath.

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике ordertest3 кластера Kafka. И корневой элемент данных в формате JSON для загрузки — $.RECORDS.

{"RECORDS":[{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875},{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895},{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}]}

Целевая база данных и таблица

Создайте таблицу с именем example_tbl3 в базе данных example_db в кластере Selena.

CREATE TABLE example_db.example_tbl3 ( 
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL)
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
ENGINE=OLAP
DISTRIBUTED BY HASH(commodity_id);

Задание Routine Load

Вы можете установить "json_root" = "$.RECORDS" в PROPERTIES для указания корневого элемента данных в формате JSON для загрузки. Также, поскольку данные в формате JSON для загрузки находятся в структуре массива, вы также должны установить "strip_outer_array" = "true" для удаления самой внешней структуры массива.

CREATE ROUTINE LOAD example_db.example_tbl3_ordertest3 ON example_tbl3
PROPERTIES
(
"format" = "json",
"json_root" = "$.RECORDS",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);

Загрузка данных в формате Avro

С версии v3.0.1 Selena поддерживает загрузку данных Avro с помощью Routine Load.

Схема Avro простая

Предположим, что схема Avro относительно простая, и вам нужно загрузить все поля данных Avro.

Подготовка набора данных

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema1.avsc:

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "data", "type": "double"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_1.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log1 в целевой базе данных sensor в кластере Selena. Имена столбцов таблицы должны соответствовать именам полей в данных Avro. Для сопоставления типов данных при загрузке данных Avro в Selena см. [Сопоставление типов данных](#Data types mapping).

CREATE TABLE sensor.sensor_log1 ( 
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задание Routine Load

Вы можете использовать простой режим для задания Routine Load. То есть вам не нужно указывать параметр jsonpaths при создании задания Routine Load. Выполните следующий оператор для отправки задания Routine Load с именем sensor_log_load_job1 для потребления сообщений Avro в топике Kafka topic_1 и загрузки данных в таблицу sensor_log1 в базе данных sensor.

CREATE ROUTINE LOAD sensor.sensor_log_load_job1 ON sensor_log1  
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic"= "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Схема Avro содержит вложенное поле типа record

Предположим, что схема Avro содержит вложенное поле типа record, и вам нужно загрузить подполе во вложенном поле типа record в Selena.

Подготовка набора данных

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema2.avsc. Внешняя запись Avro включает пять полей, которые являются id, name, checked, sensor_type и data в последовательности. И поле data имеет вложенную запись data_record.

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
      {"name": "data", "type":
      {
      "type": "record",
      "name": "data_record",
      "fields" : [
      {"name": "data_x", "type" : "boolean"},
      {"name": "data_y", "type": "long"}
      ]
      }
      }
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_2.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log2 в целевой базе данных sensor в кластере Selena.

Предположим, что в дополнение к загрузке полей id, name, checked и sensor_type внешней записи вам также нужно загрузить подполе data_y во вложенной записи data_record.

CREATE TABLE sensor.sensor_log2 ( 
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задание Routine Load

Отправьте задание загрузки, используйте jsonpaths для указания полей данных Avro, которые нужно загрузить. Обратите внимание, что для подполя data_y во вложенной записи вам нужно указать его jsonpath как "$.data.data_y".

CREATE ROUTINE LOAD sensor.sensor_log_load_job2 ON sensor_log2  
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Схема Avro содержит поле Union

Подготовка набора данных

Предположим, что схема Avro содержит поле Union, и вам нужно загрузить поле Union в Selena.

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema3.avsc. Внешняя запись Avro включает пять полей, которые являются id, name, checked, sensor_type и data в последовательности. И поле data имеет тип Union и включает два элемента, null и вложенную запись data_record.

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
      {"name": "data", "type": [null,
      {
      "type": "record",
      "name": "data_record",
      "fields" : [
      {"name": "data_x", "type" : "boolean"},
      {"name": "data_y", "type": "long"}
      ]
      }
      ]
      }
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_3.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log3 в целевой базе данных sensor в кластере Selena.

Предположим, что в дополнение к загрузке полей id, name, checked и sensor_type внешней записи вам также нужно загрузить поле data_y элемента data_record в поле типа Union data.

CREATE TABLE sensor.sensor_log3 ( 
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задание Routine Load

Отправьте задание загрузки, используйте jsonpaths для указания полей, которые нужно загрузить в данных Avro. Обратите внимание, что для поля data_y вам нужно указать его jsonpath как "$.data.data_y".

CREATE ROUTINE LOAD sensor.sensor_log_load_job3 ON sensor_log3  
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Когда значение для поля типа Union data равно null, значение, загруженное в столбец data_y в таблице Selena, равно null. Когда значение для поля типа Union data является записью данных, значение, загруженное в столбец data_y, имеет тип Long.