Выгрузка данных из RisingWave в Selena
RisingWave — это распределенная SQL-база данных для потоковой обработки, которая обеспечивает простую, эффективную и надежную обработку потоковых данных. Чтобы быстро начать работу с RisingWave, см. Get started.
RisingWave предоставляет функцию выгрузки данных, которая позволяет пользователям напрямую выгружать данные в Selena без необходимости в каких-либо сторонних компонентах. Эта функция работает со всеми типами таблиц Selena: Duplicate Key, Primary Key, Aggregate и Unique Key.
Предварительные требования
- У вас есть работающий cluster RisingWave версии v1.7 или более поздней.
- У вас есть доступ к целевой таблице Selena, и версия Selena — v1.5.2 или более поздняя.
- Для выгрузки данных в таблицу Selena вы должны иметь привилегии SELECT и INSERT на целевой таблице. Для предоставления привилегий см. GRANT.
RisingWave поддерживает только семантику at-least-once для Selena Sink, что означает, что в случае сбоев могут быть записаны дубликаты данных. Рекомендуется использовать таблицы Selena с Primary Key, которые могут дедуплицировать данные и обеспечить сквозную идемпотентность записи.
Параметры
В следующей таблице описаны параметры, которые необходимо настроить при выгрузке данных из RisingWave в Selena. Все параметры обязательны, если не указано иное.
| Параметры | Описание |
|---|---|
| connector | Установите значение selena. |
| selena.host | IP-адрес FE-ноды Selena. |
| selena.query_port | Порт запросов FE-ноды. |
| selena.http_port | HTTP-порт FE-ноды. |
| selena.user | Имя пользователя для доступа к cluster Selena. |
| selena.password | Пароль, связанный с именем пользователя. |
| selena.database | База данных Selena, в которой находится целевая таблица. |
| selena.table | Таблица Selena, в которую вы хотите выгрузить данные. |
| selena.partial_update | (Необязательно) Включить ли функцию частичного обновления Selena. Включение этой функции может повысить производительность Sink, когда нужно обновить только несколько столбцов. |
| type | Тип операции с данными во время выгрузки.
|
| force_append_only | (Необязательно) Когда для type установлено значение append-only, но в процессе выгрузки также есть операции Upsert и Delete, эта настройка может заставить задачу Sink генерировать данные только для добавления и отбрасывать данные Upsert и Delete. |
| primary_key | (Необязательно) Primary key таблицы Selena. Требуется, если type имеет значение upsert. |
Сопоставление типов данных
В следующей таблице приведено сопоставление типов данных между RisingWave и Selena.
| RisingWave | Selena |
|---|---|
| BOOLEAN | BOOLEAN |
| SMALLINT | SMALLINT |
| INTEGER | INT |
| BIGINT | BIGINT |
| REAL | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| DATE | DATE |
| VARCHAR | VARCHAR |
| TIME (Преобразуется в VARCHAR перед выгрузкой в Selena) | Не поддерживается |
| TIMESTAMP | DATETIME |
| TIMESTAMP WITH TIME ZONE (Преобразуется в TIMESTAMP перед выгрузкой в Selena) | Не поддерживается |
| INTERVAL (Преобразуется в VARCHAR перед выгрузкой в Selena) | Не поддерживается |
| STRUCT | JSON |
| ARRAY | ARRAY |
| BYTEA (Преобразуется в VARCHAR перед выгрузкой в Selena) | Не поддерживается |
| JSONB | JSON |
| SERIAL | BIGINT |
Примеры
-
Создайте базу данных
demoв Selena и создайте таблицу с Primary Keyscore_boardв этой базе данных.CREATE DATABASE demo;
USE demo;
CREATE TABLE demo.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id); -
Выгрузите данные из RisingWave в Selena.
-- Создайте таблицу в RisingWave.
CREATE TABLE score_board (
id INT PRIMARY KEY,
name VARCHAR,
score INT
);
-- Вставьте данные в таблицу.
INSERT INTO score_board VALUES (1, 'selena', 100), (2, 'risingwave', 100);
-- Выгрузите данные из этой таблицы в таблицу Selena.
CREATE SINK score_board_sink
FROM score_board WITH (
connector = 'selena',
type = 'upsert',
selena.host = 'selena-fe',
selena.mysqlport = '9030',
selena.httpport = '8030',
selena.user = 'users',
selena.password = '123456',
selena.database = 'demo',
selena.table = 'score_board',
primary_key = 'id'
);