Перейти к основному содержимому

Загрузка данных из MinIO

Selena предоставляет следующие варианты для загрузки данных из MinIO:

  • Синхронная загрузка с использованием INSERT+FILES()
  • Асинхронная загрузка с использованием Broker Load

Каждый из этих вариантов имеет свои преимущества, которые подробно описаны в следующих разделах.

В большинстве случаев мы рекомендуем использовать метод INSERT+FILES(), который намного проще в использовании.

Однако метод INSERT+FILES() в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV. Поэтому, если вам нужно загрузить данные других форматов файлов, таких как JSON, или выполнить изменения данных, такие как DELETE во время загрузки данных, вы можете использовать Broker Load.

Перед началом работы

Подготовка исходных данных

Убедитесь, что исходные данные, которые вы хотите загрузить в Selena, правильно сохранены в bucket MinIO. Вы также можете рассмотреть расположение данных и базы данных, поскольку затраты на передачу данных намного ниже, когда ваш bucket и кластер Selena находятся в одном регионе.

В этой теме мы предоставляем вам образец набора данных. Вы можете скачать его с помощью curl:

curl -O https://starrocks-examples.s3.amazonaws.com/user_behavior_ten_million_rows.parquet

Загрузите файл Parquet в вашу систему MinIO и запомните имя bucket. В примерах этого руководства используется имя bucket /starrocks.

Проверка привилегий

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Сбор данных для подключения

В двух словах, для использования аутентификации MinIO Access Key вам необходимо собрать следующую информацию:

  • Bucket, в котором хранятся ваши данные
  • Ключ объекта (имя объекта), если вы обращаетесь к конкретному объекту в bucket
  • Конечная точка MinIO
  • Ключ доступа и секретный ключ, используемые в качестве учетных данных для доступа.

MinIO access key

Использование INSERT+FILES()

Этот метод доступен начиная с версии 1.5.0 и в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV (начиная с v3.3.0).

Преимущества INSERT+FILES()

FILES() может читать файл, хранящийся в облачном хранилище, на основе свойств пути, которые вы указываете, выводить схему таблицы данных в файле, а затем возвращать данные из файла в виде строк данных.

С помощью FILES() вы можете:

  • Запрашивать данные напрямую из MinIO, используя SELECT.
  • Создавать и загружать таблицу, используя CREATE TABLE AS SELECT (CTAS).
  • Загружать данные в существующую таблицу, используя INSERT.

Типичные примеры

Прямой запрос из MinIO с использованием SELECT

Прямой запрос из MinIO с использованием SELECT+FILES() может дать хороший предварительный просмотр содержимого набора данных перед созданием таблицы. Например:

  • Получить предварительный просмотр набора данных без сохранения данных.
  • Запросить минимальные и максимальные значения и решить, какие типы данных использовать.
  • Проверить наличие значений NULL.

Следующий пример запрашивает образец набора данных, ранее добавленный в вашу систему MinIO.

подсказка

Выделенная часть команды включает настройки, которые вам может потребоваться изменить:

  • Установите endpoint и path в соответствии с вашей системой MinIO.
  • Если ваша система MinIO использует SSL, установите enable_ssl в true.
  • Замените ваш ключ доступа MinIO и секретный ключ на AAA и BBB.
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
)
LIMIT 3;

Система возвращает следующий результат запроса:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+
3 rows in set (0.41 sec)
к сведению

Обратите внимание, что имена столбцов, возвращенные выше, предоставляются файлом Parquet.

Создание и загрузка таблицы с использованием CTAS

Это продолжение предыдущего примера. Предыдущий запрос обернут в CREATE TABLE AS SELECT (CTAS) для автоматизации создания таблицы с использованием вывода схемы. Это означает, что Selena выведет схему таблицы, создаст нужную вам таблицу, а затем загрузит данные в таблицу. Имена и типы столбцов не требуются для создания таблицы при использовании табличной функции FILES() с файлами Parquet, поскольку формат Parquet включает имена столбцов.

примечание

Синтаксис CREATE TABLE при использовании вывода схемы не позволяет устанавливать количество реплик, поэтому установите его перед созданием таблицы. Приведенный ниже пример предназначен для системы с одной репликой:

ADMIN SET FRONTEND CONFIG ('default_replication_num' = '1');

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Используйте CTAS для создания таблицы и загрузки данных образца набора данных, ранее добавленного в вашу систему MinIO.

подсказка

Выделенная часть команды включает настройки, которые вам может потребоваться изменить:

  • Установите endpoint и path в соответствии с вашей системой MinIO.
  • Если ваша система MinIO использует SSL, установите enable_ssl в true.
  • Замените ваш ключ доступа MinIO и секретный ключ на AAA и BBB.
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
);
Query OK, 10000000 rows affected (3.17 sec)
{'label':'insert_a5da3ff5-9ee4-11ee-90b0-02420a060004', 'status':'VISIBLE', 'txnId':'17'}

После создания таблицы вы можете просмотреть её схему, используя DESCRIBE:

DESCRIBE user_behavior_inferred;

Система возвращает следующий результат запроса:

+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+

Запросите таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_inferred LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 58 | 158350 | 2355072 | pv | 2017-11-27 13:06:51 |
| 58 | 158590 | 3194735 | pv | 2017-11-27 02:21:04 |
| 58 | 215073 | 3002561 | pv | 2017-11-30 10:55:42 |
+--------+--------+------------+--------------+---------------------+

Загрузка в существующую таблицу с использованием INSERT

Вы можете захотеть настроить таблицу, в которую вы вставляете данные, например:

  • тип данных столбца, настройку nullable или значения по умолчанию
  • типы ключей и столбцы
  • разделение данных и bucketing
подсказка

Создание наиболее эффективной структуры таблицы требует знания того, как будут использоваться данные и содержимое столбцов. Эта тема не охватывает дизайн таблиц. Для получения информации о дизайне таблиц см. Типы таблиц.

В этом примере мы создаем таблицу на основе знания того, как таблица будет запрашиваться, и данных в файле Parquet. Знание данных в файле Parquet можно получить, запросив файл напрямую в MinIO.

  • Поскольку запрос набора данных в MinIO показывает, что столбец Timestamp содержит данные, соответствующие типу данных datetime, тип столбца указан в следующем DDL.
  • Запросив данные в MinIO, вы можете обнаружить, что в наборе данных нет значений NULL, поэтому DDL не устанавливает никаких столбцов как nullable.
  • На основе знания ожидаемых типов запросов ключ сортировки и столбец bucketing устанавливаются в столбец UserID. Ваш случай использования может отличаться для этих данных, поэтому вы можете решить использовать ItemID в дополнение к UserID или вместо него для ключа сортировки.

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из MinIO):

CREATE TABLE user_behavior_declared
(
UserID int(11) NOT NULL,
ItemID int(11) NOT NULL,
CategoryID int(11) NOT NULL,
BehaviorType varchar(65533) NOT NULL,
Timestamp datetime NOT NULL
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID)
PROPERTIES
(
'replication_num' = '1'
);

Отобразите схему, чтобы вы могли сравнить её с выведенной схемой, созданной табличной функцией FILES():

DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | NO | true | NULL | |
| ItemID | int | NO | false | NULL | |
| CategoryID | int | NO | false | NULL | |
| BehaviorType | varchar(65533) | NO | false | NULL | |
| Timestamp | datetime | NO | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
5 rows in set (0.00 sec)
подсказка

Сравните схему, которую вы только что создали, со схемой, выведенной ранее с помощью табличной функции FILES(). Обратите внимание на:

  • типы данных
  • nullable
  • ключевые поля

Для лучшего контроля схемы целевой таблицы и лучшей производительности запросов мы рекомендуем указывать схему таблицы вручную в производственных средах. Наличие типа данных datetime для поля timestamp более эффективно, чем использование varchar.

После создания таблицы вы можете загрузить её с помощью INSERT INTO SELECT FROM FILES():

подсказка

Выделенная часть команды включает настройки, которые вам может потребоваться изменить:

  • Установите endpoint и path в соответствии с вашей системой MinIO.
  • Если ваша система MinIO использует SSL, установите enable_ssl в true.
  • Замените ваш ключ доступа MinIO и секретный ключ на AAA и BBB.
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
);

После завершения загрузки вы можете запросить таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_declared LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 58 | 4309692 | 1165503 | pv | 2017-11-25 14:06:52 |
| 58 | 181489 | 1165503 | pv | 2017-11-25 14:07:22 |
| 58 | 3722956 | 1165503 | pv | 2017-11-25 14:09:28 |
+--------+---------+------------+--------------+---------------------+

Проверка прогресса загрузки

Вы можете запросить прогресс заданий INSERT из представления loads в Selena Information Schema. Эта функция поддерживается начиная с версии 1.5.0. Пример:

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
подсказка

INSERT — это синхронная команда. Если задание INSERT все еще выполняется, вам нужно открыть другую сессию, чтобы проверить его статус выполнения.

Сравнение размеров таблиц на диске

Этот запрос сравнивает таблицу с выведенной схемой и ту, где схема объявлена. Поскольку выведенная схема имеет nullable столбцы и varchar для timestamp, длина данных больше:

SELECT TABLE_NAME,
TABLE_ROWS,
AVG_ROW_LENGTH,
DATA_LENGTH
FROM information_schema.tables
WHERE TABLE_NAME like 'user_behavior%'\G
*************************** 1. row ***************************
TABLE_NAME: user_behavior_declared
TABLE_ROWS: 10000000
AVG_ROW_LENGTH: 10
DATA_LENGTH: 102562516
*************************** 2. row ***************************
TABLE_NAME: user_behavior_inferred
TABLE_ROWS: 10000000
AVG_ROW_LENGTH: 17
DATA_LENGTH: 176803880
2 rows in set (0.04 sec)

Использование Broker Load

Асинхронный процесс Broker Load обрабатывает подключение к MinIO, извлечение данных и сохранение данных в Selena.

Этот метод поддерживает следующие форматы файлов:

  • Parquet
  • ORC
  • CSV
  • JSON (поддерживается начиная с v3.2.3)

Преимущества Broker Load

  • Broker Load выполняется в фоновом режиме, и клиентам не нужно оставаться подключенными для продолжения работы.
  • Broker Load предпочтителен для долго выполняющихся заданий, с таймаутом по умолчанию в 4 часа.
  • В дополнение к форматам файлов Parquet и ORC, Broker Load поддерживает формат файлов CSV и формат файлов JSON (формат файлов JSON поддерживается начиная с v3.2.3).

Поток данных

Workflow of Broker Load

  1. Пользователь создает задание загрузки.
  2. Frontend (FE) создает план запроса и распределяет план по backend узлам (BE) или compute узлам (CN).
  3. BE или CN извлекают данные из источника и загружают данные в Selena.

Типичный пример

Создайте таблицу, запустите процесс загрузки, который извлекает образец набора данных, ранее загруженный в вашу систему MinIO.

Создание базы данных и таблицы

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из MinIO):

CREATE TABLE user_behavior
(
UserID int(11) NOT NULL,
ItemID int(11) NOT NULL,
CategoryID int(11) NOT NULL,
BehaviorType varchar(65533) NOT NULL,
Timestamp datetime NOT NULL
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID)
PROPERTIES
(
'replication_num' = '1'
);

Запуск Broker Load

Выполните следующую команду, чтобы запустить задание Broker Load, которое загружает данные из образца набора данных user_behavior_ten_million_rows.parquet в таблицу user_behavior:

подсказка

Выделенная часть команды включает настройки, которые вам может потребоваться изменить:

  • Установите endpoint и DATA INFILE в соответствии с вашей системой MinIO.
  • Если ваша система MinIO использует SSL, установите enable_ssl в true.
  • Замените ваш ключ доступа MinIO и секретный ключ на AAA и BBB.
LOAD LABEL UserBehavior
(
DATA INFILE("s3://starrocks/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
)
WITH BROKER
(
"aws.s3.endpoint" = "http://minio:9000",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
)
PROPERTIES
(
"timeout" = "72000"
);

Это задание имеет четыре основных раздела:

  • LABEL: Строка, используемая при запросе состояния задания загрузки.
  • LOAD объявление: URI источника, формат исходных данных и имя целевой таблицы.
  • BROKER: Детали подключения для источника.
  • PROPERTIES: Значение таймаута и любые другие свойства для применения к заданию загрузки.

Для подробного синтаксиса и описания параметров см. BROKER LOAD.

Проверка прогресса загрузки

Вы можете запросить прогресс заданий Broker Load из представления loads в Selena Information Schema. Эта функция поддерживается начиная с версии 1.5.0.

SELECT * FROM information_schema.loads;

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:

SELECT * FROM information_schema.loads
WHERE LABEL = 'UserBehavior'\G
*************************** 1. row ***************************
JOB_ID: 10176
LABEL: userbehavior
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2023-12-19 23:02:41
ETL_START_TIME: 2023-12-19 23:02:44
ETL_FINISH_TIME: 2023-12-19 23:02:44
LOAD_START_TIME: 2023-12-19 23:02:44
LOAD_FINISH_TIME: 2023-12-19 23:02:46
JOB_DETAILS: {"All backends":{"4aeec563-a91e-4c1e-b169-977b660950d1":[10004]},"FileNumber":1,"FileSize":132251298,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":132251298,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"4aeec563-a91e-4c1e-b169-977b660950d1":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
1 row in set (0.02 sec)

После того как вы подтвердите, что задание загрузки завершено, вы можете проверить подмножество целевой таблицы, чтобы увидеть, были ли данные успешно загружены. Пример:

SELECT * from user_behavior LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+