Перейти к основному содержимому
Версия: 2.0.x

Загрузка данных из GCS

Selena предоставляет следующие варианты загрузки данных из GCS:

  • Синхронная загрузка с использованием INSERT+FILES()
  • Асинхронная загрузка с использованием Broker Load

Каждый из этих вариантов имеет свои преимущества, которые подробно описаны в следующих разделах.

В большинстве случаев мы рекомендуем использовать метод INSERT+FILES(), который гораздо проще в использовании.

Однако метод INSERT+FILES() в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV. Поэтому, если вам нужно загрузить данные других форматов файлов, таких как JSON, или выполнить изменения данных, такие как DELETE, во время загрузки данных, вы можете использовать Broker Load.

Перед началом работы

Подготовка исходных данных

Убедитесь, что исходные данные, которые вы хотите загрузить в Selena, правильно хранятся в bucket GCS. Вы также можете рассмотреть, где находятся данные и база данных, потому что затраты на передачу данных значительно ниже, когда ваш bucket и ваш cluster Selena находятся в одном регионе.

В этой теме мы предоставляем вам пример набора данных в bucket GCS, gs://selena-samples/user_behavior_ten_million_rows.parquet. Вы можете получить доступ к этому набору данных с любыми действительными учётными данными, так как объект доступен для чтения любому пользователю GCP.

Проверка привилегий

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему cluster Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Сбор данных аутентификации

Примеры в этой теме используют аутентификацию на основе сервисной учётной записи. Чтобы практиковать аутентификацию на основе IAM-пользователя, вам нужно собрать информацию о следующих ресурсах GCS:

  • Bucket GCS, в котором хранятся ваши данные.
  • Ключ объекта GCS (имя объекта), если вы обращаетесь к определённому объекту в bucket. Обратите внимание, что ключ объекта может включать префикс, если ваши объекты GCS хранятся в подпапках.
  • Регион GCS, к которому принадлежит bucket GCS.
  • private_key_id, private_key и client_email вашей сервисной учётной записи Google Cloud

Для получения информации обо всех доступных методах аутентификации см. Аутентификация в Google Cloud Storage.

Использование INSERT+FILES()

Этот метод доступен начиная с версии v1.5.2 и в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV (начиная с версии v1.5.2).

Преимущества INSERT+FILES()

FILES() может читать файл, хранящийся в облачном хранилище, на основе указанных вами свойств, связанных с путём, выводить схему таблицы данных в файле, а затем возвращать данные из файла в виде строк данных.

С помощью FILES() вы можете:

  • Запрашивать данные напрямую из GCS с помощью SELECT.
  • Создавать и загружать таблицу с помощью CREATE TABLE AS SELECT (CTAS).
  • Загружать данные в существующую таблицу с помощью INSERT.

Типичные примеры

Прямой запрос из GCS с помощью SELECT

Прямой запрос из GCS с помощью SELECT+FILES() позволяет получить хороший предварительный просмотр содержимого набора данных перед созданием таблицы. Например:

  • Получить предварительный просмотр набора данных без сохранения данных.
  • Запросить минимальные и максимальные значения и решить, какие типы данных использовать.
  • Проверить наличие значений NULL.

Следующий пример запрашивает пример набора данных gs://selena-samples/user_behavior_ten_million_rows.parquet:

SELECT * FROM FILES
(
"path" = "gs://selena-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
)
LIMIT 3;

ПРИМЕЧАНИЕ

Замените учётные данные в приведённой выше команде на свои собственные. Можно использовать любой действительный email сервисной учётной записи, ключ и секрет, так как объект доступен для чтения любому аутентифицированному пользователю GCP.

Система возвращает результат запроса, аналогичный следующему:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+

ПРИМЕЧАНИЕ

Обратите внимание, что имена столбцов, возвращённые выше, предоставлены файлом Parquet.

Создание и загрузка таблицы с помощью CTAS

Это продолжение предыдущего примера. Предыдущий запрос обёрнут в CREATE TABLE AS SELECT (CTAS) для автоматизации создания таблицы с использованием вывода схемы. Это означает, что Selena выведет схему таблицы, создаст нужную вам таблицу, а затем загрузит данные в таблицу. Имена и типы столбцов не требуются для создания таблицы при использовании табличной функции FILES() с файлами Parquet, поскольку формат Parquet включает имена столбцов.

ПРИМЕЧАНИЕ

Синтаксис CREATE TABLE при использовании вывода схемы не позволяет устанавливать количество replica. Если вы используете cluster Selena с shared-nothing, установите количество replica перед созданием таблицы. Пример ниже для системы с тремя replica:

ADMIN SET FRONTEND CONFIG ('default_replication_num' = "3");

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Используйте CTAS для создания таблицы и загрузки данных из примера набора данных gs://selena-samples/user_behavior_ten_million_rows.parquet в таблицу:

CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "gs://selena-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
);

ПРИМЕЧАНИЕ

Замените учётные данные в приведённой выше команде на свои собственные. Можно использовать любой действительный email сервисной учётной записи, ключ и секрет, так как объект доступен для чтения любому аутентифицированному пользователю GCP.

После создания таблицы вы можете просмотреть её схему с помощью DESCRIBE:

DESCRIBE user_behavior_inferred;

Система возвращает результат запроса, аналогичный следующему:

+--------------+-----------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+-----------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varbinary | YES | false | NULL | |
| Timestamp | varbinary | YES | false | NULL | |
+--------------+-----------+------+-------+---------+-------+

Запросите таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_inferred LIMIT 3;

Возвращается следующий результат запроса, указывающий, что данные были успешно загружены:

+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 84 | 162325 | 2939262 | pv | 2017-12-02 05:41:41 |
| 84 | 232622 | 4148053 | pv | 2017-11-27 04:36:10 |
| 84 | 595303 | 903809 | pv | 2017-11-26 08:03:59 |
+--------+--------+------------+--------------+---------------------+

Загрузка в существующую таблицу с помощью INSERT

Вы можете захотеть настроить таблицу, в которую вставляете данные, например:

  • тип данных столбца, настройку nullable или значения по умолчанию
  • типы ключей и столбцы
  • разбиение данных на partition и bucket

ПРИМЕЧАНИЕ

Создание наиболее эффективной структуры таблицы требует знания того, как данные будут использоваться и содержимого столбцов. Эта тема не охватывает проектирование таблиц. Для получения информации о проектировании таблиц см. Типы таблиц.

В этом примере мы создаём таблицу на основе знаний о том, как таблица будет запрашиваться, и данных в файле Parquet. Знание данных в файле Parquet можно получить, запросив файл напрямую в GCS.

  • Поскольку запрос набора данных в GCS указывает, что столбец Timestamp содержит данные, соответствующие типу данных VARBINARY, тип столбца указан в следующем DDL.
  • Запрашивая данные в GCS, вы можете обнаружить, что в наборе данных нет значений NULL, поэтому DDL не устанавливает какие-либо столбцы как nullable.
  • На основе знаний об ожидаемых типах запросов sort key и столбец для bucket установлены на столбец UserID. Ваш случай использования может отличаться для этих данных, поэтому вы можете решить использовать ItemID в дополнение к UserID или вместо него для sort key.

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из GCS):

CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp varbinary
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Отобразите схему, чтобы вы могли сравнить её с выведенной схемой, созданной табличной функцией FILES():

DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | NO | true | NULL | |
| ItemID | int | NO | false | NULL | |
| CategoryID | int | NO | false | NULL | |
| BehaviorType | varchar(65533) | NO | false | NULL | |
| Timestamp | varbinary | NO | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
5 rows in set (0.00 sec)
подсказка

Сравните схему, которую вы только что создали, со схемой, выведенной ранее с помощью табличной функции FILES(). Посмотрите на:

  • типы данных
  • nullable
  • ключевые поля

Для лучшего контроля схемы целевой таблицы и лучшей производительности запросов мы рекомендуем указывать схему таблицы вручную в производственных средах.

После создания таблицы вы можете загрузить её с помощью INSERT INTO SELECT FROM FILES():

INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "gs://selena-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
);

ПРИМЕЧАНИЕ

Замените учётные данные в приведённой выше команде на свои собственные. Можно использовать любой действительный email сервисной учётной записи, ключ и секрет, так как объект доступен для чтения любому аутентифицированному пользователю GCP.

После завершения загрузки вы можете запросить таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_declared LIMIT 3;

Система возвращает результат запроса, аналогичный следующему, указывающий, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+

Проверка прогресса загрузки

Вы можете запросить прогресс заданий INSERT из представления loads в Information Schema Selena. Эта функция поддерживается начиная с версии v1.5.2. Пример:

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_f3fc2298-a553-11ee-92f4-00163e0842bd' \G
*************************** 1. row ***************************
JOB_ID: 10193
LABEL: insert_f3fc2298-a553-11ee-92f4-00163e0842bd
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-12-28 15:37:38
ETL_START_TIME: 2023-12-28 15:37:38
ETL_FINISH_TIME: 2023-12-28 15:37:38
LOAD_START_TIME: 2023-12-28 15:37:38
LOAD_FINISH_TIME: 2023-12-28 15:39:35
JOB_DETAILS: {"All backends":{"f3fc2298-a553-11ee-92f4-00163e0842bd":[10120]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":581730322,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"f3fc2298-a553-11ee-92f4-00163e0842bd":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

ПРИМЕЧАНИЕ

INSERT — это синхронная команда. Если задание INSERT всё ещё выполняется, вам нужно открыть другую сессию, чтобы проверить его статус выполнения.

Использование Broker Load

Асинхронный процесс Broker Load обрабатывает подключение к GCS, извлечение данных и сохранение данных в Selena.

Этот метод поддерживает следующие форматы файлов:

  • Parquet
  • ORC
  • CSV
  • JSON (поддерживается начиная с версии v1.5.2)

Преимущества Broker Load

  • Broker Load выполняется в фоновом режиме, и клиентам не нужно оставаться подключёнными для продолжения задания.
  • Broker Load предпочтителен для длительных заданий, тайм-аут по умолчанию составляет 4 часа.
  • В дополнение к форматам файлов Parquet и ORC, Broker Load поддерживает формат файлов CSV и формат файлов JSON (формат файлов JSON поддерживается начиная с версии v1.5.2).

Поток данных

Рабочий процесс Broker Load

  1. Пользователь создаёт задание загрузки.
  2. Frontend (FE) создаёт план запроса и распределяет план на backend-узлы (BE) или compute-узлы (CN).
  3. BE или CN извлекают данные из источника и загружают данные в Selena.

Типичный пример

Создайте таблицу, запустите процесс загрузки, который извлекает пример набора данных gs://selena-samples/user_behavior_ten_million_rows.parquet из GCS, и проверьте прогресс и успешность загрузки данных.

Создание базы данных и таблицы

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из GCS):

CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp varbinary
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Запуск Broker Load

Выполните следующую команду, чтобы запустить задание Broker Load, которое загружает данные из примера набора данных gs://selena-samples/user_behavior_ten_million_rows.parquet в таблицу user_behavior:

LOAD LABEL user_behavior
(
DATA INFILE("gs://selena-samples/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(

"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
)
PROPERTIES
(
"timeout" = "72000"
);

ПРИМЕЧАНИЕ

Замените учётные данные в приведённой выше команде на свои собственные. Можно использовать любой действительный email сервисной учётной записи, ключ и секрет, так как объект доступен для чтения любому аутентифицированному пользователю GCP.

Это задание имеет четыре основных раздела:

  • LABEL: Строка, используемая при запросе состояния задания загрузки.
  • LOAD declaration: URI источника, формат исходных данных и имя целевой таблицы.
  • BROKER: Данные подключения к источнику.
  • PROPERTIES: Значение тайм-аута и любые другие свойства, применяемые к заданию загрузки.

Подробный синтаксис и описание параметров см. в BROKER LOAD.

Проверка прогресса загрузки

Вы можете запросить прогресс заданий INSERT из представления loads в Information Schema Selena. Эта функция поддерживается начиная с версии v1.5.2.

SELECT * FROM information_schema.loads;

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:

SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';

В выводе ниже есть две записи для задания загрузки user_behavior:

  • Первая запись показывает состояние CANCELLED. Прокрутите до ERROR_MSG, и вы увидите, что задание завершилось неудачей из-за listPath failed.
  • Вторая запись показывает состояние FINISHED, что означает, что задание выполнено успешно.
JOB_ID|LABEL                                      |DATABASE_NAME|STATE    |PROGRESS           |TYPE  |PRIORITY|SCAN_ROWS|FILTERED_ROWS|UNSELECTED_ROWS|SINK_ROWS|ETL_INFO|TASK_INFO                                           |CREATE_TIME        |ETL_START_TIME     |ETL_FINISH_TIME    |LOAD_START_TIME    |LOAD_FINISH_TIME   |JOB_DETAILS                                                                                                                                                                                                                                                    |ERROR_MSG                             |TRACKING_URL|TRACKING_SQL|REJECTED_RECORD_PATH|
------+-------------------------------------------+-------------+---------+-------------------+------+--------+---------+-------------+---------------+---------+--------+----------------------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+------------+------------+--------------------+
10121|user_behavior |mydatabase |CANCELLED|ETL:N/A; LOAD:N/A |BROKER|NORMAL | 0| 0| 0| 0| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:59:30| | | |2023-08-10 14:59:34|{"All backends":{},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":0,"InternalTableLoadRows":0,"ScanBytes":0,"ScanRows":0,"TaskNumber":0,"Unfinished backends":{}} |type:ETL_RUN_FAIL; msg:listPath failed| | | |
10106|user_behavior |mydatabase |FINISHED |ETL:100%; LOAD:100%|BROKER|NORMAL | 86953525| 0| 0| 86953525| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:50:15|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:55:10|{"All backends":{"a5fe5e1d-d7d0-4826-ba99-c7348f9a5f2f":[10004]},"FileNumber":1,"FileSize":1225637388,"InternalTableLoadBytes":2710603082,"InternalTableLoadRows":86953525,"ScanBytes":1225637388,"ScanRows":86953525,"TaskNumber":1,"Unfinished backends":{"a5| | | | |

После того как вы убедитесь, что задание загрузки завершено, вы можете проверить подмножество целевой таблицы, чтобы увидеть, были ли данные успешно загружены. Пример:

SELECT * from user_behavior LIMIT 3;

Система возвращает результат запроса, аналогичный следующему, указывающий, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+