Перейти к основному содержимому

Загрузка данных с использованием транзакционного интерфейса Stream Load

Начиная с версии 1.5.0, Selena предоставляет транзакционный интерфейс Stream Load для реализации двухфазного коммита (2PC) для транзакций, которые выполняются для загрузки данных из внешних систем, таких как Apache Flink® и Apache Kafka®. Транзакционный интерфейс Stream Load помогает улучшить производительность высококонкурентных потоковых загрузок.

В этой теме описывается транзакционный интерфейс Stream Load и способы загрузки данных в Selena с использованием этого интерфейса.

Описание

Транзакционный интерфейс Stream Load поддерживает использование инструмента или языка, совместимого с протоколом HTTP, для вызова операций API. В этой теме используется curl в качестве примера для объяснения того, как использовать этот интерфейс. Этот интерфейс предоставляет различные функции, такие как управление транзакциями, запись данных, предварительный коммит транзакций, дедупликация транзакций и управление таймаутами транзакций.

примечание

Stream Load поддерживает форматы файлов CSV и JSON. Этот метод рекомендуется, если вы хотите загрузить данные из небольшого количества файлов, размер которых не превышает 10 ГБ. Stream Load не поддерживает формат файлов Parquet. Если вам нужно загрузить данные из файлов Parquet, используйте INSERT+files().

Управление транзакциями

Транзакционный интерфейс Stream Load предоставляет следующие операции API, которые используются для управления транзакциями:

  • /api/transaction/begin: запускает новую транзакцию.

  • /api/transaction/commit: фиксирует текущую транзакцию, чтобы сделать изменения данных постоянными.

  • /api/transaction/rollback: откатывает текущую транзакцию, чтобы отменить изменения данных.

Предварительный коммит транзакции

Транзакционный интерфейс Stream Load предоставляет операцию /api/transaction/prepare, которая используется для предварительного коммита текущей транзакции и временного сохранения изменений данных. После предварительного коммита транзакции вы можете продолжить фиксацию или откат транзакции. Если ваш кластер Selena выйдет из строя после предварительного коммита транзакции, вы все равно сможете продолжить фиксацию транзакции после восстановления кластера Selena до нормального состояния.

ПРИМЕЧАНИЕ

После предварительного коммита транзакции не продолжайте запись данных с использованием транзакции. Если вы продолжите запись данных с использованием транзакции, ваш запрос на запись вернет ошибки.

Запись данных

Транзакционный интерфейс Stream Load предоставляет операцию /api/transaction/load, которая используется для записи данных. Вы можете вызывать эту операцию несколько раз в рамках одной транзакции.

Дедупликация транзакций

Транзакционный интерфейс Stream Load наследует механизм маркировки Selena. Вы можете привязать уникальную метку к каждой транзакции для достижения гарантий не более одного раза для транзакций.

Управление таймаутами транзакций

Вы можете использовать параметр stream_load_default_timeout_second в файле конфигурации каждого FE для указания периода таймаута транзакции по умолчанию для этого FE.

При создании транзакции вы можете использовать поле timeout в заголовке HTTP-запроса для указания периода таймаута для транзакции.

При создании транзакции вы также можете использовать поле idle_transaction_timeout в заголовке HTTP-запроса для указания периода таймаута, в течение которого транзакция может оставаться неактивной. Если в течение периода таймаута данные не записываются, транзакция автоматически откатывается.

Преимущества

Транзакционный интерфейс Stream Load приносит следующие преимущества:

  • Семантика точно один раз

    Транзакция разделена на две фазы: предварительный коммит и коммит, что упрощает загрузку данных между системами. Например, этот интерфейс может гарантировать семантику точно один раз для загрузки данных из Flink.

  • Улучшенная производительность загрузки

    Если вы выполняете задание загрузки с помощью программы, транзакционный интерфейс Stream Load позволяет объединять несколько мини-пакетов данных по требованию, а затем отправлять их все сразу в рамках одной транзакции, вызывая операцию /api/transaction/commit. Таким образом, нужно загружать меньше версий данных, и производительность загрузки улучшается.

Ограничения

Транзакционный интерфейс Stream Load имеет следующие ограничения:

  • Поддерживаются только транзакции одна база данных одна таблица. Поддержка транзакций несколько баз данных несколько таблиц находится в разработке.

  • Поддерживается только одновременная запись данных от одного клиента. Поддержка одновременной записи данных от нескольких клиентов находится в разработке.

  • Операция /api/transaction/load может вызываться несколько раз в рамках одной транзакции. В этом случае настройки параметров, указанные для всех вызываемых операций /api/transaction/load, должны быть одинаковыми.

  • При загрузке данных в формате CSV с использованием транзакционного интерфейса Stream Load убедитесь, что каждая запись данных в вашем файле данных заканчивается разделителем строк.

Меры предосторожности

  • Если операция /api/transaction/begin, /api/transaction/load или /api/transaction/prepare, которую вы вызвали, возвращает ошибки, транзакция завершается неудачей и автоматически откатывается.
  • При вызове операции /api/transaction/begin для запуска новой транзакции вы должны указать метку. Обратите внимание, что последующие операции /api/transaction/load, /api/transaction/prepare и /api/transaction/commit должны использовать ту же метку, что и операция /api/transaction/begin.
  • Если вы используете метку предыдущей транзакции для вызова операции /api/transaction/begin для запуска новой транзакции, предыдущая транзакция завершится неудачей и будет откачена.
  • Разделитель столбцов и разделитель строк по умолчанию, которые Selena поддерживает для данных в формате CSV, это \t и \n. Если ваш файл данных не использует разделитель столбцов или разделитель строк по умолчанию, вы должны использовать "column_separator: <column_separator>" или "row_delimiter: <row_delimiter>" для указания разделителя столбцов или разделителя строк, который фактически используется в вашем файле данных при вызове операции /api/transaction/load.

Перед началом

Проверка привилегий

Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.

Проверка конфигурации сети

Убедитесь, что машина, на которой находятся данные, которые вы хотите загрузить, может получить доступ к узлам FE и BE кластера Selena через http_port (по умолчанию: 8030) и be_http_port (по умолчанию: 8040) соответственно.

Основные операции

Подготовка примера данных

В этой теме используются данные в формате CSV в качестве примера.

  1. В пути /home/disk1/ вашей локальной файловой системы создайте CSV-файл с именем example1.csv. Файл состоит из трех столбцов, которые представляют ID пользователя, имя пользователя и оценку пользователя по порядку.

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25
  2. В вашей базе данных Selena test_db создайте таблицу Primary Key с именем table1. Таблица состоит из трех столбцов: id, name и score, из которых id является первичным ключом.

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10;

Запуск транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

ПРИМЕЧАНИЕ

Для этого примера streamload_txn_example1_table1 указывается как метка транзакции.

Результат возврата

  • Если транзакция успешно запущена, возвращается следующий результат:

    {
    "Status": "OK",
    "Message": "",
    "Label": "streamload_txn_example1_table1",
    "TxnId": 9032,
    "BeginTxnTimeMs": 0
    }
  • Если транзакция привязана к дублирующейся метке, возвращается следующий результат:

    {
    "Status": "LABEL_ALREADY_EXISTS",
    "ExistingJobStatus": "RUNNING",
    "Message": "Label [streamload_txn_example1_table1] has already been used."
    }
  • Если возникают ошибки, отличные от дублирующейся метки, возвращается следующий результат:

    {
    "Status": "FAILED",
    "Message": ""
    }

Запись данных

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

ПРИМЕЧАНИЕ

При вызове операции /api/transaction/load вы должны использовать <file_path> для указания пути сохранения файла данных, который вы хотите загрузить.

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

ПРИМЕЧАНИЕ

Для этого примера разделитель столбцов, используемый в файле данных example1.csv, это запятые (,) вместо разделителя столбцов по умолчанию Selena (\t). Поэтому при вызове операции /api/transaction/load вы должны использовать "column_separator: <column_separator>" для указания запятых (,) в качестве разделителя столбцов.

Результат возврата

  • Если запись данных успешна, возвращается следующий результат:

    {
    "TxnId": 1,
    "Seq": 0,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    }
  • Если транзакция считается неизвестной, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "TXN_NOT_EXISTS"
    }
  • Если транзакция считается находящейся в недопустимом состоянии, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation State Invalid"
    }
  • Если возникают ошибки, отличные от неизвестной транзакции и недопустимого статуса, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Предварительный коммит транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

Результат возврата

  • Если предварительный коммит успешен, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если предварительный коммит превышает время ожидания, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • Если возникают ошибки, отличные от несуществующей транзакции и таймаута предварительного коммита, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout"
    }

Коммит транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

Результат возврата

  • Если коммит успешен, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • Если транзакция уже была зафиксирована, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "Transaction already commited",
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если коммит превышает время ожидания, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • Если публикация данных превышает время ожидания, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout",
    "CommitAndPublishTimeMs": 1393
    }
  • Если возникают ошибки, отличные от несуществующей транзакции и таймаута, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Откат транзакции

Синтаксис

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

Пример

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

Результат возврата

  • Если откат успешен, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": ""
    }
  • Если транзакция считается несуществующей, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • Если возникают ошибки, отличные от несуществующей транзакции, возвращается следующий результат:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

Справочные материалы

Для получения информации о подходящих сценариях применения и поддерживаемых форматах файлов данных Stream Load, а также о том, как работает Stream Load, см. Загрузка из локальной файловой системы через Stream Load.

Для получения информации о синтаксисе и параметрах для создания заданий Stream Load см. STREAM LOAD.