Перейти к основному содержимому
Версия: 2.0.x

Выгрузка данных из RisingWave в Selena

RisingWave — это распределенная SQL-база данных для потоковой обработки, которая обеспечивает простую, эффективную и надежную обработку потоковых данных. Чтобы быстро начать работу с RisingWave, см. Get started.

RisingWave предоставляет функцию выгрузки данных, которая позволяет пользователям напрямую выгружать данные в Selena без необходимости в каких-либо сторонних компонентах. Эта функция работает со всеми типами таблиц Selena: Duplicate Key, Primary Key, Aggregate и Unique Key.

Предварительные требования

  • У вас есть работающий cluster RisingWave версии v1.7 или более поздней.
  • У вас есть доступ к целевой таблице Selena, и версия Selena — v1.5.2 или более поздняя.
  • Для выгрузки данных в таблицу Selena вы должны иметь привилегии SELECT и INSERT на целевой таблице. Для предоставления привилегий см. GRANT.
подсказка

RisingWave поддерживает только семантику at-least-once для Selena Sink, что означает, что в случае сбоев могут быть записаны дубликаты данных. Рекомендуется использовать таблицы Selena с Primary Key, которые могут дедуплицировать данные и обеспечить сквозную идемпотентность записи.

Параметры

В следующей таблице описаны параметры, которые необходимо настроить при выгрузке данных из RisingWave в Selena. Все параметры обязательны, если не указано иное.

ПараметрыОписание
connectorУстановите значение selena.
selena.hostIP-адрес FE-ноды Selena.
selena.query_portПорт запросов FE-ноды.
selena.http_portHTTP-порт FE-ноды.
selena.userИмя пользователя для доступа к cluster Selena.
selena.passwordПароль, связанный с именем пользователя.
selena.databaseБаза данных Selena, в которой находится целевая таблица.
selena.tableТаблица Selena, в которую вы хотите выгрузить данные.
selena.partial_update(Необязательно) Включить ли функцию частичного обновления Selena. Включение этой функции может повысить производительность Sink, когда нужно обновить только несколько столбцов.
typeТип операции с данными во время выгрузки.
  • append-only: Выполняет только операции INSERT.
  • upsert: Выполняет операции Upsert. Если используется эта настройка, целевая таблица Selena должна быть таблицей с Primary Key.
force_append_only(Необязательно) Когда для type установлено значение append-only, но в процессе выгрузки также есть операции Upsert и Delete, эта настройка может заставить задачу Sink генерировать данные только для добавления и отбрасывать данные Upsert и Delete.
primary_key(Необязательно) Primary key таблицы Selena. Требуется, если type имеет значение upsert.

Сопоставление типов данных

В следующей таблице приведено сопоставление типов данных между RisingWave и Selena.

RisingWaveSelena
BOOLEANBOOLEAN
SMALLINTSMALLINT
INTEGERINT
BIGINTBIGINT
REALFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
DATEDATE
VARCHARVARCHAR
TIME
(Преобразуется в VARCHAR перед выгрузкой в Selena)
Не поддерживается
TIMESTAMPDATETIME
TIMESTAMP WITH TIME ZONE
(Преобразуется в TIMESTAMP перед выгрузкой в Selena)
Не поддерживается
INTERVAL
(Преобразуется в VARCHAR перед выгрузкой в Selena)
Не поддерживается
STRUCTJSON
ARRAYARRAY
BYTEA
(Преобразуется в VARCHAR перед выгрузкой в Selena)
Не поддерживается
JSONBJSON
SERIALBIGINT

Примеры

  1. Создайте базу данных demo в Selena и создайте таблицу с Primary Key score_board в этой базе данных.

    CREATE DATABASE demo;
    USE demo;

    CREATE TABLE demo.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. Выгрузите данные из RisingWave в Selena.

    -- Создайте таблицу в RisingWave.
    CREATE TABLE score_board (
    id INT PRIMARY KEY,
    name VARCHAR,
    score INT
    );

    -- Вставьте данные в таблицу.
    INSERT INTO score_board VALUES (1, 'selena', 100), (2, 'risingwave', 100);

    -- Выгрузите данные из этой таблицы в таблицу Selena.
    CREATE SINK score_board_sink
    FROM score_board WITH (
    connector = 'selena',
    type = 'upsert',
    selena.host = 'selena-fe',
    selena.mysqlport = '9030',
    selena.httpport = '8030',
    selena.user = 'users',
    selena.password = '123456',
    selena.database = 'demo',
    selena.table = 'score_board',
    primary_key = 'id'
    );