Загрузка данных с использованием транзакционного интерфейса Stream Load
Начиная с версии 1.5.0, Selena предоставляет транзакционный интерфейс Stream Load для реализации двухфазного коммита (2PC) для транзакций, которые выполняются для загрузки данны х из внешних систем, таких как Apache Flink® и Apache Kafka®. Транзакционный интерфейс Stream Load помогает улучшить производительность высококонкурентных потоковых загрузок.
В этой теме описывается транзакционный интерфейс Stream Load и способы загрузки данных в Selena с использованием этого интерфейса.
Описание
Транзакционный интерфейс Stream Load поддерживает использование инструмента или языка, совместимого с протоколом HTTP, для вызова операций API. В этой теме используется curl в качестве примера для объяснения того, как использовать этот интерфейс. Этот интерфейс предоставляет различные функции, такие как управление транзакциями, запись данных, предварительный коммит транзакций, дедупликация транзакций и управление таймаутами транзакций.
Stream Load поддерживает форматы файлов CSV и JSON. Этот метод рекомендуется, если вы хотите загрузить данные из небольшого количества файлов, размер которых не превышает 10 ГБ. Stream Load не поддерживает формат файлов Parquet. Если вам нужно загрузить данные из файлов Parquet, используйте INSERT+files().
Управление транзакциями
Транзакционный интерфейс Stream Load предоставляет следующие операции API, которые используются для управления транзакциями:
-
/api/transaction/begin: запускает новую транзакцию. -
/api/transaction/commit: фиксирует текущую транзакцию, чтобы сделать изменения данных постоянными. -
/api/transaction/rollback: откатывает текущую транзакцию, чтобы отменить изменения данных.
Предварительный коммит транзакции
Транзакционный интерфейс Stream Load предоставляет операцию /api/transaction/prepare, которая используется для предварительного коммита текущей транзакции и временного сохранения изменений данных. После предварительного коммита транзакции вы можете продолжить фиксацию или откат транзакции. Если ваш кластер Selena выйдет из строя после предварительного коммита транзакции, вы все равно сможете продолжить фиксацию транзакции после восстановления кластера Selena до нормального состояния.
ПРИМЕЧАНИЕ
После предварительного коммита транзакции не продолжайте запись данных с использованием транзакции. Если вы продолжите запись данных с использованием транзакции, ваш запрос на запись вернет ошибки.
Запись д анных
Транзакционный интерфейс Stream Load предоставляет операцию /api/transaction/load, которая используется для записи данных. Вы можете вызывать эту операцию несколько раз в рамках одной транзакции.
Дедупликация транзакций
Транзакционный интерфейс Stream Load наследует механизм маркировки Selena. Вы можете привязать уникальную метку к каждой транзакции для достижения гарантий не более одного раза для транзакций.
Управление таймаутами транзакций
Вы можете использовать параметр stream_load_default_timeout_second в файле конфигурации каждого FE для указан ия периода таймаута транзакции по умолчанию для этого FE.
При создании транзакции вы можете использовать поле timeout в заголовке HTTP-запроса для указания периода таймаута для транзакции.
При создании транзакции вы также можете использовать поле idle_transaction_timeout в заголовке HTTP-запроса для указания периода таймаута, в течение которого транзакция может оставаться неактивной. Если в течение периода таймаута данные не записываются, транзакция автоматически откатывается.
Преимущества
Транзакционный интерфейс Stream Load приносит следующие преимущества:
-
Семантика точно один раз
Транзакция разделена на две фазы: предварительный коммит и коммит, что упрощает загрузку данных между системами. Например, этот интерфейс может гарантировать семантику точно один раз для загрузки данных из Flink.
-
Улучшенная производительность загрузки
Если вы выполняете задание загрузки с помощью программы, транзакционный интерфейс Stream Load позволяет объединять несколько мини-пакетов данных по требованию, а затем отправлять их все сразу в рамках одной транзакции, вызывая операцию
/api/transaction/commit. Таким образом, нужно загружать меньше версий данных, и производительность загрузки улучшается.
Ограничения
Транзакционный интерфейс Stream Load имеет следующие ограничения:
-
Поддерживаются только транзакции одна база данных одна таблица. Поддержка транзакций несколько баз данных несколько таблиц находится в разработке.
-
Поддерживается только одновременная запись данных от одного клиента. Поддержка одновременной записи данных от нескольких клиентов находится в разработке.
-
Операция
/api/transaction/loadможет вызываться несколько раз в рамках одной транзакции. В этом случае настройки параметров, указанные для всех вызываемых операций/api/transaction/load, должны быть одинаковыми. -
При загрузке данных в формате CSV с использованием транзакционного интерфейса Stream Load убедитесь, что каждая запись данных в вашем файле данных заканчивается разделителем строк.
Меры предосторожности
- Если операция
/api/transaction/begin,/api/transaction/loadили/api/transaction/prepare, которую вы вызвали, возвращает ошибки, транзакция завершается неудачей и автоматически откатывается. - При вызове операции
/api/transaction/beginдля запуска новой транзакции вы должны указать метку. Обратите внимание, что последующие операции/api/transaction/load,/api/transaction/prepareи/api/transaction/commitдолжны использовать ту же метку, что и операция/api/transaction/begin. - Если вы используете метку предыдущей транзакции для вызова операции
/api/transaction/beginдля запуска новой транзакции, предыдущая транзакция завершится неудачей и будет откачена. - Разделитель столбцов и разделитель строк по умолчанию, которые Selena поддерживает для данных в формате CSV, это
\tи\n. Если ваш файл данных не использует разделитель столбцов или разделитель строк по умолчанию, вы должны использовать"column_separator: <column_separator>"или"row_delimiter: <row_delimiter>"для указания разделителя столбцов или разделителя строк, который фактически используется в вашем файле данных при вызове операции/api/transaction/load.
Перед началом
Проверка привилегий
Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.
Проверка конфигурации сети
Убедитесь, что машина, на которой находятся данные, которые вы хотите загрузить, может получить доступ к узлам FE и BE кластера Selena через http_port (по умолчанию: 8030) и be_http_port (по умолчанию: 8040) соответственно.
Основные операции
Подготовка примера данных
В этой теме используются данные в формате CSV в качестве примера.
-
В пути
/home/disk1/вашей локальной файловой системы создайте CSV-файл с именемexample1.csv. Файл состоит из трех столбцов, которые представляют ID пользователя, имя пользователя и оценку пользователя по порядку.1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25 -
В вашей базе данных Selena
test_dbсоздайте таблицу Primary Key с именемtable1. Таблица состоит из трех столбцов:id,nameиscore, из которыхidявляется первичным ключом.CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10;
Запуск транзакции
Синтаксис
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
Пример
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
ПРИМЕЧАНИЕ
Для этого примера
streamload_txn_example1_table1указывается как метка транзакции.
Результат возврата
-
Если транзакция успешно запущена, возвращается следующий результат:
{
"Status": "OK",
"Message": "",
"Label": "streamload_txn_example1_table1",
"TxnId": 9032,
"BeginTxnTimeMs": 0
} -
Если транзакция привязана к дублирующейся метке, возвращается следующий результат:
{
"Status": "LABEL_ALREADY_EXISTS",
"ExistingJobStatus": "RUNNING",
"Message": "Label [streamload_txn_example1_table1] has already been used."
} -
Если возникают ошибки, отличные от дублирующейся метки, возвращается следующий результат:
{
"Status": "FAILED",
"Message": ""
}
Запись данных
Синтаксис
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
ПРИМЕЧАНИЕ
При вызове операции
/api/transaction/loadвы должны использовать<file_path>для указания пути сохранения файла данных, который вы хотите загрузить.
Пример
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
ПРИМЕЧАНИЕ
Для этого примера разделитель столбцов, используемый в файле данных
example1.csv, это запятые (,) вместо разделителя столбцов по умолчанию Selena (\t). Поэтому при вызове операции/api/transaction/loadвы должны использовать"column_separator: <column_separator>"для указания запятых (,) в качестве разделителя столбцов.
Результат возврата
-
Если запись данных успешна, возвращается следующий результат:
{
"TxnId": 1,
"Seq": 0,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
} -
Если транзакция считается неизвестной, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "TXN_NOT_EXISTS"
} -
Если транзакция считается находящейся в недопустимом состоянии, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation State Invalid"
} -
Если возникают ошибки, отличные от неизвестной транзакции и недопустимого статуса, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
Предварительный коммит транзакции
Синтаксис
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
Пример
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
Результат возврата
-
Если предварительный коммит успешен, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
"WriteDataTimeMs": 417851
"CommitAndPublishTimeMs": 1393
} -
Если транзакция считается несуществующей, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
Если предварительный коммит превышает время ожидания, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "commit timeout",
} -
Если возникают ошибки, отличные от несуществующей транзакции и таймаута предварительного коммита, возвращае тся следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "publish timeout"
}
Коммит транзакции
Синтаксис
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
Пример
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
Результат возврата
-
Если коммит успешен, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
"WriteDataTimeMs": 417851
"CommitAndPublishTimeMs": 1393
} -
Если транзакция уже была зафиксирована, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "Transaction already commited",
} -
Если транзакция считается несуществующей, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
Если коммит превышает время ожидания, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "commit timeout",
} -
Если публикация данных превышает время ожидания, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "publish timeout",
"CommitAndPublishTimeMs": 1393
} -
Если возникают ошибки, отличные от несуществующей транзакции и таймаута, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
Откат транзакции
Синтаксис
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
Пример
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
Результат возврата
-
Если откат успешен, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": ""
} -
Если транзакция считается несуществующей, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
Если возникают ошибки, отличные от несуществующей транзакции, возвращается следующий результат:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
Справочные материалы
Для получения информации о подходящих сценариях применения и поддерживаемых форматах файлов данных Stream Load, а также о том, как работает Stream Load, см. Загрузка из локальной файловой системы через Stream Load.
Для получения информации о синтаксисе и параметрах для создания заданий Stream Load см. STREAM LOAD.