Перейти к основному содержимому

Передача данных из RisingWave в Selena

RisingWave — это распределенная SQL база данных для потоковой обработки, которая обеспечивает простую, эффективную и надежную обработку потоковых данных. Для быстрого начала работы с RisingWave см. Начало работы.

RisingWave предоставляет функцию передачи данных, которая позволяет пользователям напрямую передавать данные в Selena без использования каких-либо сторонних компонентов. Эта функция работает со всеми типами таблиц Selena: Duplicate Key, Primary Key, Aggregate и Unique Key.

Предварительные требования

  • У вас есть работающий кластер RisingWave версии v1.7 или более поздней.
  • У вас есть доступ к целевой таблице Selena, и версия Selena — v2.5 или более поздняя.
  • Для передачи данных в таблицу Selena у вас должны быть привилегии SELECT и INSERT на целевую таблицу. Для предоставления привилегий см. GRANT.
подсказка

RisingWave поддерживает только семантику at-least-once для Selena Sink, что означает, что в случае сбоев могут быть записаны дублированные данные. Рекомендуется использовать таблицы Primary Key Selena, которые могут дедуплицировать данные и обеспечивать идемпотентные записи от начала до конца.

Параметры

В следующей таблице описаны параметры, которые необходимо настроить при передаче данных из RisingWave в Selena. Все параметры обязательны, если не указано иное.

ПараметрыОписание
connectorУстановите значение starrocks.
starrocks.hostIP-адрес узла Selena FE.
starrocks.query_portПорт запросов узла FE.
starrocks.http_portHTTP-порт узла FE.
starrocks.userИмя пользователя для доступа к кластеру Selena.
starrocks.passwordПароль, связанный с именем пользователя.
starrocks.databaseБаза данных Selena, в которой находится целевая таблица.
starrocks.tableТаблица Selena, в которую вы хотите передать данные.
starrocks.partial_update(Необязательно) Включить ли функцию частичного обновления Selena. Включение этой функции может повысить производительность Sink, когда нужно обновить только несколько столбцов.
typeТип операции с данными во время передачи.
  • append-only: Выполняет только операции INSERT.
  • upsert: Выполняет операции Upsert. Если используется эта настройка, целевая таблица Selena должна быть таблицей Primary Key.
force_append_only(Необязательно) Когда type установлен в append-only, но в процессе передачи также есть операции Upsert и Delete, эта настройка может заставить задачу Sink генерировать данные только для добавления и отбрасывать данные Upsert и Delete.
primary_key(Необязательно) Первичный ключ таблицы Selena. Обязателен, если type равен upsert.

Сопоставление типов данных

В следующей таблице представлено сопоставление типов данных между RisingWave и Selena.

RisingWaveSelena
BOOLEANBOOLEAN
SMALLINTSMALLINT
INTEGERINT
BIGINTBIGINT
REALFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
DATEDATE
VARCHARVARCHAR
TIME
(Приводится к VARCHAR перед передачей в Selena)
Не поддерживается
TIMESTAMPDATETIME
TIMESTAMP WITH TIME ZONE
(Приводится к TIMESTAMP перед передачей в Selena)
Не поддерживается
INTERVAL
(Приводится к VARCHAR перед передачей в Selena)
Не поддерживается
STRUCTJSON
ARRAYARRAY
BYTEA
(Приводится к VARCHAR перед передачей в Selena)
Не поддерживается
JSONBJSON
SERIALBIGINT

Примеры

  1. Создайте базу данных demo в Selena и создайте таблицу Primary Key score_board в этой базе данных.

    CREATE DATABASE demo;
    USE demo;

    CREATE TABLE demo.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. Передайте данные из RisingWave в Selena.

    -- Создайте таблицу в RisingWave.
    CREATE TABLE score_board (
    id INT PRIMARY KEY,
    name VARCHAR,
    score INT
    );

    -- Вставьте данные в таблицу.
    INSERT INTO score_board VALUES (1, 'starrocks', 100), (2, 'risingwave', 100);

    -- Передайте данные из этой таблицы в таблицу Selena.
    CREATE SINK score_board_sink
    FROM score_board WITH (
    connector = 'starrocks',
    type = 'upsert',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'users',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'score_board',
    primary_key = 'id'
    );