CREATE ROUTINE LOAD
Попробуйте Routine Load в этом Кратком руководстве
Routine Load может непрерывно потреблять сообщения из Apache Kafka® и загружать данные в Selena. Routine Load может потреблять данные в форматах CSV, JSON и Avro (поддерживается с версии v1.5.2) из кластера Kafka и получать доступ к Kafka через несколько протоколов безопасности, включая plaintext, ssl, sasl_plaintext и sasl_ssl.
В этой теме описываются синтаксис, параметры и примеры оператора CREATE ROUTINE LOAD.
- Информацию о сценариях применения, принципах и основных операциях Routine Load см. в разделе Загрузка данных с использованием Routine Load.
- Вы можете загружать данные в таблицы Selena только как пользователь, у которого есть привилегия INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к кластеру Selena.
Синтаксис
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Параметры
database_name, job_name, table_name
database_name
Опционально. Имя базы данных Selena.
job_name
Обязательно. Имя задачи Routine Load. Таблица может получать данные из нескольких задач Routine Load. Мы рекомендуем устанавливать осмысленное имя задачи Routine Load, используя идентифицируемую информацию, например, имя топика Kafka и приблизительное время создания задачи, чтобы различать несколько задач Routine Load. Имя задачи Routine Load должно быть уникальным в рамках одной базы данных.
table_name
Обязательно. Имя таблицы Selena, в которую загружаются данные.
load_properties
Опционально. Свойства данных. Синтаксис:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
Разделитель столбцов для данных в формате CSV. По умолчанию разделитель столбцов — \t (табуляция). Например, вы можете использовать COLUMNS TERMINATED BY ",", чтобы указать разделитель столбцов как запятую.
- Убедитесь, что указанный здесь разделитель столбцов совпадает с разделителем столбцов в загружаемых данных.
- Вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве текстового разделителя.
- Значения NULL обозначаются с помощью
\N. Например, запись данных состоит из трех столбцов, и запись содержит данные в первом и третьем столбцах, но не содержит данных во втором столбце. В этой ситуации вам нужно использовать\Nво втором столбце для обозначения значения NULL. Это означает, что запись должна быть скомпилирована какa,\N,bвместоa,,b.a,,bозначает, что второй столбец записи содержит пустую строку.
ROWS TERMINATED BY
Разделитель строк для данных в формате CSV. По умолчанию разделитель строк — \n.
COLUMNS
Отображение между столбцами в исходных данных и столбцами в таблице Selena. Дополнительную информацию см. в разделе Отображение столбцов в этой теме.
column_name: Если столбец исходных данных может быть отображен на столбец таблицы Selena без каких-либо вычислений, вам нужно указать только имя столбца. Эти столбцы можно назвать отображаемыми столбцами.column_assignment: Если столбец исходных данных не может быть напрямую отображен на столбец таблицы Selena, и значения столбца должны быть выч ислены с помощью функций перед загрузкой данных, вы должны указать вычислительную функцию вexpr. Эти столбцы можно назвать производными столбцами. Рекомендуется размещать производные столбцы после отображаемых столбцов, потому что Selena сначала обрабатывает отображаемые столбцы.
WHERE
Условие фильтрации. Только данные, которые соответствуют условию фильтрации, могут быть загружены в Selena. Например, если вы хотите загрузить только строки, где значение col1 больше 100, а значение col2 равно 1000, вы можете использовать WHERE col1 > 100 and col2 = 1000.
Столбцы, указанные в условии фильтрации, могут быть исходными столбцами или пр оизводными столбцами.
PARTITION
Если таблица Selena распределена по партициям p0, p1, p2 и p3, и вы хотите загрузить данные только в p1, p2 и p3 в Selena и отфильтровать данные, которые будут храниться в p0, тогда вы можете указать PARTITION(p1, p2, p3) в качестве условия фильтрации. По умолчанию, если вы не указываете этот параметр, данные будут загружены во все партиции. Пример:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
Имя временной партиции, в которую вы хотите загрузить данные. Вы можете указать несколько временных партиций, которые должны быть разделены запятыми (,).
job_properties
Обязательно. Свойства задачи загрузки. Синтаксис:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
desired_concurrent_number
Обязательно: Нет
Описание: Ожидаемый параллелизм задачи для одной задачи Routine Load. Значение по умолчанию: 3. Фактический параллелизм задачи определяется минимальным значением из нескольких параметров: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num).
alive_be_number: количество активных BE узлов.partition_number: количество партиций для обработки.desired_concurrent_number: ожидаемый параллелизм задачи для одной задачи Routine Load. Значение по умолчанию:3.max_routine_load_task_concurrent_num: максимальный параллелизм задачи по умолчанию для задачи Routine Load, который равен5. См. Динамические параметры FE.
max_batch_interval
Обязательно: Нет
Описание: Интервал планирования задачи, то есть, как часто выполняется задача. Единица: секунды. Диапазон значений: 5 ~ 60. Значение по умолчанию: 10. Рекомендуется установить значение больше 10. Если планирование короче 10 секунд, генерируется слишком много версий tablet из-за чрезмерно высокой частоты загрузки.
max_batch_rows
Обязательно: Нет
Описание: Это свойство используется только для определения окна обнаружения ошибок. Окно представляет собой количество строк данных, обработанных одной задачей Routine Load. Значение 10 * max_batch_rows. Значение по умолчанию 10 * 200000 = 2000000. Задача Routine Load обнаруживает ошибочные данные в окне обнаружения ошибок. Ошибочные данные — это данные, которые Selena не может обработать, например, недопустимые данные в формате JSON.
max_error_number
Обязательно: Нет
Описание: Максимальное количество строк с ошибочными данными, разрешенное в окне обнаружения ошибок. Если количество строк с ошибочными данными превышает это значение, задача загрузки приостанавливается. Вы можете выполнить SHOW ROUTINE LOAD и просмотреть журналы ошибок с помощью ErrorLogUrls. После этого вы можете исправить ошибку в Kafka в соответствии с журналами ошибок. Значение по умолчанию 0, что означает, что строки с ошибками не разрешены.
ПРИМЕЧАНИЕ
- Последний пакет задач будет успешен перед приостановкой задачи загрузки, когда слишком много строк с ошибочными данными. То есть, квалифицированные данные будут загружены, а неквалифицированные данные будут отфильтрованы. Если вы не хотите фильтровать слишком много неквалифицированных строк данных, установите параметр
max_filter_ratio. - Строки с ошибочными данными не включают строки данных, которые отфильтрованы предложением WHERE.
- Этот параметр вместе со следующим параметром
max_filter_ratioконтролирует максимальное количество записей с ошибочными данными. Когдаmax_filter_ratioне установлен, вступает в силу значение этого параметра. Когдаmax_filter_ratioустановлен, задача загрузки приостанавливается, как только количество записей с ошибочными данными достигает порога, установленного этим параметром или параметромmax_filter_ratio.
max_filter_ratio
Обязательно: Нет
Описание: Максимальная допустимая погрешность задачи загрузки. Допустимая погрешность — это максимальный процент записей данных, которые могут быть отфильтрованы из-за недостаточного качества данных из всех записей данных, запрошенных задачей загрузки. Допустимые значения: от 0 до 1. Значение по умолчанию: 1 (это означает, что на самом деле оно не вступит в силу).
Мы рекомендуем установить значение 0. Таким образом, если обнаружены неквалифицированные записи данных, задача загрузки приостанавливается, обеспечивая правильность данных.
Если вы хотите игнорировать неквалифицированные записи данных, вы можете установить этот параметр в значение больше 0. Таким образом, задача загрузки может быть успешной, даже если файл данных содержит неквалифицированные записи данных.
ПРИМЕЧАНИЕ
- Последний пакет задач будет неудачным, когда слишком много строк с ошибочными данными больше, чем
max_filter_ratio. Это немного отличается от эффектаmax_error_number. - Неквалифицированные записи данных не включают записи данных, которые отфильтрованы предложением WHERE.
- Этот параметр вместе с предыдущим параметром
max_error_numberконтролирует максимальное количество записей с ошибочными данными. Когда этот параметр не установлен (это работает так же, как установкаmax_filter_ratio = 1), вступает в силу значение параметраmax_error_number. Когда этот параметр установлен, задача загрузки приостанавливается, как только количество записей с ошибочными данными достигает порога, установленного этим параметром или параметромmax_error_number.
strict_mode
Обязательно: Нет
Описание: Указывает, следует ли включать строгий режим. Допустимые значения: true и false. Значение по умолчанию: false. Когда строгий режим включен, если значение для столбца в загружаемых данных равно NULL, но целевая таблица не допускает значение NULL для этого столбца, строка данных будет отфильтрована.
log_rejected_record_num
Обязательно: Нет
Описание: Указывает максимальное количество неквалифицированных строк данных, которые могут быть залогированы. Этот параметр поддерживается начиная с версии v1.5.2. Допустимые значения: 0, -1 и любое ненулевое положительное целое число. Значение по умолчанию: 0.
- Значение
0указывает, что отфильтрованные строки данных не будут залогированы. - Значение
-1указывает, что все отфильтрованные строки данных будут залогированы. - Ненулевое положительное целое число, такое как
n, указывает, что доnотфильтрованных строк данных могут быть залогированы на каждом BE.
REJECTED_RECORD_PATH из запроса к представлению information_schema.loads.
timezone
Обязательно: Нет
Описание: Часовой пояс, используемый задачей загрузки. Значение по умолчанию: Asia/Shanghai. Значение этого параметра влияет на результаты, возвращаемые такими функциями, как strftime(), alignment_timestamp() и from_unixtime(). Часовой пояс, указанный этим параметром, является часовым поясом на уровне сеанса. Для получения дополнительной информации см. Настройка часового пояса.
partial_update
Обязательно: Нет
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
merge_condition
Обязательно: Нет
Описание: Указывает имя столбца, который вы хотите использовать в качестве условия для определения, следует ли обновлять данные. Данные будут обновлены только в том случае, если значение данных, которые должны быть загружены в этот столбец, больше или равно текущему значению этого столбца. ПРИМЕЧАНИЕ
Только таблицы Primary Key поддерживают условные обновления. Столбец, который вы указываете, не может быть столбцом первичного ключа.
format
Обязательно: Нет
Описание: Формат загружаемых данных. Допустимые значения: CSV, JSON и Avro (поддерживается с версии v1.5.2). Значение по умолчанию: CSV.
trim_space
Обязательно: Нет
Описание: Указывает, следует ли удалять пробелы до и после разделителей столбцов из файла данных, когда файл данных в формате CSV. Тип: BOOLEAN. Значение по умолчанию: false.
Для некоторых баз данных при экспорте данных в виде файла данных в формате CSV к разделителям столбцов добавляются пробелы. Такие пробелы называются ведущими или завершающими пробелами в зависимости от их расположения. Установив параметр trim_space, вы можете разрешить Selena удалять такие ненужные пробелы во время загрузки данных.
Обратите внимание, что Selena не удаляет пробелы (включая ведущие и завершающие пробелы) внутри поля, заключенного в пару символов, указанных в enclose. Например, следующие значения полей используют вертикальную черту (|) в качестве разделителя столбцов и двойные кавычки (") в качестве символа, указанного в enclose: | "Love Selena" |. Если вы установите trim_space в true, Selena обработает предыдущие значения полей как |"Love Selena"|.
enclose
Обязательно: Нет
Описание: Указывает символ, который используется для обертывания значений полей в файле данных в соответствии с RFC4180, когда файл данных в формате CSV. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенными символами являются одинарная кавычка (') и двойная кавычка (").
Все специальные символы (включая разделители строк и разделители столбцов), обернутые с помощью символа, указанного в enclose, считаются обычными символами. Selena может делать больше, чем RFC4180, поскольку позволяет вам указать любой однобайтовый символ в качестве символа, указанного в enclose.
Если значение поля содержит символ, указанный в enclose, вы можете использовать тот же символ для экранирования этого символа, указанного в enclose. Например, вы устанавливаете enclose в ", и значение поля — a "quoted" c. В этом случае вы можете ввести значение поля как "a ""quoted"" c" в файл данных.
escape
Обязательно: Нет
Описание: Указывает символ, который используется для экранирования различных специальных символов, таких как разделители строк, разделители столбцов, escape-символы и символы, указанные в enclose, которые затем рассматриваются Selena как обычные символы и обрабатываются как часть значений полей, в которых они находятся. Тип: однобайтовый символ. Значение по умолчанию: NONE. Наиболее распространенным символом является косая черта (\), которая должна быть записана как двойная косая черта (\\) в операторах SQL.
ПРИМЕЧАНИЕ
Символ, указанный в escape, применяется как внутри, так и снаружи каждой пары символов, указанных в enclose.
Два примера следующие:
- Когда вы устанавливаете
encloseв"иescapeв\, Selena обрабатывает"say \"Hello world\""вsay "Hello world". - Предположим, что разделитель столбцов — запятая (
,). Когда вы устанавливаетеescapeв\, Selena обрабатываетa, b\, cв два отдельных значения полей:aиb, c.
strip_outer_array
Обязательно: Нет
Описание: Указывает, следует ли удалять внешнюю структуру массива данных в формате JSON. Допустимые значения: true и false. Значение по умолчанию: false. В реальных бизнес-сценариях данные в формате JSON могут иметь внешнюю структуру массива, обозначенную парой квадратных скобок []. В этой ситуации мы рекомендуем установить этот параметр в true, чтобы Selena удалила внешние квадратные скобки [] и загрузила каждый внутренний массив как отдельную запись данных. Если вы установите этот параметр в false, Selena обрабатывает все данные в формате JSON в один массив и загружает массив как одну запись данных. Используйте данные в формате JSON [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] в качестве примера. Если вы установите этот параметр в true, {"category" : 1, "author" : 2} и {"category" : 3, "author" : 4} обрабатываются как две отдельные записи данных и загружаются в две строки данных Selena.
jsonpaths
Обязательно: Нет
Описание: Имена полей, которые вы хотите загрузить из данных в формате JSON. Значение этого параметра является допустимым выражением JsonPath. Для получения дополнительной информации см. Таблица Selena содержит производные столбцы, значения которых генерируются с помощью выражений) в этой теме.
json_root
Обязательно: Нет
Описание: Корневой элемент данных в формате JSON для загрузки. Selena извлекает элементы корневого узла через json_root для обработки. По умолчанию значение этого параметра пустое, что указывает на то, что все данные в формате JSON будут загружены. Для получения дополнительной информации см. Указание корневого элемента данных в формате JSON для загрузки в этой теме.
task_consume_second
Обязательно: Нет
Описание: Максимальное время для каждой задачи Routine Load в указанной задаче Routine Load для обработки данных. Единица: секунды. В отличие от динамических параметров FE routine_load_task_consume_second (который применяется ко всем задачам Routine Load в кластере), этот параметр специфичен для отдельной задачи Routine Load, что более гибко. Этот параметр поддерживается начиная с версии v1.5.2.
- Когда
task_consume_secondиtask_timeout_secondне настроены, Selena использует динамические параметры FEroutine_load_task_consume_secondиroutine_load_task_timeout_secondдля управления поведением загрузки. - Когда настроен только
task_consume_second, значение по умолчанию дляtask_timeout_secondвычисляется какtask_consume_second* 4. - Когда настроен только
task_timeout_second, значение по умолчанию дляtask_consume_secondвычисляется какtask_timeout_second/4.
task_timeout_second
Обязательно: Нет
Описание: Длительность тайм-аута для каждой задачи Routine Load в указанной задаче Routine Load. Единица: секунды. В отличие от динамического параметра FE routine_load_task_timeout_second (который применяется ко всем задачам Routine Load в кластере), этот параметр специфичен для отдельной задачи Routine Load, что более гибко. Этот параметр поддерживается начиная с версии v1.5.2.
- Когда
task_consume_secondиtask_timeout_secondне настроены, Selena использует динамические параметры FEroutine_load_task_consume_secondиroutine_load_task_timeout_secondдля управления поведением загрузки. - Когда настроен только
task_timeout_second, значение по умолчанию дляtask_consume_secondвычисляется какtask_timeout_second/4. - Когда настроен только
task_consume_second, значение по умолчанию дляtask_timeout_secondвычисляется какtask_consume_second* 4.
pause_on_fatal_parse_error
Обязательно: Нет
Описание: Указывает, следует ли автоматически приостанавливать задачу при обнаружении невосстановимых ошибок обработки данных. Допустимые значения: true и false. Значение по умолчанию: false. Этот параметр поддерживается начиная с версии v1.5.2/v1.5.2.
Такие ошибки обработки обычно вызваны недопустимыми форматами данных, такими как:
- Импорт массива JSON без установки
strip_outer_array. - Импорт данных JSON, но сообщение Kafka содержит недопустимый JSON, например
abcd.
data_source, data_source_properties
Обязательно. Источник данных и соответствующие свойства.
FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
data_source
Обязательно. Источник данных, которые вы хотите загрузить. Допустимое значение: KAFKA.
data_source_properties
Свойства источника данных.
kafka_broker_list
Обязательно: Да
Описание: Информация о подключении к брокеру Kafka. Формат: <kafka_broker_ip>:<broker_ port>. Несколько брокеров разделяются запятыми (,). Порт по умолчанию, используемый брокерами Kafka, — 9092. Пример:"kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092".
kafka_topic
Обязательно: Да
Описание: Топик Kafka, который нужно обработать. Задача Routine Load может обрабатывать сообщения только из одного топика.
kafka_partitions
Обязательно: Нет
Описание: Партиции Kafka, которые нужно обработать, например, "kafka_partitions" = "0, 1, 2, 3". Если это свойство не указано, все партиции обрабатываются по умолчанию.
kafka_offsets
Обязательно: Нет
Описание: Начальный offset, с которого нужно начать обработку данных в партиции Kafka, указанной в kafka_partitions. Если это свойство не указано, задача Routine Load обрабатывает данные, начиная с последних offset в kafka_partitions. Допустимые значения:
- Конкретный offset: обрабатывать данные, начиная с конкретного offset.
OFFSET_BEGINNING: обрабатывать данные, начиная с самого раннего возможного offset.OFFSET_END: обрабатывать данные, начиная с последнего offset.
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000".
property.kafka_default_offsets
Обязательно: Нет
Описание: Начальный offset по умолчанию для всех партиций consumer. Поддерживаемые значения для этого свойства такие же, как и для свойства kafka_offsets.
confluent.schema.registry.url
Обязательно: Нет
Описание: URL Schema Registry, где зарегистрирована схема Avro. Selena получает схему Avro, используя этот URL. Формат следующий:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>]
Дополнительные свойства, связанные с источником данных
Вы можете указать дополнительные свойства, связанные с источником данных (Kafka), которые эквивалентны использованию командной строки Kafka --property. Для получения дополнительных поддерживаемых свойств см. свойства для consumer-клиента Kafka в свойствах конфигурации librdkafka.
Если значение свойства является именем файла, добавьте ключевое слово FILE: перед именем файла. Информацию о том, как создать файл, см. в разделе CREATE FILE.
- Указать начальный offset по умолчанию для всех партиций, которые нужно обработать
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
- Указать идентификатор группы consumer, используемой задачей Routine Load
"property.group.id" = "group_id_0"
Если property.group.id не указан, Selena генерирует случайное значение на основе имени задачи Routine Load в формате {job_name}_{random uuid}, например, simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8.
-
Указать протокол безопасности и соответствующие параметры, используемые BE для д оступа к Kafka
Протокол безопасности может быть указан как
plaintext(по умолчанию),ssl,sasl_plaintextилиsasl_ssl. И вам нужно настроить связанные параметры в соответствии с указанным протоколом безопасности.Когда протокол безопасности установлен на
sasl_plaintextилиsasl_ssl, поддерживаются следующие механизмы аутентификации SASL:- PLAIN
- SCRAM-SHA-256 и SCRAM-SHA-512
- OAUTHBEARER
- GSSAPI (Kerberos)
Примеры:
-
Доступ к Kafka с использованием протокола безопасности SSL:
"property.security.protocol" = "ssl", -- Указать протокол безопасности как SSL.
"property.ssl.ca.location" = "FILE:ca-cert", -- Путь к файлу или каталогу с сертификатом CA для проверки ключа брокера Kafka.
-- Если на сервере Kafka включена аутентификация клиента, также требуются следующие три параметра:
"property.ssl.certificate.location" = "FILE:client.pem", -- Путь к публичному ключу клиента, используемому для аутентификации.
"property.ssl.key.location" = "FILE:client.key", -- Путь к приватному ключу клиента, используемому для аутентификации.
"property.ssl.key.password" = "xxxxxx" -- Пароль для приватного ключа клиента. -
Доступ к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/PLAIN:
"property.security.protocol" = "SASL_PLAINTEXT", -- Указать проток ол безопасности как SASL_PLAINTEXT.
"property.sasl.mechanism" = "PLAIN", -- Указать механизм SASL как PLAIN, который является простым механизмом аутентификации username/password.
"property.sasl.username" = "admin", -- Имя пользователя SASL.
"property.sasl.password" = "xxxxxx" -- Пароль SASL. -
Дос туп к Kafka с использованием протокола безопасности SASL_PLAINTEXT и механизма аутентификации SASL/GSSAPI (Kerberos):
"property.security.protocol" = "SASL_PLAINTEXT", -- Указать протокол безопасности как SASL_PLAINTEXT.
"property.sasl.mechanism" = "GSSAPI", -- Указать механизм аутентификации SASL как GSSAPI. Значение по умолчанию — GSSAPI.
"property.sasl.kerberos.service.name" = "kafka", -- Имя службы брокера. Значение по умолчанию — kafka.
"property.sasl.kerberos.keytab" = "/home/selena/selena.keytab", -- Расположение keytab клиента.
"property.sasl.kerberos.principal" = "selena@YOUR.COM" -- Принципал Kerberos.примечание-
Начиная с версии Selena v1.5.2, поддерживается аутентификация SASL/GSSAPI (Kerberos).
-
Модули, связанные с SASL, должны быть установлены на машине BE.
# Debian/Ubuntu:
sudo apt-get install libsasl2-modules-gssapi-mit libsasl2-dev
# CentOS/Redhat:
sudo yum install cyrus-sasl-gssapi cyrus-sasl-devel
-
Элементы конфигурации FE и BE
Для элементов конфигурации FE и BE, связанных с Routine Load, см. элементы конфигурации.
Отображение столбцов
Настройка отображения столбцов для загрузки данных в формате CSV
Если столбцы данных в формате CSV могут быть отображены один к одному последовательно на столбцы таблицы Selena, вам не нужно настраивать отображение столбцов между данными и таблицей Selena.
Если столбцы данных в формате CSV не могут быть отображены один к одному последовательно на столбцы таблицы Selena, вам нужно использовать параметр columns для настройки отображения столбцов между файлом данных и таблицей Selena. Это включает следующие два случая использования:
-
Одинаковое количество столбцов, но разная последовательность столбцов. Также данные из файла данных не нужно вычислять функциями перед загрузкой в соответствующие столбцы таблицы Selena.
-
В параметре
columnsвам нужно указать имена столбцов таблицы Selena в той же последовательности, что и столбцы файла данных. -
Например, таблица Selena состоит из трех столбцов, которые идут последовательно
col1,col2иcol3, а файл данных также состоит из трех столбцов, которые могут быть отображены на столбцы таблицы Selena последовательноcol3,col2иcol1. В этом случае вам нужно указать"columns: col3, col2, col1".
-
-
Разное количество столбцов и разная последовательность столбцов. Также данные из файла данных нужно вычислить функциями перед загрузкой в соответствующие столбцы таблицы Selena.
В параметре
columnsвам нужно указать имена столбцов таблицы Selena в той же последовательности, что и столбцы файла данных, и указать функции, которые вы хотите использовать для вычисления данных. Два примера следующие:- Таблица Selena состоит из трех столбцов, которые идут последовательно
col1,col2иcol3. Файл данных состоит из четырех столбцов, среди которых первые три столбца могут быть отображены последовательно на столбцы таблицы Selenacol1,col2иcol3, а четвертый столбец не может быть отображен ни на один из столбцов таблицы Selena. В этом случае вам нужно временно указать имя для четвертого столбца файла данных, и временное имя должно отличаться от любого из имен столбцов таблицы Selena. Например, вы можете указать"columns: col1, col2, col3, temp", в котором четвертый столбец файла данных временно названtemp. - Таблица Selena состоит из трех столбцов, которые идут последовательно
year,monthиday. Файл данных состоит только из одного столбца, который содержит значения даты и времени в форматеyyyy-mm-dd hh:mm:ss. В этом случае вы можете указать"columns: col, year = year(col), month=month(col), day=day(col)", в которомcol— это временное имя столбца файла данных, а функцииyear = year(col),month=month(col)иday=day(col)используются для извлечения данных из столбца файла данныхcolи загрузки данных в соответствующие столбцы таблицы Selena. Например,year = year(col)используется для извлечения данныхyyyyиз столбца файла данныхcolи загрузки данных в столбец таблицы Selenayear.
- Таблица Selena состоит из трех столбцов, которые идут последовательно
Для получения дополнительных примеров см. Настройка отображения столбцов.
Настройка отображения столбцов для загрузки данных в формате JSON или Avro
Начиная с версии v1.5.2, Selena поддерживает загрузку данных Avro с использованием Routine Load. При загрузке данных JSON или Avro конфигурация для отображения и преобразования столбцов одинакова. Поэтому в этом разделе данные JSON используются в качестве примера для представления конфигурации.
Если ключи данных в формате JSON имеют те же имена, что и столбцы таблицы Selena, вы можете загрузить данные в формате JSON, используя простой режим. В простом режиме вам не нужно указывать параметр jsonpaths. Этот режим т ребует, чтобы данные в формате JSON были объектом, обозначенным фигурными скобками {}, например {"category": 1, "author": 2, "price": "3"}. В этом примере category, author и price — это имена ключей, и эти ключи могут быть отображены один к одному по имени на столбцы category, author и price таблицы Selena. Для примеров см. простой режим.
Если ключи данных в формате JSON имеют разные имена, чем столбцы таблицы Selena, вы можете загрузить данные в формате JSON, используя согласованный режим. В согласованном режиме вам нужно использовать параметры jsonpaths и COLUMNS для указания отображения столбцов между данными в формате JSON и таблицей Selena:
- В параметре
jsonpathsукажите ключи JSON в последовательности, как они расположены в данных в формате JSON. - В параметре
COLUMNSукажите отображение между ключами JSON и столбцами таблицы Selena:- Имена столбцов, указанные в параметре
COLUMNS, отображаются один к одному последовательно на данные в формате JSON. - Имена столбцов, указанные в параметре
COLUMNS, отображаются один к одному по имени на столбцы таблицы Selena.
- Имена столбцов, указанные в параметре
Для примеров см. Таблица Selena содержит производные столбцы, значения которых генерируются с помощью выражений.
Примеры
Загрузка данных в формате CSV
В этом разделе используются данные в формате CSV в качестве примера для описания того, как вы можете использовать различные настройки параметров и комбинации для удовлетворения ваших разнообразных требований к загрузке.
Подготовка набора данных
Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем ordertest1. Каждое сообщение в наборе данных включает шесть столбцов: идентификатор заказа, дата оплаты, имя клиента, национальность, пол и цена.
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
Создание таблицы
Согласно столбцам данных в формате CSV, создайте таблицу с именем example_tbl1 в базе данных example_db.
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);
Обработка данных, начиная с указанных offset для указанных партиций
Если задача Routine Load должна обрабатывать данные, начиная с указанных партиций и offset, вам нужно настроить параметры kafka_partitions и kafka_offsets.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4", -- партиции для обработки
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" -- соответствующие начальные offset
);
Повышение производительности загрузки путем увеличения параллелизма задач
Для повышения производительности загрузки и избежания накопительной о бработки вы можете увеличить параллелизм задач, увеличив значение desired_concurrent_number при создании задачи Routine Load. Параллелизм задач позволяет разделить одну задачу Routine Load на как можно больше параллельных задач.
Обратите внимание, что фактический параллелизм задачи определяется минимальным значением среди следующих нескольких параметров:
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
Максимальный фактический параллелизм задачи — это либо количество активных BE узлов, либо количество партиций для обработки.
Поэтому, когда количество активных BE узлов и количество партиций для обработки больше, чем значения двух других параметров max_routine_load_task_concurrent_num и desired_concurrent_number, вы можете увеличить значения двух других параметров для увеличения фактического параллелизма задачи.
Предположим, что количество партиций для обработки равно 7, количество активных BE узлов равно 5, а max_routine_load_task_concurrent_num — значение по умолчанию 5. Если вы хотите увеличить фактический параллелизм задачи, вы можете установить desired_concurrent_number в 5 (значение по умолчанию — 3). В этом случае фактический параллелизм задачи min(5,7,5,5) настроен на 5.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5" -- установить значение desired_concurrent_number в 5
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Настройка отображения столбцов
Если последовательность столбцов в данных в формате CSV не согласуется со столбцами в целевой таблице, предположим, что пятый столбец в данных в формате CSV не нужно импортировать в целевую таблицу, вам нужно указать отображение столбцов между данными в формате CSV и целевой таблицей с помощью параметра COLUMNS.
Целевая база данных и таблица
Создайте целевую таблицу example_tbl2 в целевой базе данных example_db в соответствии со столбцами в данных в формате CSV. В этом сценарии вам нужно создать пять столбцов, соответствующих пяти столбцам в данных в формате CSV, за исключением пятого столбца, который хранит пол.
CREATE TABLE example_db.example_tbl2 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price` double NULL COMMENT "Price"
)
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(order_id);
Задача Routine Load
В этом примере, поскольку пятый столбец в данных в формате CSV не нужно загружать в целевую таблицу, пятый столбец временно назван temp_gender в COLUMNS, а другие столбцы напрямую отображаются на таблицу example_tbl2.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Установка условий фильтрации
Если вы хотите загрузить только данные, которые соответствуют определенным условиям, вы можете установить условия фильтрации в предложении WHERE, например, price > 100.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price),
WHERE price > 100 -- установить условие фильтрации
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Включение строгого режима для фильтрации строк со значениями NULL
В PROPERTIES вы можете установить "strict_mode" = "true", что означает, что задача Routine Load находится в строгом режиме. Если в исходном столбце есть значение NULL, но столбец целевой таблицы Selena не допускает значения NULL, строка, которая содержит значение NULL в исходном столбце, будет отфильтрована.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"strict_mode" = "true" -- включить строгий режим
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Установка допустимой погрешности
Если ваш бизнес-сценарий имеет низкую допустимость к неквалифицированным данным, вам нужно установить окно обнаружения ошибок и максимальное количество строк с ошибочными данными, настроив параметры max_batch_rows и max_error_number. Когда количество строк с ошибочными данными в окне обнаружения ошибок превышает значение max_error_number, задача Routine Load приостанавливается.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"max_batch_rows" = "100000",-- Значение max_batch_rows, умноженное на 10, равно окну обнаружения ошибок.
"max_error_number" = "100" -- Максимальное количество строк с ошибочными данными, разрешенное в окне обнаружения ошибок.
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Указание протокола безопасности как SSL и настройка связанных параметров
Если вам нужно указать протокол безопасности как SSL, используемый BE для доступа к Kafka, вам нужно настроить "property.security.protocol" = "ssl" и связанные параметры.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
-- Указать протокол безопасности как SSL.
"property.security.protocol" = "ssl",
-- Располо жение сертификата CA.
"property.ssl.ca.location" = "FILE:ca-cert",
-- Если аутентификация включена для клиентов Kafka, вам нужно настроить следующие свойства:
-- Расположение публичного ключа клиента Kafka.
"property.ssl.certificate.location" = "FILE:client.pem",
-- Расположение приватного ключа клиента Kafka.
"property.ssl.key.location" = "FILE:client.key",
-- Пароль к приватному ключу клиента Kafka.
"property.ssl.key.password" = "abcdefg"
);
Установка trim_space, enclose и escape
Предположим, вы хотите загрузить данные в формате CSV из топика Kafka с именем test_csv. Каждое сообщение в наборе данных включает шесть столбцов: идентификатор заказа, дата оплаты, имя клиента, национальность, пол и цена.
"2020050802" , "2020-05-08" , "Johann Georg Faust" , "Deutschland" , "male" , "895"
"2020050802" , "2020-05-08" , "Julien Sorel" , "France" , "male" , "893"
"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"
"2020050901" , "2020-05-09" , "Anna Karenina" , "Russia" , "female" , "175"
"2020051001" , "2020-05-10" , "Tess Durbeyfield" , "US" , "female" , "986"
"2020051101" , "2020-05-11" , "Edogawa Conan" , "japan" , "male" , "8924"
Если вы хотите загрузить все данные из топика Kafka test_csv в example_tbl1 с намерением удалить пробелы до и после разделителей столбцов и установить enclose в " и escape в \, выполните следующую команду:
CREATE ROUTINE LOAD example_db.example_tbl1_test_csv ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"trim_space"="true",
"enclose"="\"",
"escape"="\\",
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic"="test_csv",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
Загрузка данных в формате JSON
Имена столбцов таблицы Selena согласованы с именами ключей JSON
Подготовка набора данных
Например, следующие данные в формате JSON существуют в топике Kafka ordertest2.
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
Каждый объект JSON должен находиться в одном сообщении Kafka. В противном случае возникает ошибка, указывающая на сбой обработки данных в формате JSON.
Целевая база данных и таблица
Создайте таблицу example_tbl3 в целевой базе данных example_db в кластере Selena. Имена столбцов согласованы с именами ключей в данных в формате JSON.
CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL COMMENT "Price")
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
DISTRIBUTED BY HASH(commodity_id);
Задача Routine Load
Вы можете использовать простой режим для задачи Routine Load. То есть вам не нужно указывать параметры jsonpaths и COLUMNS при создании задачи Routine Load. Selena извлекает ключи данных в формате JSON в топике ordertest2 кластера Kafka в соответствии с именами столбцов целевой таблицы example_tbl3 и загружает данные в формате JSON в целевую таблицу.
CREATE ROUTINE LOAD example_db.example_tbl3_ordertest2 ON example_tbl3
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
- Если внешний слой данных в формате JSON является структурой массива, вам нужно установить
"strip_outer_array"="true"вPROPERTIES, чтобы удалить внешнюю структуру массива. Кроме того, когда вам нужно указатьjsonpaths, корневой элемент всех данных в формате JSON — это сплющенный объект JSON, потому что внешняя структура массива данных в формате JSON удалена. - Вы можете использовать
json_rootдля указания корневого элемента данных в формате JSON.