Перейти к основному содержимому

Преобразование данных при загрузке

Selena поддерживает преобразование данных при загрузке.

Эта функция поддерживает Stream Load, Broker Load и Routine Load, но не поддерживает Spark Load.

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

В этой теме используются данные CSV в качестве примера для описания того, как извлекать и преобразовывать данные при загрузке. Поддерживаемые форматы файлов данных различаются в зависимости от выбранного метода загрузки.

ПРИМЕЧАНИЕ

Для данных CSV вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве разделителя текста.

Сценарии

При загрузке файла данных в таблицу Selena данные файла могут не полностью соответствовать данным таблицы Selena. В такой ситуации вам не нужно извлекать или преобразовывать данные перед загрузкой их в таблицу Selena. Selena может помочь вам извлечь и преобразовать данные во время загрузки:

  • Пропустить столбцы, которые не нужно загружать.

    Вы можете пропустить столбцы, которые не нужно загружать. Кроме того, если столбцы файла данных расположены в другом порядке, чем столбцы таблицы Selena, вы можете создать сопоставление столбцов между файлом данных и таблицей Selena.

  • Отфильтровать строки, которые вы не хотите загружать.

    Вы можете указать условия фильтрации, на основе которых Selena отфильтрует строки, которые вы не хотите загружать.

  • Создать новые столбцы из исходных столбцов.

    Сгенерированные столбцы — это специальные столбцы, которые вычисляются из исходных столбцов файла данных. Вы можете сопоставить сгенерированные столбцы со столбцами таблицы Selena.

  • Извлечь значения полей разделов из пути к файлу.

    Если файл данных создан из Apache Hive™, вы можете извлечь значения полей разделов из пути к файлу.

Примеры данных

  1. Создайте файлы данных в вашей локальной файловой системе.

    a. Создайте файл данных с именем file1.csv. Файл состоит из четырех столбцов, которые представляют ID пользователя, пол пользователя, дату события и тип события по порядку.

    354,female,2020-05-20,1
    465,male,2020-05-21,2
    576,female,2020-05-22,1
    687,male,2020-05-23,2

    b. Создайте файл данных с именем file2.csv. Файл состоит только из одного столбца, который представляет дату.

    2020-05-20
    2020-05-21
    2020-05-22
    2020-05-23
  2. Создайте таблицы в вашей базе данных Selena test_db.

    ПРИМЕЧАНИЕ

    Начиная с версии 1.5.0, Selena может автоматически устанавливать количество корзин (BUCKETS) при создании таблицы или добавлении раздела. Вам больше не нужно вручную устанавливать количество корзин. Для получения подробной информации см. установка количества корзин.

    a. Создайте таблицу с именем table1, которая состоит из трех столбцов: event_date, event_type и user_id.

    MySQL [test_db]> CREATE TABLE table1
    (
    `event_date` DATE COMMENT "event date",
    `event_type` TINYINT COMMENT "event type",
    `user_id` BIGINT COMMENT "user ID"
    )
    DISTRIBUTED BY HASH(user_id);

    b. Создайте таблицу с именем table2, которая состоит из четырех столбцов: date, year, month и day.

    MySQL [test_db]> CREATE TABLE table2
    (
    `date` DATE COMMENT "date",
    `year` INT COMMENT "year",
    `month` TINYINT COMMENT "month",
    `day` TINYINT COMMENT "day"
    )
    DISTRIBUTED BY HASH(date);
  3. Загрузите file1.csv и file2.csv в путь /user/starrocks/data/input/ вашего кластера HDFS, опубликуйте данные file1.csv в topic1 вашего кластера Kafka и опубликуйте данные file2.csv в topic2 вашего кластера Kafka.

Пропуск столбцов, которые не нужно загружать

Файл данных, который вы хотите загрузить в таблицу Selena, может содержать некоторые столбцы, которые не могут быть сопоставлены ни с одним столбцом таблицы Selena. В такой ситуации Selena поддерживает загрузку только тех столбцов, которые могут быть сопоставлены из файла данных со столбцами таблицы Selena.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе HDFS используется в качестве примера.

  • Kafka

В большинстве случаев столбцы CSV-файла не имеют имен. Для некоторых CSV-файлов первая строка состоит из имен столбцов, но Selena обрабатывает содержимое первой строки как обычные данные, а не как имена столбцов. Поэтому при загрузке CSV-файла вы должны временно назвать столбцы CSV-файла по порядку в операторе или команде создания задания. Эти временно названные столбцы сопоставляются по имени со столбцами таблицы Selena. Обратите внимание на следующие моменты относительно столбцов файла данных:

  • Данные столбцов, которые могут быть сопоставлены и временно названы с использованием имен столбцов в таблице Selena, загружаются напрямую.

  • Столбцы, которые не могут быть сопоставлены со столбцами таблицы Selena, игнорируются, данные этих столбцов не загружаются.

  • Если некоторые столбцы могут быть сопоставлены со столбцами таблицы Selena, но не названы временно в операторе или команде создания задания, задание загрузки сообщает об ошибках.

В этом разделе используются file1.csv и table1 в качестве примера. Четыре столбца file1.csv временно названы как user_id, user_gender, event_date и event_type по порядку. Среди временно названных столбцов file1.csv, user_id, event_date и event_type могут быть сопоставлены с конкретными столбцами table1, тогда как user_gender не может быть сопоставлен ни с одним столбцом table1. Поэтому user_id, event_date и event_type загружаются в table1, но user_gender — нет.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file1.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задания Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load

ПРИМЕЧАНИЕ

Если вы выбираете Stream Load, вы должны использовать параметр columns для временного именования столбцов файла данных, чтобы создать сопоставление столбцов между файлом данных и таблицей Selena.

Для подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file1.csv хранится в вашем кластере HDFS, выполните следующий оператор для создания задания Broker Load:

LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
)
WITH BROKER;

ПРИМЕЧАНИЕ

Если вы выбираете Broker Load, вы должны использовать параметр column_list для временного именования столбцов файла данных, чтобы создать сопоставление столбцов между файлом данных и таблицей Selena.

Для подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file1.csv опубликованы в topic1 вашего кластера Kafka, выполните следующий оператор для создания задания Routine Load:

CREATE ROUTINE LOAD test_db.table101 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, user_gender, event_date, event_type)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

ПРИМЕЧАНИЕ

Если вы выбираете Routine Load, вы должны использовать параметр COLUMNS для временного именования столбцов файла данных, чтобы создать сопоставление столбцов между файлом данных и таблицей Selena.

Для подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 | 1 | 576 |
| 2020-05-20 | 1 | 354 |
| 2020-05-21 | 2 | 465 |
| 2020-05-23 | 2 | 687 |
+------------+------------+---------+
4 rows in set (0.01 sec)

Фильтрация строк, которые вы не хотите загружать

При загрузке файла данных в таблицу Selena вы можете не захотеть загружать определенные строки файла данных. В такой ситуации вы можете использовать предложение WHERE для указания строк, которые вы хотите загрузить. Selena отфильтрует все строки, которые не соответствуют условиям фильтрации, указанным в предложении WHERE.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе HDFS используется в качестве примера.

  • Kafka

В этом разделе используются file1.csv и table1 в качестве примера. Если вы хотите загрузить только строки с типом события 1 из file1.csv в table1, вы можете использовать предложение WHERE для указания условия фильтрации event_type = 1.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file1.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задания Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-H "where: event_type=1" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load

Для подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file1.csv хранится в вашем кластере HDFS, выполните следующий оператор для создания задания Broker Load:

LOAD LABEL test_db.label2
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
WHERE event_type = 1
)
WITH BROKER;

Для подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file1.csv опубликованы в topic1 вашего кластера Kafka, выполните следующий оператор для создания задания Routine Load:

CREATE ROUTINE LOAD test_db.table102 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (user_id, user_gender, event_date, event_type),
WHERE event_type = 1
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Для подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-20 | 1 | 354 |
| 2020-05-22 | 1 | 576 |
+------------+------------+---------+
2 rows in set (0.01 sec)

Создание новых столбцов из исходных столбцов

При загрузке файла данных в таблицу Selena некоторые данные файла данных могут потребовать преобразований перед тем, как данные могут быть загружены в таблицу Selena. В такой ситуации вы можете использовать функции или выражения в команде или операторе создания задания для реализации преобразований данных.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе HDFS используется в качестве примера.

  • Kafka

В этом разделе используются file2.csv и table2 в качестве примера. file2.csv состоит только из одного столбца, который представляет дату. Вы можете использовать функции year, month и day для извлечения года, месяца и дня в каждой дате из file2.csv и загрузки извлеченных данных в столбцы year, month и day таблицы table2.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file2.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задания Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns:date,year=year(date),month=month(date),day=day(date)" \
-T file2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load

ПРИМЕЧАНИЕ

  • В параметре columns вы должны сначала временно назвать все столбцы файла данных, а затем временно назвать новые столбцы, которые вы хотите создать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date, а затем вызываются функции year=year(date), month=month(date) и day=day(date) для создания трех новых столбцов, которые временно названы как year, month и day.

  • Stream Load не поддерживает column_name = function(column_name), но поддерживает column_name = function(column_name).

Для подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file2.csv хранится в вашем кластере HDFS, выполните следующий оператор для создания задания Broker Load:

LOAD LABEL test_db.label3
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file2.csv")
INTO TABLE `table2`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(date)
SET(year=year(date), month=month(date), day=day(date))
)
WITH BROKER;

ПРИМЕЧАНИЕ

Вы должны сначала использовать параметр column_list для временного именования всех столбцов файла данных, а затем использовать предложение SET для временного именования новых столбцов, которые вы хотите создать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date в параметре column_list, а затем в предложении SET вызываются функции year=year(date), month=month(date) и day=day(date) для создания трех новых столбцов, которые временно названы как year, month и day.

Для подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file2.csv опубликованы в topic2 вашего кластера Kafka, выполните следующий оператор для создания задания Routine Load:

CREATE ROUTINE LOAD test_db.table201 ON table2
COLUMNS TERMINATED BY ",",
COLUMNS(date,year=year(date),month=month(date),day=day(date))
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

ПРИМЕЧАНИЕ

В параметре COLUMNS вы должны сначала временно назвать все столбцы файла данных, а затем временно назвать новые столбцы, которые вы хотите создать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date, а затем вызываются функции year=year(date), month=month(date) и day=day(date) для создания трех новых столбцов, которые временно названы как year, month и day.

Для подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table2, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table2;
+------------+------+-------+------+
| date | year | month | day |
+------------+------+-------+------+
| 2020-05-20 | 2020 | 5 | 20 |
| 2020-05-21 | 2020 | 5 | 21 |
| 2020-05-22 | 2020 | 5 | 22 |
| 2020-05-23 | 2020 | 5 | 23 |
+------------+------+-------+------+
4 rows in set (0.01 sec)

Извлечение значений полей разделов из пути к файлу

Если указанный вами путь к файлу содержит поля разделов, вы можете использовать параметр COLUMNS FROM PATH AS для указания полей разделов, которые вы хотите извлечь из путей к файлам. Поля разделов в путях к файлам эквивалентны столбцам в файлах данных. Параметр COLUMNS FROM PATH AS поддерживается только при загрузке данных из кластера HDFS.

Например, вы хотите загрузить следующие четыре файла данных, созданных из Hive:

/user/starrocks/data/input/date=2020-05-20/data
1,354
/user/starrocks/data/input/date=2020-05-21/data
2,465
/user/starrocks/data/input/date=2020-05-22/data
1,576
/user/starrocks/data/input/date=2020-05-23/data
2,687

Четыре файла данных хранятся в пути /user/starrocks/data/input/ вашего кластера HDFS. Каждый из этих файлов данных разделен по полю раздела date и состоит из двух столбцов, которые представляют тип события и ID пользователя по порядку.

Загрузка данных из кластера HDFS

Выполните следующий оператор для создания задания Broker Load, которое позволяет извлечь значения поля раздела date из пути к файлу /user/starrocks/data/input/ и использовать подстановочный знак (*) для указания того, что вы хотите загрузить все файлы данных в пути к файлу в table1:

LOAD LABEL test_db.label4
(
DATA INFILE("hdfs://<fe_host>:<fe_http_port>/user/starrocks/data/input/date=*/*")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(event_type, user_id)
COLUMNS FROM PATH AS (date)
SET(event_date = date)
)
WITH BROKER;

ПРИМЕЧАНИЕ

В предыдущем примере поле раздела date в указанном пути к файлу эквивалентно столбцу event_date таблицы table1. Поэтому вам нужно использовать предложение SET для сопоставления поля раздела date со столбцом event_date. Если поле раздела в указанном пути к файлу имеет то же имя, что и столбец таблицы Selena, вам не нужно использовать предложение SET для создания сопоставления.

Для подробного описания синтаксиса и параметров см. BROKER LOAD.

Запрос данных

После завершения загрузки данных из вашего кластера HDFS запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 | 1 | 576 |
| 2020-05-20 | 1 | 354 |
| 2020-05-21 | 2 | 465 |
| 2020-05-23 | 2 | 687 |
+------------+------------+---------+
4 rows in set (0.01 sec)