Перейти к основному содержимому
Версия: 2.0.x

Загрузка данных с использованием транзакционного интерфейса Stream Load

Начиная с версии v1.5.2, Selena предоставляет транзакционный интерфейс Stream Load для реализации двухфазной фиксации (2PC) для транзакций, которые выполняются для загрузки данных из внешних систем, таких как Apache Flink и Apache Kafka. Транзакционный интерфейс Stream Load помогает повысить производительность потоковых загрузок с высокой параллельностью.

Начиная с версии v2.0.0, транзакционный интерфейс Stream Load поддерживает Multi-table Transaction, то есть загрузку данных в несколько таблиц в рамках одной базы данных.

В этой теме описывается транзакционный интерфейс Stream Load и способы загрузки данных в Selena с использованием этого интерфейса.

Описание

Транзакционный интерфейс Stream Load поддерживает использование инструмента или языка, совместимого с протоколом HTTP, для вызова операций API. В этой теме в качестве примера используется curl для объяснения использования этого интерфейса. Этот интерфейс предоставляет различные функции, такие как управление транзакциями, запись данных, предварительная фиксация транзакций, дедупликация транзакций и управление тайм-аутом транзакций.

примечание

Stream Load поддерживает форматы файлов CSV и JSON. Этот метод рекомендуется, если вы хотите загрузить данные из небольшого количества файлов, размер каждого из которых не превышает 10 ГБ. Stream Load не поддерживает формат файлов Parquet. Если вам нужно загрузить данные из файлов Parquet, используйте INSERT+files().

Управление транзакциями

Транзакционный интерфейс Stream Load предоставляет следующие операции API, которые используются для управления транзакциями:

  • /api/transaction/begin: запускает новую транзакцию.

  • /api/transaction/prepare: предварительно фиксирует текущую транзакцию и делает изменения данных временно постоянными. После предварительной фиксации транзакции вы можете продолжить её фиксацию или откат. Если ваш cluster выходит из строя после предварительной фиксации транзакции, вы всё равно можете продолжить фиксацию транзакции после восстановления cluster.

  • /api/transaction/commit: фиксирует текущую транзакцию, чтобы сделать изменения данных постоянными.

  • /api/transaction/rollback: откатывает текущую транзакцию, чтобы отменить изменения данных.

ПРИМЕЧАНИЕ

После предварительной фиксации транзакции не продолжайте записывать данные, используя транзакцию. Если вы продолжите записывать данные, используя транзакцию, ваш запрос на запись вернёт ошибки.

На следующей диаграмме показана связь между состояниями транзакции и операциями:

stateDiagram-v2
direction LR
[*] --> PREPARE : begin
PREPARE --> PREPARED : prepare
PREPARE --> ABORTED : rollback
PREPARED --> COMMITTED : commit
PREPARED --> ABORTED : rollback

Запись данных

Транзакционный интерфейс Stream Load предоставляет операцию /api/transaction/load, которая используется для записи данных. Вы можете вызывать эту операцию несколько раз в рамках одной транзакции.

Начиная с версии v2.0.0, вы можете вызывать операции /api/transaction/load для разных таблиц, чтобы загружать данные в несколько таблиц в рамках одной базы данных.

Дедупликация транзакций

Транзакционный интерфейс Stream Load переносит механизм меток Selena. Вы можете привязать уникальную метку к каждой транзакции для достижения гарантий "не более одного раза" для транзакций.

Управление тайм-аутом транзакций

При запуске транзакции вы можете использовать поле timeout в заголовке HTTP-запроса для указания периода тайм-аута (в секундах) для транзакции из состояния PREPARE в состояние PREPARED. Если транзакция не была подготовлена после этого периода, она будет автоматически прервана. Если это поле не указано, значение по умолчанию определяется конфигурацией FE stream_load_default_timeout_second (по умолчанию: 600 секунд).

При запуске транзакции вы также можете использовать поле idle_transaction_timeout в заголовке HTTP-запроса для указания периода тайм-аута (в секундах), в течение которого транзакция может оставаться бездействующей. Если данные не записываются в течение этого периода, транзакция будет автоматически откачена.

При подготовке транзакции вы можете использовать поле prepared_timeout в заголовке HTTP-запроса для указания периода тайм-аута (в секундах) для транзакции из состояния PREPARED в состояние COMMITTED. Если транзакция не была зафиксирована после этого периода, она будет автоматически прервана. Если это поле не указано, значение по умолчанию определяется конфигурацией FE prepared_transaction_default_timeout_second (по умолчанию: 86400 секунд). prepared_timeout поддерживается начиная с версии v1.5.2.

Преимущества

Транзакционный интерфейс Stream Load приносит следующие преимущества:

  • Семантика "ровно один раз"

    Транзакция разделена на две фазы — предварительную фиксацию и фиксацию, что упрощает загрузку данных между системами. Например, этот интерфейс может гарантировать семантику "ровно один раз" для загрузки данных из Flink.

  • Улучшенная производительность загрузки

    Если вы запускаете задание загрузки с помощью программы, транзакционный интерфейс Stream Load позволяет объединять несколько мини-пакетов данных по требованию, а затем отправлять их все сразу в рамках одной транзакции, вызывая операцию /api/transaction/commit. Таким образом, требуется загрузить меньше версий данных, и производительность загрузки улучшается.

Ограничения

Транзакционный интерфейс Stream Load имеет следующие ограничения:

  • Транзакции с одной базой данных и несколькими таблицами поддерживаются начиная с версии v2.0.0. Поддержка транзакций с несколькими базами данных и несколькими таблицами находится в разработке.

  • Поддерживаются только параллельные записи данных от одного клиента. Поддержка параллельных записей данных от нескольких клиентов находится в разработке.

  • Операция /api/transaction/load может вызываться несколько раз в рамках одной транзакции. В этом случае настройки параметров (кроме table), указанные для всех вызываемых операций /api/transaction/load, должны быть одинаковыми.

  • При загрузке данных в формате CSV с использованием транзакционного интерфейса Stream Load убедитесь, что каждая запись данных в вашем файле данных заканчивается разделителем строк.

Меры предосторожности

  • Если вызванная операция /api/transaction/begin, /api/transaction/load или /api/transaction/prepare возвращает ошибки, транзакция завершается неудачей и автоматически откатывается.
  • При вызове операции /api/transaction/begin для запуска новой транзакции необходимо указать метку. Обратите внимание, что последующие операции /api/transaction/load, /api/transaction/prepare и /api/transaction/commit должны использовать ту же метку, что и операция /api/transaction/begin.
  • Если вы используете метку текущей транзакции для вызова операции /api/transaction/begin для запуска новой транзакции, предыдущая транзакция завершится неудачей и будет откачена.
  • Если вы используете транзакцию с несколькими таблицами для загрузки данных в разные таблицы, вы должны указать параметр -H "transaction_type:multi" для всех операций, участвующих в транзакции.
  • Разделитель столбцов и разделитель строк по умолчанию, которые Selena поддерживает для данных в формате CSV, — это \t и \n. Если ваш файл данных не использует разделитель столбцов или разделитель строк по умолчанию, вы должны использовать "column_separator: <column_separator>" или "row_delimiter: <row_delimiter>" для указания разделителя столбцов или разделителя строк, фактически используемого в вашем файле данных, при вызове операции /api/transaction/load.

Перед началом работы

Проверка привилегий

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в разделе GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему cluster Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Проверка сетевой конфигурации

Убедитесь, что машина, на которой находятся загружаемые данные, имеет доступ к узлам FE и BE cluster Selena через порты http_port (по умолчанию: 8030) и be_http_port (по умолчанию: 8040) соответственно.

Основные операции

Подготовка примера данных

В этой теме в качестве примера используются данные в формате CSV.

  1. В пути /home/disk1/ вашей локальной файловой системы создайте CSV-файл с именем example1.csv. Файл состоит из трёх столбцов, которые представляют идентификатор пользователя, имя пользователя и оценку пользователя соответственно.

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25
  2. В базе данных Selena test_db создайте таблицу с первичным ключом с именем table1. Таблица состоит из трёх столбцов: id, name и score, где id является первичным ключом.

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10;

Запуск транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # Необязательно. Инициирует транзакцию с несколькими таблицами.
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

ПРИМЕЧАНИЕ

Укажите -H "transaction_type:multi" в команде, если вы хотите загрузить данные в разные таблицы в рамках транзакции.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

ПРИМЕЧАНИЕ

В этом примере streamload_txn_example1_table1 указана как метка транзакции.

Возвращаемый результат

  • Если транзакция успешно запущена, возвращается следующий результат:

    {
    "Status": "OK",
    "Message": "",
    "Label": "streamload_txn_example1_table1",
    "TxnId": 9032,
    "BeginTxnTimeMs": 0
    }
  • Если транзакция связана с дублирующей меткой, возвращается следующий результат:

    {
    "Status": "LABEL_ALREADY_EXISTS",
    "ExistingJobStatus": "RUNNING",
    "Message": "Label [streamload_txn_example1_table1] has already been used."
    }
  • Если возникают ошибки, отличные от дублирующей метки, возвращается следующий результат:

    {
    "Status": "FAILED",
    "Message": ""
    }

Запись данных

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # Необязательно. Загружает данные через транзакцию с несколькими таблицами.
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

ПРИМЕЧАНИЕ

  • При вызове операции /api/transaction/load вы должны использовать <file_path> для указания пути сохранения файла данных, который вы хотите загрузить.
  • Вы можете вызывать операции /api/transaction/load с различными значениями параметра table для загрузки данных в разные таблицы в рамках одной базы данных. В этом случае вы должны указать -H "transaction_type:multi" в команде.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

ПРИМЕЧАНИЕ

В этом примере разделитель столбцов, используемый в файле данных example1.csv, — запятые (,) вместо разделителя столбцов Selena по умолчанию (\t). Поэтому при вызове операции /api/transaction/load необходимо использовать "column_separator: <column_separator>" для указания запятых (,) в качестве разделителя столбцов.

Возвращаемый результат

  • Если запись данных успешна, возвращается следующий результат:

    {
    "TxnId": 1,
    "Seq": 0,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    }
  • Если транзакция считается неизвестной, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "TXN_NOT_EXISTS"
    }
  • Если транзакция считается в недопустимом состоянии, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation State Invalid"
    }
  • Если возникают ошибки, отличные от неизвестной транзакции и недопустимого состояния, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Предварительная фиксация транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # Необязательно. Предварительно фиксирует транзакцию с несколькими таблицами.
-H "db:<database_name>" \
[-H "prepared_timeout:<timeout_seconds>"] \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

ПРИМЕЧАНИЕ

Укажите -H "transaction_type:multi" в команде, если транзакция, которую вы хотите предварительно зафиксировать, является транзакцией с несколькими таблицами.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-H "prepared_timeout:300" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

ПРИМЕЧАНИЕ

Поле prepared_timeout является необязательным. Если оно не указано, значение по умолчанию определяется конфигурацией FE prepared_transaction_default_timeout_second (по умолчанию: 86400 секунд). prepared_timeout поддерживается начиная с версии v1.5.2.

Возвращаемый результат

  • Если предварительная фиксация успешна, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если предварительная фиксация истекает по таймауту, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • Если возникают ошибки, отличные от несуществующей транзакции и тайм-аута предварительной фиксации, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout"
    }

Фиксация транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # Необязательно. Фиксирует транзакцию с несколькими таблицами.
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

ПРИМЕЧАНИЕ

Укажите -H "transaction_type:multi" в команде, если транзакция, которую вы хотите зафиксировать, является транзакцией с несколькими таблицами.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

Возвращаемый результат

  • Если фиксация успешна, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • Если транзакция уже была зафиксирована, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "Transaction already commited",
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если фиксация истекает по таймауту, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • Если публикация данных истекает по таймауту, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout",
    "CommitAndPublishTimeMs": 1393
    }
  • Если возникают ошибки, отличные от несуществующей транзакции и тайм-аута, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Откат транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # Необязательно. Откатывает транзакцию с несколькими таблицами.
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

ПРИМЕЧАНИЕ

Укажите -H "transaction_type:multi" в команде, если транзакция, которую вы хотите откатить, является транзакцией с несколькими таблицами.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

Возвращаемый результат

  • Если откат успешен, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": ""
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если возникают ошибки, отличные от несуществующей транзакции, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Справочные материалы

Информацию о подходящих сценариях применения и поддерживаемых форматах файлов данных Stream Load, а также о принципе работы Stream Load см. в Загрузка из локальной файловой системы через Stream Load.

Информацию о синтаксисе и параметрах для создания заданий Stream Load см. в STREAM LOAD.