Загрузка данных из AWS S3
Selena предоставляет следующие варианты загрузки данных из AWS S3:
- Синхронная загрузка с использованием INSERT+
FILES() - Асинхронная загрузка с использованием Broker Load
- Непрерывная асинхронная загрузка с использованием Pipe
Каждый из этих вариантов имеет свои преимущества, которые подробно описаны в следующих разделах.
В большинстве случаев м ы рекомендуем использовать метод INSERT+FILES(), который намного проще в использовании.
Однако метод INSERT+FILES() в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV. Поэтому, если вам нужно загружать данные других форматов файлов, таких как JSON, или выполнять изменения данных, такие как DELETE, во время загрузки данных, вы можете использовать Broker Load.
Если вам нужно загрузить большое количество файлов данных со значительным общим объёмом данных (например, более 100 ГБ или даже 1 ТБ), мы рекомендуем использовать метод Pipe. Pipe может разбивать файлы по их количеству или размеру, разделяя задание загрузки на более мелкие последовательные задачи. Такой подход гарантирует, что ошибки в одном файле не повлияют на всё задание загрузки, и минимизирует необходимость повторных попыток из-за ошибок в данных.
Перед началом работы
Подготовка исходных данных
Убедитесь, что исходные данные, которые вы хотите загрузить в Selena, правильно хранятся в bucket S3. Вы также можете рассмотреть, где находятся данные и база данных, потому что затраты на передачу данных значительно ниже, когда ваш bucket и ваш cluster Selena находятся в одном регионе.
В этой теме мы предоставляем вам пример набора данных в bucket S3, s3://selena-examples/user-behavior-10-million-rows.parquet. Вы можете получить доступ к этому набору данных с любыми действительными учётными данными, так как объект доступен для чтения любому аутентифицированному пользователю AWS.
Проверка привилегий
Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему cluster Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.
Сбор данных аутентификации
Примеры в этой теме используют аутентификацию на основе IAM-пользователя. Чтобы убедиться, что у вас есть разрешение на чтение данных из AWS S3, мы рекомендуем прочитать Подготовка к аутентификации на основе IAM-пользователя и следовать инструкциям для создания IAM-пользователя с правильно настроенными политиками IAM.
Вкратце, если вы практикуете аутентификацию на основе IAM-пользователя, вам нужно собрать информацию о следующих ресурсах AWS:
- Bucket S3, в котором хранятся ваши данные.
- Ключ объекта S3 (имя объекта), если вы обращаетесь к определённому объекту в bucket. Обратите внимание, чт о ключ объекта может включать префикс, если ваши объекты S3 хранятся в подпапках.
- Регион AWS, к которому принадлежит bucket S3.
- Access key и secret key, используемые в качестве учётных данных доступа.
Для получения информации обо всех доступных методах аутентификации см. Аутентификация в ресурсах AWS.
Использование INSERT+FILES()
Этот метод доступен начиная с версии v1.5.2 и в настоящее время поддерживает только форматы файлов Parquet, ORC и CSV (начиная с версии v1.5.2).
Преимущества INSERT+FILES()
FILES() может читать файл, хранящийся в облачном хранилище, на основе указанных вами свойств, связанных с путём, выводить схему таблицы данных в файле, а затем возвращать данны е из файла в виде строк данных.
С помощью FILES() вы можете:
- Запрашивать данные напрямую из S3 с помощью SELECT.
- Создавать и загружать таблицу с помощью CREATE TABLE AS SELECT (CTAS).
- Загружать данные в существующую таблицу с помощью INSERT.
Типичные примеры
Прямой запрос из S3 с помощью SELECT
Прямой запрос из S3 с помощью SELECT+FILES() позволяет получить хороший предварительный просмотр содержимого набора данных перед созданием таблицы. Например:
- Получить предварительный просмотр набора данных без сохранения данных.
- Запросить минимальные и мак симальные значения и решить, какие типы данных использовать.
- Проверить наличие значений
NULL.
Следующий пример запрашивает пример набора данных s3://selena-examples/user-behavior-10-million-rows.parquet:
SELECT * FROM FILES
(
"path" = "s3://selena-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;
ПРИМЕЧАНИЕ
Замените
AAAиBBBна свои учётные данные в приведённой выше команде. Можно использовать любой действительныйaws.s3.access_keyиaws.s3.secret_key, так как объект доступен для чтения любому аутентифицированному пользователю AWS.
Система возвращает следующий результат запроса:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 1 | 2576651 | 149192 | pv | 2017-11-25 01:21:25 |
| 1 | 3830808 | 4181361 | pv | 2017-11-25 07:04:53 |
| 1 | 4365585 | 2520377 | pv | 2017-11-25 07:49:06 |
+--------+---------+------------+--------------+---------------------+
ПРИМЕЧАНИЕ
Обратите внимание, что имена столбцов, возвращённые выше, предоставлены файлом Parquet.
Создание и загрузка таблицы с помощью CTAS
Это продолжение предыдущего примера. Предыдущий запрос обёрнут в CREATE TABLE AS SELECT (CTAS) для автоматизации создания таблицы с использованием вывода схемы. Это означает, что Selena выведет схему таблицы, создаст нужную вам таблицу, а затем загрузит данные в таблицу. Имена и типы столбцов не требуются для создания таблицы при использовании табличной функции FILES() с файлами Parquet, поскольку формат Parquet включает имена столбцов.
ПРИМЕЧАНИЕ
Синтаксис CREATE TABLE при использовании вывода схемы не позволяет устанавливать количество replica, поэтому установите его перед созданием таблицы. Пример ниже для системы с одной replica:
ADMIN SET FRONTEND CONFIG ('default_replication_num' = "1");
Создайте базу данных и переключитесь на неё:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
Используйте CTAS для создания таблицы и загрузки данных из примера набора данных s3://selena-examples/user-behavior-10-million-rows.parquet в таблицу:
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://selena-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
ПРИМЕЧАНИЕ
Замените
AAAиBBBна свои учётные данные в приведённой выше команде. Можно использовать любой действительныйaws.s3.access_keyиaws.s3.secret_key, так как объект доступен для чтения любому аутентифицированному пользователю AWS.
После создания таблицы вы можете просмотреть её схему с помощью DESCRIBE:
DESCRIBE user_behavior_inferred;
Система возвращает следующий результат запроса:
+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+
Запросите таблицу, чтобы убедиться, что данные были загружены в неё. Пример:
SELECT * from user_behavior_inferred LIMIT 3;
Возвращается следующий результат запроса, указывающий, что данные были успешно загружены:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 225586 | 3694958 | 1040727 | pv | 2017-12-01 00:58:40 |
| 225586 | 3726324 | 965809 | pv | 2017-12-01 02:16:02 |
| 225586 | 3732495 | 1488813 | pv | 2017-12-01 00:59:46 |
+--------+---------+------------+--------------+---------------------+
Загрузка в существующую таблицу с помощью INSERT
Вы можете захотеть настроить таблицу, в которую вставляете данные, например:
- тип данных столбца, настройку nullable или значения по умолчанию
- типы ключей и столбцы
- разбиение данных на partition и bucket
ПРИМЕЧАНИЕ
Создание наиболее эффективной структуры таблицы требует знания того, как данные будут использоваться и содержимого столбцов. Эта тема не охватывает проектирование таблиц. Для получения информации о проектировании таблиц см. Типы таблиц.
В этом примере мы создаём таблицу на основе знаний о том, как таблица будет запрашиваться, и данных в файле Parquet. Знание данных в файле Parquet можно получить, запросив файл напрямую в S3.
- Поскольку запрос набора данных в S3 указывает, что столбец
Timestampсодержит данные, соответствующие типу данных VARCHAR, и Selena может преобразовать VARCHAR в DATETIME, тип данных изменён на DATETIME в следующем DDL. - Запрашивая данные в S3, вы можете обнаружить, что в наборе данных нет значений
NULL, поэтому DDL также может установить все столбцы как non-nullable. - На основе знаний об ожидаемых типах запросов sort key и столбец для bucket установлены на столбец
UserID. Ваш случай использования может отличаться для этих данных, поэтому вы можете решить использоватьItemIDв дополнение кUserIDили вместо него для sort key.
Создайте базу данных и переключитесь на неё:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
Создайте таблицу вручную:
CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
Отобразите схему, чтобы вы могли сравнить её с выведенной схемой, созданной табличной функцией FILES():
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | YES | true | NULL | |
| ItemID | int | YES | false | NULL | |
| CategoryID | int | YES | false | NULL | |
| BehaviorType | varchar(65533) | YES | false | NULL | |
| Timestamp | datetime | YES | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
Сравните схему, которую вы только что создали, со схемой, выведенной ранее с помощью табличной функции FILES(). Посмотрите на:
- типы данных
- nullable
- ключевые поля
Для лучшего контроля схемы целевой таблицы и лучшей производительности запросов мы рекомендуем указывать схему таблицы вручную в производственных средах.
После создания таблицы вы можете загрузить её с помощью INSERT INTO SELECT FROM FILES():
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://selena-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
ПРИМЕЧАНИЕ
Замените
AAAиBBBна свои учётные данные в приведённой выше команде. Можно использовать любой действительныйaws.s3.access_keyиaws.s3.secret_key, так как объект доступен для чтения любому аутентифицированному пользователю AWS.
После завершения загрузки вы можете запросить таблицу, чтобы убедиться, что данные были загружены в неё. Пример:
SELECT * from user_behavior_declared LIMIT 3;
Возвращается следующий результат запроса, указывающий, что данные были успешно загружены:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 393529 | 3715112 | 883960 | pv | 2017-12-02 02:45:44 |
| 393529 | 2650583 | 883960 | pv | 2017-12-02 02:45:59 |
| 393529 | 3715112 | 883960 | pv | 2017-12-02 03:00:56 |
+--------+---------+------------+--------------+---------------------+
Проверка прогресса загрузки
Вы можете запросить прогресс заданий INSERT из представления loads в Information Schema Selena. Эта функция поддерживается начиная с версии v1.5.2. Пример:
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
Для по лучения информации о полях, предоставляемых в представлении loads, см. loads.
Если вы отправили несколько заданий загрузки, вы можете фильтровать по LABEL, связанному с заданием. Пример:
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
ПРИМЕЧАНИЕ
INSERT — это синхронная команда. Если задание INSERT всё ещё выполняется, вам нужно открыть другую сессию, чтобы проверить его статус выполнения.
Использование Broker Load
Асинхронный процесс Broker Load обрабатывает подключение к S3, извлечение данных и сохранение данных в Selena.
Этот метод поддерживает следующие форматы файлов:
- Parquet
- ORC
- CSV
- JSON (поддерживается начиная с версии v1.5.2)
Преимущества Broker Load
- Broker Load выполняется в фоновом режиме, и клиентам не нужно оставаться подключёнными для продолжения задания.
- Broker Load предпочтителен для длительных заданий, тайм-аут по умолчанию составляет 4 часа.
- В дополнение к форматам файлов Parquet и ORC, Broker Load поддерживает формат файлов CSV и формат файлов JSON (формат файлов JSON поддерживается начиная с версии v1.5.2).
Поток данных

- Пользователь создаёт задание загрузки.
- Frontend (FE) создаёт план запроса и распределяет план на backend-узлы (BE) или compute-узлы (CN).
- BE или CN извлекают данные из источника и загружают данные в Selena.
Типичный пример
Создайте таблицу, запустите процесс загрузки, который извлекает пример набора данных s3://selena-examples/user-behavior-10-million-rows.parquet из S3, и проверьте прогресс и успешность загрузки данных.
Создание базы данных и таблицы
Создайте базу данных и переключитесь на неё:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из AWS S3):
CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
Запуск Broker Load
Выполните следующую команду, чтобы запустить задание Broker Load, которое загружает данные из примера набора данных s3://selena-examples/user-behavior-10-million-rows.parquet в таблицу user_behavior:
LOAD LABEL user_behavior
(
DATA INFILE("s3://selena-examples/user-behavior-10-million-rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);
ПРИМЕЧАНИЕ
Замените
AAAиBBBна свои учётные данные в приведённой выше команде. Можно использовать любой действительныйaws.s3.access_keyиaws.s3.secret_key, так как объект доступен для чтения любому аутентифицированному пользователю AWS.
Это задание имеет четыре основных раздела:
LABEL: Строка, используемая при запросе состояния задания загрузки.LOADdeclaration: URI источника, формат исходных данных и имя целевой таблицы.BROKER: Данные подключения к источнику.PROPERTIES: Значение тайм-аута и любые другие свойства, применяемые к заданию загрузки.
Подробный синтаксис и описание параметров см. в BROKER LOAD.
Проверка прогресса загрузки
Вы можете запросить прогресс задания Broker Load из пред ставления loads в Information Schema Selena. Эта функция поддерживается начиная с версии v1.5.2.
SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';
Для получения информации о полях, предоставляемых в представлении loads, см. loads.
Эта запись показывает состояние LOADING и прогресс 39%. Если вы видите что-то подобное, выполните команду ещё раз, пока не увидите состояние FINISHED.
JOB_ID: 10466
LABEL: user_behavior
DATABASE_NAME: mydatabase
STATE: LOADING
PROGRESS: ETL:100%; LOAD:39%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 4620288
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 4620288
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2024-02-28 22:11:36
ETL_START_TIME: 2024-02-28 22:11:41
ETL_FINISH_TIME: 2024-02-28 22:11:41
LOAD_START_TIME: 2024-02-28 22:11:41
LOAD_FINISH_TIME: NULL
JOB_DETAILS: {"All backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]},"FileNumber":1,"FileSize":136901706,"InternalTableLoadBytes":144032784,"InternalTableLoadRows":4620288,"ScanBytes":143969616,"ScanRows":4620288,"TaskNumber":1,"Unfinished backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
После того как вы убедитесь, что задание загрузки завершено, вы можете проверить подмножество целевой таблицы, чтобы увидеть, были ли данные успешно загружены. Пример:
SELECT * from user_behavior LIMIT 3;
Возвращается следующий результат запроса, указывающий, что данные были успешно загружены:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 34 | 856384 | 1029459 | pv | 2017-11-27 14:43:27 |
| 34 | 5079705 | 1029459 | pv | 2017-11-27 14:44:13 |
| 34 | 4451615 | 1029459 | pv | 2017-11-27 14:45:52 |
+--------+---------+------------+--------------+---------------------+
Использование Pipe
Начиная с версии v1.5.2, Selena предоставляет метод загрузки Pipe, который в настоящее время поддерживает только форматы файлов Parquet и ORC.
Преимущества Pipe
Pipe идеально подходит для непрерывной загрузки данных и крупномасштабной загрузки данных:
-
Крупномасштабная загрузка данных микро-пакетами помогает снизить затраты на повторные попытки, вызванные ошибками данных.
С помощью Pipe Selena обеспечивает эффективную загрузку большого количества файлов данных со значительным общим объёмом данных. Pipe автоматически разбивает файлы на осн ове их количества или размера, разделяя задание загрузки на более мелкие последовательные задачи. Такой подход гарантирует, что ошибки в одном файле не повлияют на всё задание загрузки. Статус загрузки каждого файла записывается Pipe, что позволяет легко идентифицировать и исправлять файлы, содержащие ошибки. Минимизируя необходимость повторных попыток из-за ошибок данных, этот подход помогает снизить затраты.
-
Непрерывная загрузка данных помогает сократить трудозатраты.
Pipe помогает записывать новые или обновлённые файлы данных в определённое место и непрерывно загружать новые данные из этих файлов в Selena. После создания задания Pipe с указанием
"AUTO_INGEST" = "TRUE"оно будет постоянно отслеживать изменения файлов данных, хранящихся по указанному пути, и автоматически загружать новые или обновлённые данные из файлов данных в целевую таблицу Selena.
Кроме того, Pipe выполняет проверку уникальности файлов, чтобы предотвратить повторную загрузку данных. В процессе загрузки Pipe проверяет уникальность каждого файла данных на основе имени файла и дайджеста. Если файл с определённым именем и дайджестом уже был обработан заданием Pipe, это задание Pipe пропустит все последующие файлы с тем же именем и дайджестом. Обратите внимание, что объектное хранилище, такое как AWS S3, использует ETag используется как дайджест файла.
Статус загрузки каждого файла данных записывается и сохраняется в представлении information_schema.pipe_files. После удаления задания Pipe, связанного с этим представлением, записи о файлах, загруженных в этом задании, также будут удалены.
Поток данных

Pipe идеально подходит для непрерывной загрузки данных и крупномасштабной загрузки данных:
-
Крупномасштабная загрузка данных в микропакетах помогает снизить затраты на повторные попытки, вызванные ошибками данных.
С помощью Pipe Selena обеспечивает эффективную загрузку большого количества файлов данных со значительным общим объёмом данных. Pipe автоматически разделяет файлы по их количеству или размеру, разбивая задание загрузки на меньшие, последовательные задачи. Этот подход гарантирует, что ошибки в одном файле не влияют на всё задание загрузки. Статус загрузки каждого файла записывается Pipe, позволяя вам легко идентифицировать и исправлять файлы, содержащие ошибки. Минимизируя необходимость повторных попыток из-за ошибок данных, этот подход помогает снизить затраты.
-
Непрерывная загрузка данных помогает сократить трудозатраты.
Pipe помогает записывать новые или обновлённые файлы данных в определённое место и непрерывно загружать новые данные из этих файлов в Selena. После создания задания Pipe с указанным
"AUTO_INGEST" = "TRUE"оно будет постоянно отслеживать изменения в файлах данных, хранящихся по указанному пути, и автоматически загружать новые или обновлённые данные из файлов данных в целевую таблицу Selena.
Кроме того, Pipe выполняет проверки уникальности файлов для предотвращения дублирования загрузки данных. Во время процесса загрузки Pipe проверяет уникальность каждого фай ла данных на основе имени файла и дайджеста. Если файл с определённым именем и дайджестом уже был обработан заданием Pipe, задание Pipe пропустит все последующие файлы с тем же именем и дайджестом. Обратите внимание, что объектное хранилище, такое как AWS S3, использует ETag в качестве дайджеста файла.
Статус загрузки каждого файла данных записывается и сохраняется в представлении information_schema.pipe_files. После удаления задания Pipe, связанного с представлением, записи о загруженных в этом задании файлах также будут удалены.
Различия между Pipe и INSERT+FILES()
Задание Pipe разделяется на одну или несколько транзакций в зависимости от размера и количества строк в каждом файле данных. Пользователи могут запрашивать промежуточные результаты во время процесса загрузки. В отличие от этого, задание INSERT+FILES() обрабатывается как одна транзакция, и пользователи не могут просматривать данные во время процесса загрузки.
Последовательность загрузки файлов
Для каждого задания Pipe Selena поддерживает очередь файлов, из которой извлекает и загружает файлы данных как микропакеты. Pipe не гарантирует, что файлы данных загружаются в том же порядке, в котором они были загружены. Поэтому более новые данные могут быть загружены раньше более старых данных.
Типичный пример
Создание базы данных и таблицы
Создайте базу данных и переключитесь на неё:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
Создайте таблицу вручную (мы рекомендуем, чтобы таблица имела ту же схему, что и файл Parquet, который вы хотите загрузить из AWS S3):
CREATE TABLE user_behavior_from_pipe
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);