Перейти к основному содержимому

Загрузка данных из AWS S3

Selena предоставляет следующие варианты для загрузки данных из AWS S3:

  • Синхронная загрузка с использованием INSERT+FILES()
  • Асинхронная загрузка с использованием Broker Load
  • Непрерывная асинхронная загрузка с использованием Pipe

Каждый из этих вариантов имеет свои преимущества, которые подробно описаны в следующих разделах.

В большинстве случаев мы рекомендуем использовать метод INSERT+FILES(), который намного проще в использовании.

Однако метод INSERT+FILES() в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV. Поэтому, если вам нужно загрузить данные других форматов файлов, таких как JSON, или выполнить изменения данных, такие как DELETE во время загрузки данных, вы можете использовать Broker Load.

Если вам нужно загрузить большое количество файлов данных со значительным общим объемом данных (например, более 100 ГБ или даже 1 ТБ), мы рекомендуем использовать метод Pipe. Pipe может разделять файлы на основе их количества или размера, разбивая задачу загрузки на более мелкие последовательные задачи. Такой подход гарантирует, что ошибки в одном файле не повлияют на всю задачу загрузки и минимизирует необходимость повторных попыток из-за ошибок данных.

Перед началом работы

Подготовка исходных данных

Убедитесь, что исходные данные, которые вы хотите загрузить в Selena, правильно сохранены в S3 bucket. Также стоит учесть расположение данных и базы данных, поскольку стоимость передачи данных значительно ниже, когда ваш bucket и кластер Selena находятся в одном регионе.

В этой теме мы предоставляем вам образец набора данных в S3 bucket: s3://starrocks-examples/user-behavior-10-million-rows.parquet. Вы можете получить доступ к этому набору данных с любыми действительными учетными данными, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

Проверка привилегий

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Сбор данных для аутентификации

Примеры в этой теме используют аутентификацию на основе IAM-пользователя. Чтобы убедиться, что у вас есть разрешение на чтение данных из AWS S3, мы рекомендуем прочитать Подготовка к аутентификации на основе IAM-пользователя и следовать инструкциям для создания IAM-пользователя с правильно настроенными IAM-политиками.

В двух словах, если вы используете аутентификацию на основе IAM-пользователя, вам необходимо собрать информацию о следующих ресурсах AWS:

  • S3 bucket, в котором хранятся ваши данные.
  • Ключ объекта S3 (имя объекта) при доступе к конкретному объекту в bucket. Обратите внимание, что ключ объекта может включать префикс, если ваши объекты S3 хранятся в подпапках.
  • Регион AWS, к которому принадлежит S3 bucket.
  • Ключ доступа и секретный ключ, используемые в качестве учетных данных для доступа.

Для получения информации обо всех доступных методах аутентификации см. Аутентификация в ресурсах AWS.

Использование INSERT+FILES()

Этот метод доступен начиная с версии 1.5.0 и в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV (начиная с версии 1.5.0).

Преимущества INSERT+FILES()

FILES() может читать файл, хранящийся в облачном хранилище, на основе указанных вами свойств пути, выводить схему таблицы данных в файле, а затем возвращать данные из файла в виде строк данных.

С помощью FILES() вы можете:

  • Запрашивать данные напрямую из S3 с помощью SELECT.
  • Создавать и загружать таблицу с помощью CREATE TABLE AS SELECT (CTAS).
  • Загружать данные в существующую таблицу с помощью INSERT.

Типичные примеры

Прямой запрос из S3 с помощью SELECT

Прямой запрос из S3 с помощью SELECT+FILES() может дать хороший предварительный просмотр содержимого набора данных перед созданием таблицы. Например:

  • Получить предварительный просмотр набора данных без сохранения данных.
  • Запросить минимальные и максимальные значения и решить, какие типы данных использовать.
  • Проверить наличие значений NULL.

Следующий пример запрашивает образец набора данных s3://starrocks-examples/user-behavior-10-million-rows.parquet:

SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;

ПРИМЕЧАНИЕ

Замените ваши учетные данные на AAA и BBB в приведенной выше команде. Можно использовать любые действительные aws.s3.access_key и aws.s3.secret_key, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

Система возвращает следующий результат запроса:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 1 | 2576651 | 149192 | pv | 2017-11-25 01:21:25 |
| 1 | 3830808 | 4181361 | pv | 2017-11-25 07:04:53 |
| 1 | 4365585 | 2520377 | pv | 2017-11-25 07:49:06 |
+--------+---------+------------+--------------+---------------------+

ПРИМЕЧАНИЕ

Обратите внимание, что имена столбцов, возвращенные выше, предоставляются файлом Parquet.

Создание и загрузка таблицы с помощью CTAS

Это продолжение предыдущего примера. Предыдущий запрос обернут в CREATE TABLE AS SELECT (CTAS) для автоматизации создания таблицы с использованием вывода схемы. Это означает, что Selena выведет схему таблицы, создаст нужную вам таблицу, а затем загрузит данные в таблицу. Имена и типы столбцов не требуются для создания таблицы при использовании табличной функции FILES() с файлами Parquet, поскольку формат Parquet включает имена столбцов.

ПРИМЕЧАНИЕ

Синтаксис CREATE TABLE при использовании вывода схемы не позволяет устанавливать количество реплик, поэтому установите его перед созданием таблицы. Приведенный ниже пример для системы с одной репликой:

ADMIN SET FRONTEND CONFIG ('default_replication_num' = "1");

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Используйте CTAS для создания таблицы и загрузки данных образца набора данных s3://starrocks-examples/user-behavior-10-million-rows.parquet в таблицу:

CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

ПРИМЕЧАНИЕ

Замените ваши учетные данные на AAA и BBB в приведенной выше команде. Можно использовать любые действительные aws.s3.access_key и aws.s3.secret_key, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

После создания таблицы вы можете просмотреть её схему с помощью DESCRIBE:

DESCRIBE user_behavior_inferred;

Система возвращает следующий результат запроса:

+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+

Запросите таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_inferred LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 225586 | 3694958 | 1040727 | pv | 2017-12-01 00:58:40 |
| 225586 | 3726324 | 965809 | pv | 2017-12-01 02:16:02 |
| 225586 | 3732495 | 1488813 | pv | 2017-12-01 00:59:46 |
+--------+---------+------------+--------------+---------------------+

Загрузка в существующую таблицу с помощью INSERT

Вы можете захотеть настроить таблицу, в которую вы вставляете данные, например:

  • тип данных столбца, настройка nullable или значения по умолчанию
  • типы ключей и столбцы
  • разделение и группировка данных

ПРИМЕЧАНИЕ

Создание наиболее эффективной структуры таблицы требует знания того, как будут использоваться данные и содержимое столбцов. Эта тема не охватывает проектирование таблиц. Для получения информации о проектировании таблиц см. Типы таблиц.

В этом примере мы создаем таблицу на основе знания того, как таблица будет запрашиваться, и данных в файле Parquet. Знание данных в файле Parquet можно получить, запросив файл напрямую в S3.

  • Поскольку запрос набора данных в S3 показывает, что столбец Timestamp содержит данные, соответствующие типу данных VARCHAR, и Selena может приводить из VARCHAR к DATETIME, тип данных изменен на DATETIME в следующем DDL.
  • Запросив данные в S3, вы можете обнаружить, что в наборе данных нет значений NULL, поэтому DDL также может установить все столбцы как non-nullable.
  • На основе знания ожидаемых типов запросов ключ сортировки и столбец группировки установлены на столбец UserID. Ваш случай использования может отличаться для этих данных, поэтому вы можете решить использовать ItemID в дополнение к UserID или вместо него для ключа сортировки.

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную:

CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Отобразите схему, чтобы вы могли сравнить её с выведенной схемой, созданной табличной функцией FILES():

DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | YES | true | NULL | |
| ItemID | int | YES | false | NULL | |
| CategoryID | int | YES | false | NULL | |
| BehaviorType | varchar(65533) | YES | false | NULL | |
| Timestamp | datetime | YES | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
подсказка

Сравните схему, которую вы только что создали, со схемой, выведенной ранее с помощью табличной функции FILES(). Обратите внимание на:

  • типы данных
  • nullable
  • ключевые поля

Для лучшего контроля схемы целевой таблицы и лучшей производительности запросов мы рекомендуем указывать схему таблицы вручную в производственных средах.

После создания таблицы вы можете загрузить её с помощью INSERT INTO SELECT FROM FILES():

INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

ПРИМЕЧАНИЕ

Замените ваши учетные данные на AAA и BBB в приведенной выше команде. Можно использовать любые действительные aws.s3.access_key и aws.s3.secret_key, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

После завершения загрузки вы можете запросить таблицу, чтобы убедиться, что данные были загружены в неё. Пример:

SELECT * from user_behavior_declared LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 393529 | 3715112 | 883960 | pv | 2017-12-02 02:45:44 |
| 393529 | 2650583 | 883960 | pv | 2017-12-02 02:45:59 |
| 393529 | 3715112 | 883960 | pv | 2017-12-02 03:00:56 |
+--------+---------+------------+--------------+---------------------+

Проверка прогресса загрузки

Вы можете запросить прогресс заданий INSERT из представления loads в Selena Information Schema. Эта функция поддерживается начиная с версии 1.5.0. Пример:

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

ПРИМЕЧАНИЕ

INSERT — это синхронная команда. Если задание INSERT все еще выполняется, вам нужно открыть другую сессию, чтобы проверить его статус выполнения.

Использование Broker Load

Асинхронный процесс Broker Load обрабатывает подключение к S3, извлечение данных и сохранение данных в Selena.

Этот метод поддерживает следующие форматы файлов:

  • Parquet
  • ORC
  • CSV
  • JSON (поддерживается начиная с версии 1.5.0)

Преимущества Broker Load

  • Broker Load выполняется в фоновом режиме, и клиентам не нужно оставаться подключенными для продолжения задания.
  • Broker Load предпочтителен для долго выполняющихся заданий с таймаутом по умолчанию в 4 часа.
  • В дополнение к форматам файлов Parquet и ORC, Broker Load поддерживает формат файлов CSV и формат файлов JSON (формат файлов JSON поддерживается начиная с версии 1.5.0).

Поток данных

Workflow of Broker Load

  1. Пользователь создает задание загрузки.
  2. Frontend (FE) создает план запроса и распределяет план по backend узлам (BE) или compute узлам (CN).
  3. BE или CN извлекают данные из источника и загружают данные в Selena.

Типичный пример

Создайте таблицу, запустите процесс загрузки, который извлекает образец набора данных s3://starrocks-examples/user-behavior-10-million-rows.parquet из S3, и проверьте прогресс и успех загрузки данных.

Создание базы данных и таблицы

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из AWS S3):

CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Запуск Broker Load

Выполните следующую команду для запуска задания Broker Load, которое загружает данные из образца набора данных s3://starrocks-examples/user-behavior-10-million-rows.parquet в таблицу user_behavior:

LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user-behavior-10-million-rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);

ПРИМЕЧАНИЕ

Замените ваши учетные данные на AAA и BBB в приведенной выше команде. Можно использовать любые действительные aws.s3.access_key и aws.s3.secret_key, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

Это задание имеет четыре основных раздела:

  • LABEL: Строка, используемая при запросе состояния задания загрузки.
  • Объявление LOAD: URI источника, формат исходных данных и имя целевой таблицы.
  • BROKER: Детали подключения к источнику.
  • PROPERTIES: Значение таймаута и любые другие свойства для применения к заданию загрузки.

Для подробного синтаксиса и описания параметров см. BROKER LOAD.

Проверка прогресса загрузки

Вы можете запросить прогресс задания Broker Load из представления loads в Selena Information Schema. Эта функция поддерживается начиная с версии 1.5.0.

SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';

Для получения информации о полях, предоставляемых в представлении loads, см. loads.

Эта запись показывает состояние LOADING и прогресс 39%. Если вы видите что-то подобное, выполните команду снова, пока не увидите состояние FINISHED.

              JOB_ID: 10466
LABEL: user_behavior
DATABASE_NAME: mydatabase
STATE: LOADING
PROGRESS: ETL:100%; LOAD:39%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 4620288
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 4620288
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2024-02-28 22:11:36
ETL_START_TIME: 2024-02-28 22:11:41
ETL_FINISH_TIME: 2024-02-28 22:11:41
LOAD_START_TIME: 2024-02-28 22:11:41
LOAD_FINISH_TIME: NULL
JOB_DETAILS: {"All backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]},"FileNumber":1,"FileSize":136901706,"InternalTableLoadBytes":144032784,"InternalTableLoadRows":4620288,"ScanBytes":143969616,"ScanRows":4620288,"TaskNumber":1,"Unfinished backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

После того как вы подтвердите, что задание загрузки завершено, вы можете проверить подмножество целевой таблицы, чтобы увидеть, были ли данные успешно загружены. Пример:

SELECT * from user_behavior LIMIT 3;

Возвращается следующий результат запроса, указывающий на то, что данные были успешно загружены:

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 34 | 856384 | 1029459 | pv | 2017-11-27 14:43:27 |
| 34 | 5079705 | 1029459 | pv | 2017-11-27 14:44:13 |
| 34 | 4451615 | 1029459 | pv | 2017-11-27 14:45:52 |
+--------+---------+------------+--------------+---------------------+

Использование Pipe

Начиная с версии 1.5.0, Selena предоставляет метод загрузки Pipe, который в настоящее время поддерживает только форматы файлов Parquet и ORC.

Преимущества Pipe

Pipe идеально подходит для непрерывной загрузки данных и крупномасштабной загрузки данных:

  • Крупномасштабная загрузка данных в микро-пакетах помогает снизить стоимость повторных попыток, вызванных ошибками данных.

    С помощью Pipe, Selena обеспечивает эффективную загрузку большого количества файлов данных со значительным общим объемом данных. Pipe автоматически разделяет файлы на основе их количества или размера, разбивая задачу загрузки на более мелкие последовательные задачи. Этот подход гарантирует, что ошибки в одном файле не повлияют на всю задачу загрузки. Статус загрузки каждого файла записывается Pipe, что позволяет легко идентифицировать и исправить файлы, содержащие ошибки. Минимизируя необходимость в повторных попытках из-за ошибок данных, этот подход помогает снизить затраты.

  • Непрерывная загрузка данных помогает сократить трудозатраты.

    Pipe помогает записывать новые или обновленные файлы данных в определенное место и непрерывно загружать новые данные из этих файлов в Selena. После создания задачи Pipe с указанием "AUTO_INGEST" = "TRUE", она будет постоянно отслеживать изменения в файлах данных, хранящихся по указанному пути, и автоматически загружать новые или обновленные данные из файлов данных в целевую таблицу Selena.

Кроме того, Pipe выполняет проверки уникальности файлов, чтобы предотвратить дублирование загрузки данных. Во время процесса загрузки Pipe проверяет уникальность каждого файла данных на основе имени файла и дайджеста. Если файл с определенным именем и дайджестом уже был обработан задачей Pipe, задача Pipe пропустит все последующие файлы с тем же именем и дайджестом. Обратите внимание, что object storage like AWS S3 uses ETag используется как дайджест файла.

Статус загрузки каждого файла данных записывается и сохраняется в представлении information_schema.pipe_files. После удаления задачи Pipe, связанной с представлением, записи о файлах, загруженных в этой задаче, также будут удалены.

Поток данных

Поток данных Pipe

Pipe идеально подходит для непрерывной загрузки данных и крупномасштабной загрузки данных:

  • Крупномасштабная загрузка данных в микро-пакетах помогает снизить стоимость повторных попыток, вызванных ошибками данных.

    С помощью Pipe Selena обеспечивает эффективную загрузку большого количества файлов данных со значительным общим объемом данных. Pipe автоматически разделяет файлы на основе их количества или размера, разбивая задание загрузки на более мелкие последовательные задачи. Этот подход гарантирует, что ошибки в одном файле не влияют на все задание загрузки. Статус загрузки каждого файла записывается Pipe, что позволяет легко идентифицировать и исправлять файлы, содержащие ошибки. Минимизируя необходимость в повторных попытках из-за ошибок данных, этот подход помогает снизить затраты.

  • Непрерывная загрузка данных помогает сократить трудозатраты.

    Pipe помогает записывать новые или обновленные файлы данных в определенное место и непрерывно загружать новые данные из этих файлов в Selena. После создания задания Pipe с указанием "AUTO_INGEST" = "TRUE" оно будет постоянно отслеживать изменения в файлах данных, хранящихся по указанному пути, и автоматически загружать новые или обновленные данные из файлов данных в целевую таблицу Selena.

Кроме того, Pipe выполняет проверки уникальности файлов, чтобы предотвратить дублирование загрузки данных. Во время процесса загрузки Pipe проверяет уникальность каждого файла данных на основе имени файла и дайджеста. Если файл с определенным именем файла и дайджестом уже был обработан заданием Pipe, задание Pipe пропустит все последующие файлы с тем же именем файла и дайджестом. Обратите внимание, что object storage, такое как AWS S3, использует ETag в качестве дайджеста файла.

Статус загрузки каждого файла данных записывается и сохраняется в представлении information_schema.pipe_files. После удаления задания Pipe, связанного с представлением, записи о файлах, загруженных в этом задании, также будут удалены.

Различия между Pipe и INSERT+FILES()

Задание Pipe разделяется на одну или несколько транзакций на основе размера и количества строк в каждом файле данных. Пользователи могут запрашивать промежуточные результаты во время процесса загрузки. В отличие от этого, задание INSERT+FILES() обрабатывается как одна транзакция, и пользователи не могут просматривать данные во время процесса загрузки.

Последовательность загрузки файлов

Для каждого задания Pipe Selena поддерживает очередь файлов, из которой извлекает и загружает файлы данных как микро-пакеты. Pipe не гарантирует, что файлы данных загружаются в том же порядке, в котором они загружены. Поэтому более новые данные могут быть загружены раньше более старых данных.

Типичный пример

Создание базы данных и таблицы

Создайте базу данных и переключитесь на неё:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из AWS S3):

CREATE TABLE user_behavior_from_pipe
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Запуск задания Pipe

Выполните следующую команду для запуска задания Pipe, которое загружает данные из образца набора данных s3://starrocks-examples/user-behavior-10-million-rows/ в таблицу user_behavior_from_pipe. Это задание pipe использует как микро-пакеты, так и непрерывную загрузку (описанные выше) специфичные для pipe функции.

Другие примеры в этом руководстве загружают один файл Parquet с 10 миллионами строк. Для примера pipe тот же набор данных разделен на 57 отдельных файлов, и все они хранятся в одной папке S3. Обратите внимание, что в команде CREATE PIPE ниже path — это URI для папки S3, и вместо предоставления имени файла URI заканчивается на /*. Установив AUTO_INGEST и указав папку, а не отдельный файл, задание pipe будет опрашивать папку S3 на предмет новых файлов и принимать их по мере добавления в папку.

CREATE PIPE user_behavior_pipe
PROPERTIES
(
"AUTO_INGEST" = "TRUE"
)
AS
INSERT INTO user_behavior_from_pipe
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows/*",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

ПРИМЕЧАНИЕ

Замените ваши учетные данные на AAA и BBB в приведенной выше команде. Можно использовать любые действительные aws.s3.access_key и aws.s3.secret_key, поскольку объект доступен для чтения любому аутентифицированному пользователю AWS.

Это задание имеет четыре основных раздела:

  • pipe_name: Имя pipe. Имя pipe должно быть уникальным в базе данных, к которой принадлежит pipe.
  • INSERT_SQL: Оператор INSERT INTO SELECT FROM FILES, который используется для загрузки данных из указанного исходного файла данных в целевую таблицу.
  • PROPERTIES: Набор необязательных параметров, которые указывают, как выполнить pipe. К ним относятся AUTO_INGEST, POLL_INTERVAL, BATCH_SIZE и BATCH_FILES. Укажите эти свойства в формате "key" = "value".

Для подробного синтаксиса и описания параметров см. CREATE PIPE.

Проверка прогресса загрузки

  • Запросите прогресс задания Pipe с помощью SHOW PIPES в текущей базе данных, к которой принадлежит задание Pipe.

    SHOW PIPES WHERE NAME = 'user_behavior_pipe' \G

    Возвращается следующий результат:

    подсказка

    В выводе, показанном ниже, pipe находится в состоянии RUNNING. Pipe будет оставаться в состоянии RUNNING, пока вы не остановите его вручную. Вывод также показывает количество загруженных файлов (57) и время последней загрузки файла.

    *************************** 1. row ***************************
    DATABASE_NAME: mydatabase
    PIPE_ID: 10476
    PIPE_NAME: user_behavior_pipe
    STATE: RUNNING
    TABLE_NAME: mydatabase.user_behavior_from_pipe
    LOAD_STATUS: {"loadedFiles":57,"loadedBytes":295345637,"loadingFiles":0,"lastLoadedTime":"2024-02-28 22:14:19"}
    LAST_ERROR: NULL
    CREATED_TIME: 2024-02-28 22:13:41
    1 row in set (0.02 sec)
  • Запросите прогресс задания Pipe из представления pipes в Selena Information Schema.

    SELECT * FROM information_schema.pipes WHERE pipe_name = 'user_behavior_replica' \G

    Возвращается следующий результат:

    подсказка

    Некоторые запросы в этом руководстве заканчиваются на \G вместо точки с запятой (;). Это заставляет клиент MySQL выводить результаты в вертикальном формате. Если вы используете DBeaver или другой клиент, вам может потребоваться использовать точку с запятой (;) вместо \G.

    *************************** 1. row ***************************
    DATABASE_NAME: mydatabase
    PIPE_ID: 10217
    PIPE_NAME: user_behavior_replica
    STATE: RUNNING
    TABLE_NAME: mydatabase.user_behavior_replica
    LOAD_STATUS: {"loadedFiles":1,"loadedBytes":132251298,"loadingFiles":0,"lastLoadedTime":"2023-11-09 15:35:42"}
    LAST_ERROR:
    CREATED_TIME: 9891-01-15 07:51:45
    1 row in set (0.01 sec)

Проверка статуса файлов

Вы можете запросить статус загрузки файлов, загруженных из представления pipe_files в Selena Information Schema.

SELECT * FROM information_schema.pipe_files WHERE pipe_name = 'user_behavior_replica' \G

Возвращается следующий результат:

*************************** 1. row ***************************
DATABASE_NAME: mydatabase
PIPE_ID: 10217
PIPE_NAME: user_behavior_replica
FILE_NAME: s3://starrocks-examples/user-behavior-10-million-rows.parquet
FILE_VERSION: e29daa86b1120fea58ad0d047e671787-8
FILE_SIZE: 132251298
LAST_MODIFIED: 2023-11-06 13:25:17
LOAD_STATE: FINISHED
STAGED_TIME: 2023-11-09 15:35:02
START_LOAD_TIME: 2023-11-09 15:35:03
FINISH_LOAD_TIME: 2023-11-09 15:35:42
ERROR_MSG:
1 row in set (0.03 sec)

Управление заданиями Pipe

Вы можете изменять, приостанавливать или возобновлять, удалять или запрашивать созданные вами pipe, а также повторять попытки загрузки конкретных файлов данных. Для получения дополнительной информации см. ALTER PIPE, SUSPEND or RESUME PIPE, DROP PIPE, SHOW PIPES и RETRY FILE.