Перейти к основному содержимому
Версия: 2.0.x

CREATE ROUTINE LOAD

подсказка

Попробуйте Routine Load в этом Кратком руководстве

Routine Load может непрерывно потреблять сообщения из Apache Kafka® и загружать данные в Selena. Routine Load может потреблять данные в форматах CSV, JSON и Avro (поддерживается с версии v1.5.2) из кластера Kafka и получать доступ к Kafka через несколько протоколов безопасности, включая plaintext, ssl, sasl_plaintext и sasl_ssl.

В этой теме описываются синтаксис, параметры и примеры оператора CREATE ROUTINE LOAD.

примечание
  • Информацию о сценариях применения, принципах и основных операциях Routine Load см. в разделе Загрузка данных с использованием Routine Load.
  • Вы можете загружать данные в таблицы Selena только как пользователь, у которого есть привилегия INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к кластеру Selena.

Синтаксис

CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

Параметры

database_name, job_name, table_name

database_name

Опционально. Имя базы данных Selena.

job_name

Обязательно. Имя задачи Routine Load. Таблица может получать данные из нескольких задач Routine Load. Мы рекомендуем устанавливать осмысленное имя задачи Routine Load, используя идентифицируемую информацию, например, имя топика Kafka и приблизительное время создания задачи, чтобы различать несколько задач Routine Load. Имя задачи Routine Load должно быть уникальным в рамках одной базы данных.

table_name

Обязательно. Имя таблицы Selena, в которую загружаются данные.

load_properties

Опционально. Свойства данных. Синтаксис:

[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]

COLUMNS TERMINATED BY

Разделитель столбцов для данных в формате CSV. По умолчанию разделитель столбцов — \t (табуляция). Например, вы можете использовать COLUMNS TERMINATED BY ",", чтобы указать разделитель столбцов как запятую.

подсказка
  • Убедитесь, что указанный здесь разделитель столбцов совпадает с разделителем столбцов в загружаемых данных.
  • Вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве текстового разделителя.
  • Значения NULL обозначаются с помощью \N. Например, запись данных состоит из трех столбцов, и запись содержит данные в первом и третьем столбцах, но не содержит данных во втором столбце. В этой ситуации вам нужно использовать \N во втором столбце для обозначения значения NULL. Это означает, что запись должна быть скомпилирована как a,\N,b вместо a,,b. a,,b означает, что второй столбец записи содержит пустую строку.

ROWS TERMINATED BY

Разделитель строк для данных в формате CSV. По умолчанию разделитель строк — \n.

COLUMNS

Отображение между столбцами в исходных данных и столбцами в таблице Selena. Дополнительную информацию см. в разделе Отображение столбцов в этой теме.

  • column_name: Если столбец исходных данных может быть отображен на столбец таблицы Selena без каких-либо вычислений, вам нужно указать только имя столбца. Эти столбцы можно назвать отображаемыми столбцами.
  • column_assignment: Если столбец исходных данных не может быть напрямую отображен на столбец таблицы Selena, и значения столбца должны быть вычислены с помощью функций перед загрузкой данных, вы должны указать вычислительную функцию в expr. Эти столбцы можно назвать производными столбцами. Рекомендуется размещать производные столбцы после отображаемых столбцов, потому что Selena сначала обрабатывает отображаемые столбцы.

WHERE

Условие фильтрации. Только данные, которые соответствуют условию фильтрации, могут быть загружены в Selena. Например, если вы хотите загрузить только строки, где значение col1 больше 100, а значение col2 равно 1000, вы можете использовать WHERE col1 > 100 and col2 = 1000.

примечание

Столбцы, указанные в условии фильтрации, могут быть исходными столбцами или производными столбцами.

PARTITION

Если таблица Selena распределена по партициям p0, p1, p2 и p3, и вы хотите загрузить данные только в p1, p2 и p3 в Selena и отфильтровать данные, которые будут храниться в p0, тогда вы можете указать PARTITION(p1, p2, p3) в качестве условия фильтрации. По умолчанию, если вы не указываете этот параметр, данные будут загружены во все партиции. Пример:

PARTITION (p1, p2, p3)

TEMPORARY PARTITION

Имя временной партиции, в которую вы хотите загрузить данные. Вы можете указать несколько временных партиций, которые должны быть разделены запятыми (,).

job_properties

Обязательно. Свойства задачи загрузки. Синтаксис:

PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

desired_concurrent_number

Обязательно: Нет
Описание: Ожидаемый параллелизм задачи для одной задачи Routine Load. Значение по умолчанию: 3. Фактический параллелизм задачи определяется минимальным значением из нескольких параметров: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num).

  • alive_be_number: количество активных BE узлов.
  • partition_number: количество партиций для обработки.
  • desired_concurrent_number: ожидаемый параллелизм задачи для одной задачи Routine Load. Значение по умолчанию: 3.
  • max_routine_load_task_concurrent_num: максимальный параллелизм задачи по умолчанию для задачи Routine Load, который равен 5. См. Динамические параметры FE.
Максимальный фактический параллелизм задачи определяется либо количеством активных BE узлов, либо количеством партиций для обработки.

max_batch_interval

Обязательно: Нет
Описание: Интервал планирования задачи, то есть, как часто выполняется задача. Единица: секунды. Диапазон значений: 5 ~ 60. Значение по умолчанию: 10. Рекомендуется установить значение больше 10. Если планирование короче 10 секунд, генерируется слишком много версий tablet из-за чрезмерно высокой частоты загрузки.

max_batch_rows

Обязательно: Нет
Описание: Это свойство используется только для определения окна обнаружения ошибок. Окно представляет собой количество строк данных, обработанных одной задачей Routine Load. Значение 10 * max_batch_rows. Значение по умолчанию 10 * 200000 = 2000000. Задача Routine Load обнаруживает ошибочные данные в окне обнаружения ошибок. Ошибочные данные — это данные, которые Selena не может обработать, например, недопустимые данные в формате JSON.

max_error_number

Обязательно: Нет
Описание: Максимальное количество строк с ошибочными данными, разрешенное в окне обнаружения ошибок. Если количество строк с ошибочными данными превышает это значение, задача загрузки приостанавливается. Вы можете выполнить SHOW ROUTINE LOAD и просмотреть журналы ошибок с помощью ErrorLogUrls. После этого вы можете исправить ошибку в Kafka в соответствии с журналами ошибок. Значение по умолчанию 0, что означает, что строки с ошибками не разрешены.
ПРИМЕЧАНИЕ

  • Последний пакет задач будет успешен перед приостановкой задачи загрузки, когда слишком много строк с ошибочными данными. То есть, квалифицированные данные будут загружены, а неквалифицированные данные будут отфильтрованы. Если вы не хотите фильтровать слишком много неквалифицированных строк данных, установите параметр max_filter_ratio.
  • Строки с ошибочными данными не включают строки данных, которые отфильтрованы предложением WHERE.
  • Этот параметр вместе со следующим параметром max_filter_ratio контролирует максимальное количество записей с ошибочными данными. Когда max_filter_ratio не установлен, вступает в силу значение этого параметра. Когда max_filter_ratio установлен, задача загрузки приостанавливается, как только количество записей с ошибочными данными достигает порога, установленного этим параметром или параметром max_filter_ratio.

max_filter_ratio

Обязательно: Нет
Описание: Максимальная допустимая погрешность задачи загрузки. Допустимая погрешность — это максимальный процент записей данных, которые могут быть отфильтрованы из-за недостаточного качества данных из всех записей данных, запрошенных задачей загрузки. Допустимые значения: от 0 до 1. Значение по умолчанию: 1 (это означает, что на самом деле оно не вступит в силу).
Мы рекомендуем установить значение 0. Таким образом, если обнаружены неквалифицированные записи данных, задача загрузки приостанавливается, обеспечивая правильность данных.
Если вы хотите игнорировать неквалифицированные записи данных, вы можете установить этот параметр в значение больше 0. Таким образом, задача загрузки может быть успешной, даже если файл данных содержит неквалифицированные записи данных.
ПРИМЕЧАНИЕ

  • Последний пакет задач будет неудачным, когда слишком много строк с ошибочными данными больше, чем max_filter_ratio. Это немного отличается от эффекта max_error_number.
  • Неквалифицированные записи данных не включают записи данных, которые отфильтрованы предложением WHERE.
  • Этот параметр вместе с предыдущим параметром max_error_number контролирует максимальное количество записей с ошибочными данными. Когда этот параметр не установлен (это работает так же, как установка max_filter_ratio = 1), вступает в силу значение параметра max_error_number. Когда этот параметр установлен, задача загрузки приостанавливается, как только количество записей с ошибочными данными достигает порога, установленного этим параметром или параметром max_error_number.

strict_mode

Обязательно: Нет
Описание: Указывает, следует ли включать строгий режим. Допустимые значения: true и false. Значение по умолчанию: false. Когда строгий режим включен, если значение для столбца в загружаемых данных равно NULL, но целевая таблица не допускает значение NULL для этого столбца, строка данных будет отфильтрована.

log_rejected_record_num

Обязательно: Нет
Описание: Указывает максимальное количество неквалифицированных строк данных, которые могут быть залогированы. Этот параметр поддерживается начиная с версии v1.5.2. Допустимые значения: 0, -1 и любое ненулевое положительное целое число. Значение по умолчанию: 0.

  • Значение 0 указывает, что отфильтрованные строки данных не будут залогированы.
  • Значение -1 указывает, что все отфильтрованные строки данных будут залогированы.
  • Ненулевое положительное целое число, такое как n, указывает, что до n отфильтрованных строк данных могут быть залогированы на каждом BE.
Вы можете получить доступ ко всем неквалифицированным строкам данных, которые были отфильтрованы в задаче загрузки, перейдя по пути, возвращенному в поле REJECTED_RECORD_PATH из запроса к представлению information_schema.loads.

timezone

Обязательно: Нет
Описание: Часовой пояс, используемый задачей загрузки. Значение по умолчанию: Asia/Shanghai. Значение этого параметра влияет на результаты, возвращаемые такими функциями, как strftime(), alignment_timestamp() и from_unixtime(). Часовой пояс, указанный этим параметром, является часовым поясом на уровне сеанса. Для получения дополнительной информации см. Настройка часового пояса.

partial_update

Обязательно: Нет
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.

merge_condition

Обязательно: Нет
Описание: Указывает имя столбца, который вы хотите использовать в качестве условия для определения, следует ли обновлять данные. Данные будут обновлены только в том случае, если значение данных, которые должны быть загружены в этот столбец, больше или равно текущему значению этого столбца. ПРИМЕЧАНИЕ
Только таблицы Primary Key поддерживают условные обновления. Столбец, который вы указываете, не может быть столбцом первичного ключа.

format

Обязательно: Нет
Описание: Формат загружаемых данных. Допустимые значения: CSV, JSON и Avro (поддерживается с версии v1.5.2). Значение по умолчанию: CSV.

trim_space

Обязательно: Нет
Описание: Указывает, следует ли удалять пробелы до и после разделителей столбцов из файла данных, когда файл данных в формате CSV. Тип: BOOLEAN. Значение по умолчанию: false.
Для некоторых баз данных при экспорте данных в виде файла данных в формате CSV к разделителям столбцов добавляются пробелы. Такие пробелы называются ведущими или завершающими пробелами в зависимости от их расположения. Установив параметр trim_space, вы можете разрешить Selena удалять такие ненужные пробелы во время загрузки данных.
Обратите внимание, что Selena не удаляет пробелы (включая ведущие и завершающие пробелы) внутри поля, заключенного в пару символов, указанных в enclose. Например, следующие значения полей используют вертикальную черту (|) в качестве разделителя столбцов и двойные кавычки (") в качестве символа, указанного в enclose: | "Love Selena" |. Если вы установите trim_space в true, Selena обработает предыдущие значения полей как |"Love Selena"|.

enclose

Обязательно: Нет
Описание: Указывает символ, который используется для обертывания значений полей в файле данных в соответствии с RFC4180, когда файл данных в формате CSV. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенными символами являются одинарная кавычка (') и двойная кавычка (").
Все специальные символы (включая разделители строк и разделители столбцов), обернутые с помощью символа, указанного в enclose, считаются обычными символами. Selena может делать больше, чем RFC4180, поскольку позволяет вам указать любой однобайтовый символ в качестве символа, указанного в enclose.
Если значение поля содержит символ, указанный в enclose, вы можете использовать тот же символ для экранирования этого символа, указанного в enclose. Например, вы устанавливаете enclose в ", и значение поля — a "quoted" c. В этом случае вы можете ввести значение поля как "a ""quoted"" c" в файл данных.

escape

Обязательно: Нет
Описание: Указывает символ, который используется для экранирования различных специальных символов, таких как разделители строк, разделители столбцов, escape-символы и символы, указанные в enclose, которые затем рассматриваются Selena как обычные символы и обрабатываются как часть значений полей, в которых они находятся. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенным символом является косая черта (\), которая должна быть записана как двойная косая черта (\\) в операторах SQL.
ПРИМЕЧАНИЕ
Символ, указанный в escape, применяется как внутри, так и снаружи каждой пары символов, указанных в enclose.
Два примера следующие:

  • Когда вы устанавливаете enclose в " и escape в \, Selena обрабатывает "say \"Hello world\"" в say "Hello world".
  • Предположим, что разделитель столбцов — запятая (,). Когда вы устанавливаете escape в \, Selena обрабатывает a, b\, c в два отдельных значения полей: a и b, c.

strip_outer_array

Обязательно: Нет
Описание: Указывает, следует ли удалять внешнюю структуру массива данных в формате JSON. Допустимые значения: true и false. Значение по умолчанию: false. В реальных бизнес-сценариях данные в формате JSON могут иметь внешнюю структуру массива, обозначенную парой квадратных скобок []. В этой ситуации мы рекомендуем установить этот параметр в true, чтобы Selena удалила внешние квадратные скобки [] и загрузила каждый внутренний массив как отдельную запись данных. Если вы установите этот параметр в false, Selena обрабатывает все данные в формате JSON в один массив и загружает массив как одну запись данных. Используйте данные в формате JSON [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] в качестве примера. Если вы установите этот параметр в true, {"category" : 1, "author" : 2} и {"category" : 3, "author" : 4} обрабатываются как две отдельные записи данных и загружаются в две строки данных Selena.

jsonpaths

Обязательно: Нет
Описание: Имена полей, которые вы хотите загрузить из данных в формате JSON. Значение этого параметра является допустимым выражением JsonPath. Для получения дополнительной информации см. Таблица Selena содержит производные столбцы, значения которых генерируются с помощью выражений) в этой теме.

json_root

Обязательно: Нет
Описание: Корневой элемент данных в формате JSON для загрузки. Selena извлекает элементы корневого узла через json_root для обработки. По умолчанию значение этого параметра пустое, что указывает на то, что все данные в формате JSON будут загружены. Для получения дополнительной информации см. Указание корневого элемента данных в формате JSON для загрузки в этой теме.

task_consume_second

Обязательно: Нет
Описание: Максимальное время для каждой задачи Routine Load в указанной задаче Routine Load для обработки данных. Единица: секунды. В отличие от динамических параметров FE routine_load_task_consume_second (который применяется ко всем задачам Routine Load в кластере), этот параметр специфичен для отдельной задачи Routine Load, что более гибко. Этот параметр поддерживается начиная с версии v1.5.2.

  • Когда task_consume_second и task_timeout_second не настроены, Selena использует динамические параметры FE routine_load_task_consume_second и routine_load_task_timeout_second для управления поведением загрузки.
  • Когда настроен только task_consume_second, значение по умолчанию для task_timeout_second вычисляется как task_consume_second * 4.
  • Когда настроен только task_timeout_second, значение по умолчанию для task_consume_second вычисляется как task_timeout_second/4.

task_timeout_second

Обязательно: Нет
Описание: Длительность тайм-аута для каждой задачи Routine Load в указанной задаче Routine Load. Единица: секунды. В отличие от динамического параметра FE routine_load_task_timeout_second (который применяется ко всем задачам Routine Load в кластере), этот параметр специфичен для отдельной задачи Routine Load, что более гибко. Этот параметр поддерживается начиная с версии v1.5.2.

  • Когда task_consume_second и task_timeout_second не настроены, Selena использует динамические параметры FE routine_load_task_consume_second и routine_load_task_timeout_second для управления поведением загрузки.
  • Когда настроен только task_timeout_second, значение по умолчанию для task_consume_second вычисляется как task_timeout_second/4.
  • Когда настроен только task_consume_second, значение по умолчанию для task_timeout_second вычисляется как task_consume_second * 4.

pause_on_fatal_parse_error

Обязательно: Нет
Описание: Указывает, следует ли автоматически приостанавливать задачу при обнаружении невосстановимых ошибок обработки данных. Допустимые значения: true и false. Значение по умолчанию: false. Этот параметр поддерживается начиная с версии v1.5.2/v1.5.2.
Такие ошибки обработки обычно вызваны недопустимыми форматами данных, такими как:

  • Импорт массива JSON без установки strip_outer_array.
  • Импорт данных JSON, но сообщение Kafka содержит недопустимый JSON, например abcd.

data_source, data_source_properties

Обязательно. Источник данных и соответствующие свойства.

FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

data_source

Обязательно. Источник данных, которые вы хотите загрузить. Допустимое значение: KAFKA.

data_source_properties

Свойства источника данных.

kafka_broker_list

Обязательно: Да
Описание: Информация о подключении к брокеру Kafka. Формат: <kafka_broker_ip>:<broker_ port>. Несколько брокеров разделяются запятыми (,). Порт по умолчанию, используемый брокерами Kafka, — 9092. Пример:"kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092".

kafka_topic

Обязательно: Да
Описание: Топик Kafka, который нужно обработать. Задача Routine Load может обрабатывать сообщения только из одного топика.

kafka_partitions

Обязательно: Нет
Описание: Партиции Kafka, которые нужно обработать, например, "kafka_partitions" = "0, 1, 2, 3". Если это свойство не указано, все партиции обрабатываются по умолчанию.

kafka_offsets

Обязательно: Нет
Описание: Начальный offset, с которого нужно начать обработку данных в партиции Kafka, указанной в kafka_partitions. Если это свойство не указано, задача Routine Load обрабатывает данные, начиная с последних offset в kafka_partitions. Допустимые значения:

  • Конкретный offset: обрабатывать данные, начиная с конкретного offset.
  • OFFSET_BEGINNING: обрабатывать данные, начиная с самого раннего возможного offset.
  • OFFSET_END: обрабатывать данные, начиная с последнего offset.
Несколько начальных offset разделяются запятыми (,), например, "kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000".

property.kafka_default_offsets

Обязательно: Нет
Описание: Начальный offset по умолчанию для всех партиций consumer. Поддерживаемые значения для этого свойства такие же, как и для свойства kafka_offsets.

confluent.schema.registry.url

Обязательно: Нет
Описание: URL Schema Registry, где зарегистрирована схема Avro. Selena получает схему Avro, используя этот URL. Формат следующий:
confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>]

Дополнительные свойства, связанные с источником данных

Вы можете указать дополнительные свойства, связанные с источником данных (Kafka), которые эквивалентны использованию командной строки Kafka --property. Для получения дополнительных поддерживаемых свойств см. свойства для consumer-клиента Kafka в свойствах конфигурации librdkafka.

примечание

Если значение свойства является именем файла, добавьте ключевое слово FILE: перед именем файла. Информацию о том, как создать файл, см. в разделе CREATE FILE.

  • Указать начальный offset по умолчанию для всех партиций, которые нужно обработать
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
  • Указать идентификатор группы consumer, используемой задачей Routine Load
"property.group.id" = "group_id_0"

Если property.group.id не указан, Selena генерирует случайное значение на основе имени задачи Routine Load в формате {job_name}_{random uuid}, например, simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8.

  • Указать протокол безопасности и соответствующие параметры, используемые BE для доступа к Kafka

    Протокол безопасности может быть указан как plaintext (по умолчанию), ssl, sasl_plaintext или sasl_ssl. И вам нужно настроить связанные параметры в соответствии с указанным протоколом безопасности.

    Когда протокол безопасности установлен на sasl_plaintext или sasl_ssl, поддерживаются следующие механизмы аутентификации SASL:

    • PLAIN
    • SCRAM-SHA-256 и SCRAM-SHA-512
    • OAUTHBEARER
    • GSSAPI (Kerberos)

    Примеры:

    • Доступ к Kafka с использованием протокола безопасности SSL:

      "property.security.protocol" = "ssl", -- Указать протокол безопасности как SSL.
      "property.ssl.ca.location" = "FILE:ca-cert", -- Путь к файлу или каталогу с сертификатом CA для проверки ключа брокера Kafka.
      -- Если на сервере Kafka включена аутентификация клиента, также требуются следующие три параметра:
      "property.ssl.certificate.location" = "FILE:client.pem", -- Путь к публичному ключу клиента, используемому для аутентификации.
      "property.ssl.key.location" = "FILE:client.key", -- Путь к приватному ключу клиента, используемому для аутентификации.
      "property.ssl.key.password" = "xxxxxx" -- Пароль для приватного ключа клиента.
    • Доступ к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/PLAIN:

      "property.security.protocol" = "SASL_PLAINTEXT", -- Указать протокол безопасности как SASL_PLAINTEXT.
      "property.sasl.mechanism" = "PLAIN", -- Указать механизм SASL как PLAIN, который является простым механизмом аутентификации username/password.
      "property.sasl.username" = "admin", -- Имя пользователя SASL.
      "property.sasl.password" = "xxxxxx" -- Пароль SASL.
    • Доступ к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/GSSAPI (Kerberos):

      "property.security.protocol" = "SASL_PLAINTEXT", -- Указать протокол безопасности как SASL_PLAINTEXT.
      "property.sasl.mechanism" = "GSSAPI", -- Указать механизм аутентификации SASL как GSSAPI. Значение по умолчанию — GSSAPI.
      "property.sasl.kerberos.service.name" = "kafka", -- Имя службы брокера. Значение по умолчанию — kafka.
      "property.sasl.kerberos.keytab" = "/home/selena/selena.keytab", -- Расположение keytab клиента.
      "property.sasl.kerberos.principal" = "selena@YOUR.COM" -- Принципал Kerberos.
      примечание
      • Начиная с версии Selena v1.5.2, поддерживается аутентификация SASL/GSSAPI (Kerberos).

      • Модули, связанные с SASL, должны быть установлены на машине BE.

        # Debian/Ubuntu:
        sudo apt-get install libsasl2-modules-gssapi-mit libsasl2-dev
        # CentOS/Redhat:
        sudo yum install cyrus-sasl-gssapi cyrus-sasl-devel

Элементы конфигурации FE и BE

Для элементов конфигурации FE и BE, связанных с Routine Load, см. элементы конфигурации.

Отображение столбцов

Настройка отображения столбцов для загрузки данных в формате CSV

Если столбцы данных в формате CSV могут быть отображены один к одному последовательно на столбцы таблицы Selena, вам не нужно настраивать отображение столбцов между данными и таблицей Selena.

Если столбцы данных в формате CSV не могут быть отображены один к одному последовательно на столбцы таблицы Selena, вам нужно использовать параметр columns для настройки отображения столбцов между файлом данных и таблицей Selena. Это включает следующие два случая использования:

  • Одинаковое количество столбцов, но разная последовательность столбцов. Также данные из файла данных не нужно вычислять функциями перед загрузкой в соответствующие столбцы таблицы Selena.

    • В параметре columns вам нужно указать имена столбцов таблицы Selena в той же последовательности, что и столбцы файла данных.

    • Например, таблица Selena состоит из трех столбцов, которые идут последовательно col1, col2 и col3, а файл данных также состоит из трех столбцов, которые могут быть отображены на столбцы таблицы Selena последовательно col3, col2 и col1. В этом случае вам нужно указать "columns: col3, col2, col1".

  • Разное количество столбцов и разная последовательность столбцов. Также данные из файла данных нужно вычислить функциями перед загрузкой в соответствующие столбцы таблицы Selena.

    В параметре columns вам нужно указать имена столбцов таблицы Selena в той же последовательности, что и столбцы файла данных, и указать функции, которые вы хотите использовать для вычисления данных. Два примера следующие:

    • Таблица Selena состоит из трех столбцов, которые идут последовательно col1, col2 и col3. Файл данных состоит из четырех столбцов, среди которых первые три столбца могут быть отображены последовательно на столбцы таблицы Selena col1, col2 и col3, а четвертый столбец не может быть отображен ни на один из столбцов таблицы Selena. В этом случае вам нужно временно указать имя для четвертого столбца файла данных, и временное имя должно отличаться от любого из имен столбцов таблицы Selena. Например, вы можете указать "columns: col1, col2, col3, temp", в котором четвертый столбец файла данных временно назван temp.
    • Таблица Selena состоит из трех столбцов, которые идут последовательно year, month и day. Файл данных состоит только из одного столбца, который содержит значения даты и времени в формате yyyy-mm-dd hh:mm:ss. В этом случае вы можете указать "columns: col, year = year(col), month=month(col), day=day(col)", в котором col — это временное имя столбца файла данных, а функции year = year(col), month=month(col) и day=day(col) используются для извлечения данных из столбца файла данных col и загрузки данных в соответствующие столбцы таблицы Selena. Например, year = year(col) используется для извлечения данных yyyy из столбца файла данных col и загрузки данных в столбец таблицы Selena year.

Для получения дополнительных примеров см. Настройка отображения столбцов.

Настройка отображения столбцов для загрузки данных в формате JSON или Avro

примечание

Начиная с версии v1.5.2, Selena поддерживает загрузку данных Avro с использованием Routine Load. При загрузке данных JSON или Avro конфигурация для отображения и преобразования столбцов одинакова. Поэтому в этом разделе данные JSON используются в качестве примера для представления конфигурации.

Если ключи данных в формате JSON имеют те же имена, что и столбцы таблицы Selena, вы можете загрузить данные в формате JSON, используя простой режим. В простом режиме вам не нужно указывать параметр jsonpaths. Этот режим требует, чтобы данные в формате JSON были объектом, обозначенным фигурными скобками {}, например {"category": 1, "author": 2, "price": "3"}. В этом примере category, author и price — это имена ключей, и эти ключи могут быть отображены один к одному по имени на столбцы category, author и price таблицы Selena. Для примеров см. простой режим.

Если ключи данных в формате JSON имеют разные имена, чем столбцы таблицы Selena, вы можете загрузить данные в формате JSON, используя согласованный режим. В согласованном режиме вам нужно использовать параметры jsonpaths и COLUMNS для указания отображения столбцов между данными в формате JSON и таблицей Selena:

  • В параметре jsonpaths укажите ключи JSON в последовательности, как они расположены в данных в формате JSON.
  • В параметре COLUMNS укажите отображение между ключами JSON и столбцами таблицы Selena:
    • Имена столбцов, указанные в параметре COLUMNS, отображаются один к одному последовательно на данные в формате JSON.
    • Имена столбцов, указанные в параметре COLUMNS, отображаются один к одному по имени на столбцы таблицы Selena.

Для примеров см. Таблица Selena содержит производные столбцы, значения которых генерируются с помощью выражений.

Примеры

Загрузка данных в формате CSV

В этом разделе используются данные в формате CSV в качестве примера для описания того, как вы можете использовать различные настройки параметров и комбинации для удовлетворения ваших разнообразных требований к загрузке.

Подготовка набора данных

Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем ordertest1. Каждое сообщение в наборе данных включает шесть столбцов: идентификатор заказа, дата оплаты, имя клиента, национальность, пол и цена.

2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924

Создание таблицы

Согласно столбцам данных в формате CSV, создайте таблицу с именем example_tbl1 в базе данных example_db.

CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);

Обработка данных, начиная с указанных offset для указанных партиций

Если задача Routine Load должна обрабатывать данные, начиная с указанных партиций и offset, вам нужно настроить параметры kafka_partitions и kafka_offsets.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4", -- партиции для обработки
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" -- соответствующие начальные offset
);

Повышение производительности загрузки путем увеличения параллелизма задач

Для повышения производительности загрузки и избежания накопительной обработки вы можете увеличить параллелизм задач, увеличив значение desired_concurrent_number при создании задачи Routine Load. Параллелизм задач позволяет разделить одну задачу Routine Load на как можно больше параллельных задач.

Обратите внимание, что фактический параллелизм задачи определяется минимальным значением среди следующих нескольких параметров:

min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
примечание

Максимальный фактический параллелизм задачи — это либо количество активных BE узлов, либо количество партиций для обработки.

Поэтому, когда количество активных BE узлов и количество партиций для обработки больше, чем значения двух других параметров max_routine_load_task_concurrent_num и desired_concurrent_number, вы можете увеличить значения двух других параметров для увеличения фактического параллелизма задачи.

Предположим, что количество партиций для обработки равно 7, количество активных BE узлов равно 5, а max_routine_load_task_concurrent_num — значение по умолчанию 5. Если вы хотите увеличить фактический параллелизм задачи, вы можете установить desired_concurrent_number в 5 (значение по умолчанию — 3). В этом случае фактический параллелизм задачи min(5,7,5,5) настроен на 5.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5" -- установить значение desired_concurrent_number в 5
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Настройка отображения столбцов

Если последовательность столбцов в данных в формате CSV не согласуется со столбцами в целевой таблице, предположим, что пятый столбец в данных в формате CSV не нужно импортировать в целевую таблицу, вам нужно указать отображение столбцов между данными в формате CSV и целевой таблицей с помощью параметра COLUMNS.

Целевая база данных и таблица

Создайте целевую таблицу example_tbl2 в целевой базе данных example_db в соответствии со столбцами в данных в формате CSV. В этом сценарии вам нужно создать пять столбцов, соответствующих пяти столбцам в данных в формате CSV, за исключением пятого столбца, который хранит пол.

CREATE TABLE example_db.example_tbl2 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price` double NULL COMMENT "Price"
)
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(order_id);

Задача Routine Load

В этом примере, поскольку пятый столбец в данных в формате CSV не нужно загружать в целевую таблицу, пятый столбец временно назван temp_gender в COLUMNS, а другие столбцы напрямую отображаются на таблицу example_tbl2.

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Установка условий фильтрации

Если вы хотите загрузить только данные, которые соответствуют определенным условиям, вы можете установить условия фильтрации в предложении WHERE, например, price > 100.

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price),
WHERE price > 100 -- установить условие фильтрации
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Включение строгого режима для фильтрации строк со значениями NULL

В PROPERTIES вы можете установить "strict_mode" = "true", что означает, что задача Routine Load находится в строгом режиме. Если в исходном столбце есть значение NULL, но столбец целевой таблицы Selena не допускает значения NULL, строка, которая содержит значение NULL в исходном столбце, будет отфильтрована.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"strict_mode" = "true" -- включить строгий режим
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Установка допустимой погрешности

Если ваш бизнес-сценарий имеет низкую допустимость к неквалифицированным данным, вам нужно установить окно обнаружения ошибок и максимальное количество строк с ошибочными данными, настроив параметры max_batch_rows и max_error_number. Когда количество строк с ошибочными данными в окне обнаружения ошибок превышает значение max_error_number, задача Routine Load приостанавливается.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"max_batch_rows" = "100000",-- Значение max_batch_rows, умноженное на 10, равно окну обнаружения ошибок.
"max_error_number" = "100" -- Максимальное количество строк с ошибочными данными, разрешенное в окне обнаружения ошибок.
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);

Указание протокола безопасности как SSL и настройка связанных параметров

Если вам нужно указать протокол безопасности как SSL, используемый BE для доступа к Kafka, вам нужно настроить "property.security.protocol" = "ssl" и связанные параметры.

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
-- Указать протокол безопасности как SSL.
"property.security.protocol" = "ssl",
-- Расположение сертификата CA.
"property.ssl.ca.location" = "FILE:ca-cert",
-- Если аутентификация включена для клиентов Kafka, вам нужно настроить следующие свойства:
-- Расположение публичного ключа клиента Kafka.
"property.ssl.certificate.location" = "FILE:client.pem",
-- Расположение приватного ключа клиента Kafka.
"property.ssl.key.location" = "FILE:client.key",
-- Пароль к приватному ключу клиента Kafka.
"property.ssl.key.password" = "abcdefg"
);

Установка trim_space, enclose и escape

Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем test_csv. Каждое сообщение в наборе данных включает шесть столбцов: идентификатор заказа, дата оплаты, имя клиента, национальность, пол и цена.

 "2020050802" , "2020-05-08" , "Johann Georg Faust" , "Deutschland" , "male" , "895"
"2020050802" , "2020-05-08" , "Julien Sorel" , "France" , "male" , "893"
"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"
"2020050901" , "2020-05-09" , "Anna Karenina" , "Russia" , "female" , "175"
"2020051001" , "2020-05-10" , "Tess Durbeyfield" , "US" , "female" , "986"
"2020051101" , "2020-05-11" , "Edogawa Conan" , "japan" , "male" , "8924"

Если вы хотите загрузить все данные из топика Kafka test_csv в example_tbl1 с намерением удалить пробелы до и после разделителей столбцов и установить enclose в " и escape в \, выполните следующую команду:

CREATE ROUTINE LOAD example_db.example_tbl1_test_csv ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"trim_space"="true",
"enclose"="\"",
"escape"="\\",
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic"="test_csv",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);

Загрузка данных в формате JSON

Имена столбцов таблицы Selena согласованы с именами ключей JSON

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике Kafka ordertest2.

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
примечание

Каждый объект JSON должен находиться в одном сообщении Kafka. В противном случае возникает ошибка, указывающая на сбой обработки данных в формате JSON.

Целевая база данных и таблица

Создайте таблицу example_tbl3 в целевой базе данных example_db в кластере Selena. Имена столбцов согласованы с именами ключей в данных в формате JSON.

CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL COMMENT "Price")
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
DISTRIBUTED BY HASH(commodity_id);

Задача Routine Load

Вы можете использовать простой режим для задачи Routine Load. То есть вам не нужно указывать параметры jsonpaths и COLUMNS при создании задачи Routine Load. Selena извлекает ключи данных в формате JSON в топике ordertest2 кластера Kafka в соответствии с именами столбцов целевой таблицы example_tbl3 и загружает данные в формате JSON в целевую таблицу.

CREATE ROUTINE LOAD example_db.example_tbl3_ordertest2 ON example_tbl3
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
примечание
  • Если внешний слой данных в формате JSON является структурой массива, вам нужно установить "strip_outer_array"="true" в PROPERTIES, чтобы удалить внешнюю структуру массива. Кроме того, когда вам нужно указать jsonpaths, корневой элемент всех данных в формате JSON — это сплющенный объект JSON, потому что внешняя структура массива данных в формате JSON удалена.
  • Вы можете использовать json_root для указания корневого элемента данных в формате JSON.

Таблица Selena содержит производные столбцы, значения которых генерируются с помощью выражений

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике ordertest2 кластера Kafka.

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}

Целевая база данных и таблица

Создайте таблицу с именем example_tbl4 в базе данных example_db в кластере Selena. Столбец pay_dt — это производный столбец, значения которого генерируются путем вычисления значений ключа pay_time в данных в формате JSON.

CREATE TABLE example_db.example_tbl4 (
`commodity_id` varchar(26) NULL,
`customer_name` varchar(26) NULL,
`country` varchar(26) NULL,
`pay_time` bigint(20) NULL,
`pay_dt` date NULL,
`price` double SUM NULL)
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);

Задача Routine Load

Вы можете использовать согласованный режим для задачи Routine Load. То есть вам нужно указать параметры jsonpaths и COLUMNS при создании задачи Routine Load.

Вам нужно указать ключи данных в формате JSON и расположить их последовательно в параметре jsonpaths.

И поскольку значения в ключе pay_time данных в формате JSON нужно преобразовать в тип DATE перед тем, как значения будут сохранены в столбце pay_dt таблицы example_tbl4, вам нужно указать вычисление, используя pay_dt=from_unixtime(pay_time,'%Y%m%d') в COLUMNS. Значения других ключей в данных в формате JSON могут быть напрямую отображены на таблицу example_tbl4.

CREATE ROUTINE LOAD example_db.example_tbl4_ordertest2 ON example_tbl4
COLUMNS(commodity_id, customer_name, country, pay_time, pay_dt=from_unixtime(pay_time, '%Y%m%d'), price)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
примечание
  • Если внешний слой данных JSON является структурой массива, вам нужно установить "strip_outer_array"="true" в PROPERTIES, чтобы удалить внешнюю структуру массива. Кроме того, когда вам нужно указать jsonpaths, корневой элемент всех данных JSON — это сплющенный объект JSON, потому что внешняя структура массива данных JSON удалена.
  • Вы можете использовать json_root для указания корневого элемента данных в формате JSON.

Таблица Selena содержит производный столбец, значения которого генерируются с помощью выражения CASE

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике Kafka topic-expr-test.

{"key1":1, "key2": 21}
{"key1":12, "key2": 22}
{"key1":13, "key2": 23}
{"key1":14, "key2": 24}

Целевая база данных и таблица

Создайте таблицу с именем tbl_expr_test в базе данных example_db в кластере Selena. Целевая таблица tbl_expr_test содержит два столбца, где значения столбца col2 должны быть вычислены с помощью выражения case на данных JSON.

CREATE TABLE tbl_expr_test (
col1 string, col2 string)
DISTRIBUTED BY HASH (col1);

Задача Routine Load

Поскольку значения в столбце col2 в целевой таблице генерируются с помощью выражения CASE, вам нужно указать соответствующее выражение в параметре COLUMNS для задачи Routine load.

CREATE ROUTINE LOAD rl_expr_test ON tbl_expr_test
COLUMNS (
key1,
key2,
col1 = key1,
col2 = CASE WHEN key1 = "1" THEN "key1=1"
WHEN key1 = "12" THEN "key1=12"
ELSE "nothing" END)
PROPERTIES ("format" = "json")
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "topic-expr-test"
);

Запрос к таблице Selena

Запросите таблицу Selena. Результат показывает, что значения в столбце col2 являются выходом выражения CASE.

MySQL [example_db]> SELECT * FROM tbl_expr_test;
+------+---------+
| col1 | col2 |
+------+---------+
| 1 | key1=1 |
| 12 | key1=12 |
| 13 | nothing |
| 14 | nothing |
+------+---------+
4 rows in set (0.015 sec)

Указание корневого элемента данных в формате JSON для загрузки

Вам нужно использовать json_root для указания корневого элемента данных в формате JSON для загрузки, и значение должно быть допустимым выражением JsonPath.

Подготовка набора данных

Например, следующие данные в формате JSON существуют в топике ordertest3 кластера Kafka. И корневой элемент данных в формате JSON для загрузки — $.RECORDS.

{"RECORDS":[{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875},{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895},{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}]}

Целевая база данных и таблица

Создайте таблицу с именем example_tbl3 в базе данных example_db в кластере Selena.

CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL)
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
ENGINE=OLAP
DISTRIBUTED BY HASH(commodity_id);

Задача Routine Load

Вы можете установить "json_root" = "$.RECORDS" в PROPERTIES, чтобы указать корневой элемент данных в формате JSON для загрузки. Также, поскольку данные в формате JSON для загрузки находятся в структуре массива, вы также должны установить "strip_outer_array" = "true", чтобы удалить внешнюю структуру массива.

CREATE ROUTINE LOAD example_db.example_tbl3_ordertest3 ON example_tbl3
PROPERTIES
(
"format" = "json",
"json_root" = "$.RECORDS",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);

Загрузка данных в формате Avro

Начиная с версии v1.5.2, Selena поддерживает загрузку данных Avro с использованием Routine Load.

Схема Avro проста

Предположим, что схема Avro относительно проста, и вам нужно загрузить все поля данных Avro.

Подготовка набора данных

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema1.avsc:

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "data", "type": "double"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_1.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log1 в целевой базе данных sensor в кластере Selena. Имена столбцов таблицы должны соответствовать именам полей в данных Avro. Для отображения типов данных при загрузке данных Avro в Selena см. [Отображение типов данных](#Data types mapping).

CREATE TABLE sensor.sensor_log1 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задача Routine Load

Вы можете использовать простой режим для задачи Routine Load. То есть вам не нужно указывать параметр jsonpaths при создании задачи Routine Load. Выполните следующий оператор, чтобы отправить задачу Routine Load с именем sensor_log_load_job1 для обработки сообщений Avro в топике Kafka topic_1 и загрузки данных в таблицу sensor_log1 в базе данных sensor.

CREATE ROUTINE LOAD sensor.sensor_log_load_job1 ON sensor_log1
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic"= "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Схема Avro содержит вложенное поле типа record

Предположим, что схема Avro содержит вложенное поле типа record, и вам нужно загрузить подполе во вложенном поле типа record в Selena.

Подготовка набора данных

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema2.avsc. Внешний record Avro включает пять полей, которые последовательно id, name, checked, sensor_type и data. И поле data имеет вложенный record data_record.

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
      {"name": "data", "type":
      {
      "type": "record",
      "name": "data_record",
      "fields" : [
      {"name": "data_x", "type" : "boolean"},
      {"name": "data_y", "type": "long"}
      ]
      }
      }
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_2.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log2 в целевой базе данных sensor в кластере Selena.

Предположим, что помимо загрузки полей id, name, checked и sensor_type внешнего Record, вам также нужно загрузить подполе data_y во вложенном Record data_record.

CREATE TABLE sensor.sensor_log2 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задача Routine Load

Отправьте задачу загрузки, используйте jsonpaths для указания полей данных Avro, которые нужно загрузить. Обратите внимание, что для подполя data_y во вложенном Record вам нужно указать его jsonpath как "$.data.data_y".

CREATE ROUTINE LOAD sensor.sensor_log_load_job2 ON sensor_log2
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Схема Avro содержит поле Union

Подготовка набора данных

Предположим, что схема Avro содержит поле Union, и вам нужно загрузить поле Union в Selena.

  • Схема Avro

    1. Создайте следующий файл схемы Avro avro_schema3.avsc. Внешний record Avro включает пять полей, которые последовательно id, name, checked, sensor_type и data. И поле data имеет тип Union и включает два элемента, null и вложенный record data_record.

      {
      "type": "record",
      "name": "sensor_log",
      "fields" : [
      {"name": "id", "type": "long"},
      {"name": "name", "type": "string"},
      {"name": "checked", "type" : "boolean"},
      {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
      {"name": "data", "type": [null,
      {
      "type": "record",
      "name": "data_record",
      "fields" : [
      {"name": "data_x", "type" : "boolean"},
      {"name": "data_y", "type": "long"}
      ]
      }
      ]
      }
      ]
      }
    2. Зарегистрируйте схему Avro в Schema Registry.

  • Данные Avro

Подготовьте данные Avro и отправьте их в топик Kafka topic_3.

Целевая база данных и таблица

Согласно полям данных Avro создайте таблицу sensor_log3 в целевой базе данных sensor в кластере Selena.

Предположим, что помимо загрузки полей id, name, checked и sensor_type внешнего Record, вам также нужно загрузить поле data_y элемента data_record в поле типа Union data.

CREATE TABLE sensor.sensor_log3 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

Задача Routine Load

Отправьте задачу загрузки, используйте jsonpaths для указания полей, которые нужно загрузить в данных Avro. Обратите внимание, что для поля data_y вам нужно указать его jsonpath как "$.data.data_y".

CREATE ROUTINE LOAD sensor.sensor_log_load_job3 ON sensor_log3
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);