Перейти к основному содержимому
Версия: 2.0.x

Трансформация данных при загрузке

Selena поддерживает трансформацию данных при загрузке.

Эта функция поддерживает Stream Load, Broker Load и Routine Load, но не поддерживает Spark Load.

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему cluster Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

В этом разделе используются данные CSV в качестве примера для описания того, как извлекать и трансформировать данные при загрузке. Поддерживаемые форматы файлов данных различаются в зависимости от выбранного метода загрузки.

ПРИМЕЧАНИЕ

Для данных CSV вы можете использовать строку UTF-8, такую как запятая (,), табуляция или вертикальная черта (|), длина которой не превышает 50 байт, в качестве разделителя текста.

Сценарии

При загрузке файла данных в таблицу Selena данные файла данных могут не полностью сопоставляться с данными таблицы Selena. В этой ситуации вам не нужно извлекать или трансформировать данные перед загрузкой их в таблицу Selena. Selena может помочь вам извлечь и трансформировать данные во время загрузки:

  • Пропуск столбцов, которые не нужно загружать.

    Вы можете пропустить столбцы, которые не нужно загружать. Кроме того, если столбцы файла данных расположены в другом порядке, чем столбцы таблицы Selena, вы можете создать сопоставление столбцов между файлом данных и таблицей Selena.

  • Фильтрация строк, которые вы не хотите загружать.

    Вы можете указать условия фильтрации, на основе которых Selena отфильтровывает строки, которые вы не хотите загружать.

  • Генерация новых столбцов из исходных столбцов.

    Сгенерированные столбцы — это специальные столбцы, которые вычисляются из исходных столбцов файла данных. Вы можете сопоставить сгенерированные столбцы со столбцами таблицы Selena.

  • Извлечение значений полей partition из пути к файлу.

    Если файл данных сгенерирован из Apache Hive™, вы можете извлечь значения полей partition из пути к файлу.

Примеры данных

  1. Создайте файлы данных в вашей локальной файловой системе.

    a. Создайте файл данных с именем file1.csv. Файл состоит из четырех столбцов, которые представляют ID пользователя, пол пользователя, дату события и тип события по порядку.

    354,female,2020-05-20,1
    465,male,2020-05-21,2
    576,female,2020-05-22,1
    687,male,2020-05-23,2

    b. Создайте файл данных с именем file2.csv. Файл состоит только из одного столбца, который представляет дату.

    2020-05-20
    2020-05-21
    2020-05-22
    2020-05-23
  2. Создайте таблицы в вашей базе данных Selena test_db.

    ПРИМЕЧАНИЕ

    Начиная с версии v1.5.2, Selena может автоматически устанавливать количество buckets (BUCKETS) при создании таблицы или добавлении partition. Вам больше не нужно вручную устанавливать количество buckets. Для получения подробной информации см. установка количества buckets.

    a. Создайте таблицу с именем table1, которая состоит из трех столбцов: event_date, event_type и user_id.

    MySQL [test_db]> CREATE TABLE table1
    (
    `event_date` DATE COMMENT "event date",
    `event_type` TINYINT COMMENT "event type",
    `user_id` BIGINT COMMENT "user ID"
    )
    DISTRIBUTED BY HASH(user_id);

    b. Создайте таблицу с именем table2, которая состоит из четырех столбцов: date, year, month и day.

    MySQL [test_db]> CREATE TABLE table2
    (
    `date` DATE COMMENT "date",
    `year` INT COMMENT "year",
    `month` TINYINT COMMENT "month",
    `day` TINYINT COMMENT "day"
    )
    DISTRIBUTED BY HASH(date);
  3. Загрузите file1.csv и file2.csv в путь /user/selena/data/input/ вашего кластера HDFS, опубликуйте данные file1.csv в topic1 вашего кластера Kafka и опубликуйте данные file2.csv в topic2 вашего кластера Kafka.

Пропуск столбцов, которые не нужно загружать

Файл данных, который вы хотите загрузить в таблицу Selena, может содержать некоторые столбцы, которые не могут быть сопоставлены ни с одним столбцом таблицы Selena. В этой ситуации Selena поддерживает загрузку только тех столбцов, которые могут быть сопоставлены из файла данных со столбцами таблицы Selena.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе используется HDFS в качестве примера.

  • Kafka

В большинстве случаев столбцы CSV-файла не имеют имен. Для некоторых CSV-файлов первая строка состоит из имен столбцов, но Selena обрабатывает содержимое первой строки как обычные данные, а не как имена столбцов. Поэтому при загрузке CSV-файла вы должны временно назвать столбцы CSV-файла по порядку в выражении или команде создания задачи. Эти временно названные столбцы сопоставляются по имени со столбцами таблицы Selena. Обратите внимание на следующие моменты о столбцах файла данных:

  • Данные столбцов, которые могут быть сопоставлены и временно названы с использованием имен столбцов в таблице Selena, загружаются напрямую.

  • Столбцы, которые не могут быть сопоставлены со столбцами таблицы Selena, игнорируются, данные этих столбцов не загружаются.

  • Если некоторые столбцы могут быть сопоставлены со столбцами таблицы Selena, но не названы временно в выражении или команде создания задачи, задача загрузки сообщает об ошибках.

В этом разделе используются file1.csv и table1 в качестве примера. Четыре столбца file1.csv временно названы как user_id, user_gender, event_date и event_type по порядку. Среди временно названных столбцов file1.csv, user_id, event_date и event_type могут быть сопоставлены с конкретными столбцами table1, тогда как user_gender не может быть сопоставлен ни с одним столбцом table1. Поэтому user_id, event_date и event_type загружаются в table1, но user_gender не загружается.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file1.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задачи Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load

ПРИМЕЧАНИЕ

Если вы выбираете Stream Load, вы должны использовать параметр columns для временного именования столбцов файла данных для создания сопоставления столбцов между файлом данных и таблицей Selena.

Для получения подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file1.csv хранится в вашем кластере HDFS, выполните следующее выражение для создания задачи Broker Load:

LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/selena/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
)
WITH BROKER;

ПРИМЕЧАНИЕ

Если вы выбираете Broker Load, вы должны использовать параметр column_list для временного именования столбцов файла данных для создания сопоставления столбцов между файлом данных и таблицей Selena.

Для получения подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file1.csv опубликованы в topic1 вашего кластера Kafka, выполните следующее выражение для создания задачи Routine Load:

CREATE ROUTINE LOAD test_db.table101 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, user_gender, event_date, event_type)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

ПРИМЕЧАНИЕ

Если вы выбираете Routine Load, вы должны использовать параметр COLUMNS для временного именования столбцов файла данных для создания сопоставления столбцов между файлом данных и таблицей Selena.

Для получения подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 | 1 | 576 |
| 2020-05-20 | 1 | 354 |
| 2020-05-21 | 2 | 465 |
| 2020-05-23 | 2 | 687 |
+------------+------------+---------+
4 rows in set (0.01 sec)

Фильтрация строк, которые вы не хотите загружать

При загрузке файла данных в таблицу Selena вы можете не захотеть загружать определенные строки файла данных. В этой ситуации вы можете использовать предложение WHERE для указания строк, которые вы хотите загрузить. Selena отфильтровывает все строки, которые не соответствуют условиям фильтрации, указанным в предложении WHERE.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе используется HDFS в качестве примера.

  • Kafka

В этом разделе используются file1.csv и table1 в качестве примера. Если вы хотите загрузить только строки, тип события которых 1, из file1.csv в table1, вы можете использовать предложение WHERE для указания условия фильтрации event_type = 1.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file1.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задачи Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-H "where: event_type=1" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load

Для получения подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file1.csv хранится в вашем кластере HDFS, выполните следующее выражение для создания задачи Broker Load:

LOAD LABEL test_db.label2
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/selena/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
WHERE event_type = 1
)
WITH BROKER;

Для получения подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file1.csv опубликованы в topic1 вашего кластера Kafka, выполните следующее выражение для создания задачи Routine Load:

CREATE ROUTINE LOAD test_db.table102 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (user_id, user_gender, event_date, event_type),
WHERE event_type = 1
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Для получения подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-20 | 1 | 354 |
| 2020-05-22 | 1 | 576 |
+------------+------------+---------+
2 rows in set (0.01 sec)

Генерация новых столбцов из исходных столбцов

При загрузке файла данных в таблицу Selena некоторые данные из файла данных могут потребовать преобразований перед тем, как данные могут быть загружены в таблицу Selena. В этой ситуации вы можете использовать функции или выражения в команде или выражении создания задачи для реализации преобразований данных.

Эта функция поддерживает загрузку данных из следующих источников данных:

  • Локальная файловая система

  • HDFS и облачное хранилище

    ПРИМЕЧАНИЕ

    В этом разделе используется HDFS в качестве примера.

  • Kafka

В этом разделе используются file2.csv и table2 в качестве примера. file2.csv состоит только из одного столбца, который представляет дату. Вы можете использовать функции year, month и day для извлечения года, месяца и дня в каждой дате из file2.csv и загрузить извлеченные данные в столбцы year, month и day таблицы table2.

Загрузка данных

Загрузка данных из локальной файловой системы

Если file2.csv хранится в вашей локальной файловой системе, выполните следующую команду для создания задачи Stream Load:

curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns:date,year=year(date),month=month(date),day=day(date)" \
-T file2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load

ПРИМЕЧАНИЕ

  • В параметре columns вы должны сначала временно назвать все столбцы файла данных, а затем временно назвать новые столбцы, которые вы хотите сгенерировать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date, а затем вызываются функции year=year(date), month=month(date) и day=day(date) для генерации трех новых столбцов, которые временно названы как year, month и day.

  • Stream Load не поддерживает column_name = function(column_name), но поддерживает column_name = function(column_name).

Для получения подробного описания синтаксиса и параметров см. STREAM LOAD.

Загрузка данных из кластера HDFS

Если file2.csv хранится в вашем кластере HDFS, выполните следующее выражение для создания задачи Broker Load:

LOAD LABEL test_db.label3
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/selena/data/input/file2.csv")
INTO TABLE `table2`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(date)
SET(year=year(date), month=month(date), day=day(date))
)
WITH BROKER;

ПРИМЕЧАНИЕ

Вы должны сначала использовать параметр column_list для временного именования всех столбцов файла данных, а затем использовать предложение SET для временного именования новых столбцов, которые вы хотите сгенерировать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date в параметре column_list, а затем в предложении SET вызываются функции year=year(date), month=month(date) и day=day(date) для генерации трех новых столбцов, которые временно названы как year, month и day.

Для получения подробного описания синтаксиса и параметров см. BROKER LOAD.

Загрузка данных из кластера Kafka

Если данные file2.csv опубликованы в topic2 вашего кластера Kafka, выполните следующее выражение для создания задачи Routine Load:

CREATE ROUTINE LOAD test_db.table201 ON table2
COLUMNS TERMINATED BY ",",
COLUMNS(date,year=year(date),month=month(date),day=day(date))
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

ПРИМЕЧАНИЕ

В параметре COLUMNS вы должны сначала временно назвать все столбцы файла данных, а затем временно назвать новые столбцы, которые вы хотите сгенерировать из исходных столбцов файла данных. Как показано в предыдущем примере, единственный столбец file2.csv временно назван как date, а затем вызываются функции year=year(date), month=month(date) и day=day(date) для генерации трех новых столбцов, которые временно названы как year, month и day.

Для получения подробного описания синтаксиса и параметров см. CREATE ROUTINE LOAD.

Запрос данных

После завершения загрузки данных из вашей локальной файловой системы, кластера HDFS или кластера Kafka запросите данные table2, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table2;
+------------+------+-------+------+
| date | year | month | day |
+------------+------+-------+------+
| 2020-05-20 | 2020 | 5 | 20 |
| 2020-05-21 | 2020 | 5 | 21 |
| 2020-05-22 | 2020 | 5 | 22 |
| 2020-05-23 | 2020 | 5 | 23 |
+------------+------+-------+------+
4 rows in set (0.01 sec)

Извлечение значений полей partition из пути к файлу

Если путь к файлу, который вы указываете, содержит поля partition, вы можете использовать параметр COLUMNS FROM PATH AS для указания полей partition, которые вы хотите извлечь из путей к файлам. Поля partition в путях к файлам эквивалентны столбцам в файлах данных. Параметр COLUMNS FROM PATH AS поддерживается только при загрузке данных из кластера HDFS.

Например, вы хотите загрузить следующие четыре файла данных, сгенерированных из Hive:

/user/selena/data/input/date=2020-05-20/data
1,354
/user/selena/data/input/date=2020-05-21/data
2,465
/user/selena/data/input/date=2020-05-22/data
1,576
/user/selena/data/input/date=2020-05-23/data
2,687

Четыре файла данных хранятся в пути /user/selena/data/input/ вашего кластера HDFS. Каждый из этих файлов данных разбит на разделы по полю partition date и состоит из двух столбцов, которые представляют тип события и ID пользователя по порядку.

Загрузка данных из кластера HDFS

Выполните следующее выражение для создания задачи Broker Load, которая позволяет вам извлечь значения поля partition date из пути к файлу /user/selena/data/input/ и использовать подстановочный знак (*) для указания того, что вы хотите загрузить все файлы данных в пути к файлу в table1:

LOAD LABEL test_db.label4
(
DATA INFILE("hdfs://<fe_host>:<fe_http_port>/user/selena/data/input/date=*/*")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(event_type, user_id)
COLUMNS FROM PATH AS (date)
SET(event_date = date)
)
WITH BROKER;

ПРИМЕЧАНИЕ

В предыдущем примере поле partition date в указанном пути к файлу эквивалентно столбцу event_date таблицы table1. Поэтому вам необходимо использовать предложение SET для сопоставления поля partition date со столбцом event_date. Если поле partition в указанном пути к файлу имеет то же имя, что и столбец таблицы Selena, вам не нужно использовать предложение SET для создания сопоставления.

Для получения подробного описания синтаксиса и параметров см. BROKER LOAD.

Запрос данных

После завершения загрузки данных из вашего кластера HDFS запросите данные table1, чтобы убедиться, что загрузка прошла успешно:

MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 | 1 | 576 |
| 2020-05-20 | 1 | 354 |
| 2020-05-21 | 2 | 465 |
| 2020-05-23 | 2 | 687 |
+------------+------------+---------+
4 rows in set (0.01 sec)