Перейти к основному содержимому

Непрерывная загрузка данных из Apache Flink®

Selena предоставляет собственный коннектор под названием Selena Connector for Apache Flink® (сокращенно Flink коннектор), который помогает загружать данные в таблицу Selena с помощью Flink. Основной принцип заключается в накоплении данных и последующей их загрузке за один раз в Selena через STREAM LOAD.

Flink коннектор поддерживает DataStream API, Table API & SQL и Python API. Он обладает более высокой и стабильной производительностью по сравнению с flink-connector-jdbc, предоставляемым Apache Flink®.

УВЕДОМЛЕНИЕ

Для загрузки данных в таблицы Selena с помощью Flink коннектора требуются привилегии SELECT и INSERT на целевую таблицу Selena. Если у вас нет этих привилегий, следуйте инструкциям в GRANT для предоставления этих привилегий пользователю, которого вы используете для подключения к кластеру Selena.

Требования к версиям

КоннекторFlinkSelenaJavaScala
1.2.111.15,1.16,1.17,1.18,1.19,1.202.1 и позже82.11,2.12
1.2.101.15,1.16,1.17,1.18,1.192.1 и позже82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 и позже82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 и позже82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 и позже82.11,2.12

Вы можете получить JAR-файл Flink коннектора следующими способами:

  • Прямая загрузка скомпилированного JAR-файла Flink коннектора.
  • Добавление Flink коннектора как зависимости в ваш Maven проект с последующей загрузкой JAR-файла.
  • Самостоятельная компиляция исходного кода Flink коннектора в JAR-файл.

Формат именования JAR-файла Flink коннектора следующий:

  • Начиная с Flink 1.15, это flink-connector-selena-${connector_version}_flink-${flink_version}.jar. Например, если у вас установлен Flink 1.15 и вы хотите использовать Flink коннектор 1.2.7, вы можете использовать flink-connector-selena-1.2.7_flink-1.15.jar.

  • До Flink 1.15, это flink-connector-selena-${connector_version}_flink-${flink_version}_${scala_version}.jar. Например, если в вашей среде установлены Flink 1.14 и Scala 2.12, и вы хотите использовать Flink коннектор 1.2.7, вы можете использовать flink-connector-selena-1.2.7_flink-1.14_2.12.jar.

УВЕДОМЛЕНИЕ

В общем случае последняя версия Flink коннектора поддерживает совместимость только с тремя последними версиями Flink.

Загрузка скомпилированного Jar-файла

Прямая загрузка соответствующей версии JAR-файла Flink коннектора из Maven Central Repository.

Maven зависимость

В файле pom.xml вашего Maven проекта добавьте Flink коннектор как зависимость согласно следующему формату. Замените flink_version, scala_version и connector_version на соответствующие версии.

  • В Flink 1.15 и позже

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-selena</artifactId>
    <version>${connector_version}_flink-${flink_version}</version>
    </dependency>
  • В версиях до Flink 1.15

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-selena</artifactId>
    <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
    </dependency>

Самостоятельная компиляция

  1. Загрузите исходный код Flink коннектора.

  2. Выполните следующую команду для компиляции исходного кода Flink коннектора в JAR-файл. Обратите внимание, что flink_version заменяется на соответствующую версию Flink.

    sh build.sh <flink_version>

    Например, если версия Flink в вашей среде 1.15, вам нужно выполнить следующую команду:

    sh build.sh 1.15
  3. Перейдите в директорию target/, чтобы найти JAR-файл Flink коннектора, такой как flink-connector-selena-1.2.7_flink-1.15-SNAPSHOT.jar, созданный при компиляции.

ПРИМЕЧАНИЕ

Имя Flink коннектора, который не был официально выпущен, содержит суффикс SNAPSHOT.

Опции

connector

Обязательно: Да
Значение по умолчанию: NONE
Описание: Коннектор, который вы хотите использовать. Значение должно быть "starrocks".

jdbc-url

Обязательно: Да
Значение по умолчанию: NONE
Описание: Адрес, используемый для подключения к MySQL серверу FE. Вы можете указать несколько адресов, которые должны быть разделены запятой (,). Формат: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.

load-url

Обязательно: Да
Значение по умолчанию: NONE
Описание: Адрес, используемый для подключения к HTTP серверу FE. Вы можете указать несколько адресов, которые должны быть разделены точкой с запятой (;). Формат: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.

database-name

Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя базы данных Selena, в которую вы хотите загрузить данные.

table-name

Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя таблицы, которую вы хотите использовать для загрузки данных в Selena.

username

Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя пользователя учетной записи, которую вы хотите использовать для загрузки данных в Selena. Учетная запись должна иметь привилегии SELECT и INSERT на целевую таблицу Selena.

password

Обязательно: Да
Значение по умолчанию: NONE
Описание: Пароль указанной учетной записи.

sink.version

Обязательно: Нет
Значение по умолчанию: AUTO
Описание: Интерфейс, используемый для загрузки данных. Этот параметр поддерживается начиная с версии Flink коннектора 1.2.4.

  • V1: Использует интерфейс Stream Load для загрузки данных. Коннекторы до версии 1.2.4 поддерживают только этот режим.
  • V2: Использует интерфейс Stream Load transaction для загрузки данных. Требует Selena версии не менее 2.4. Рекомендуется V2, поскольку он оптимизирует использование памяти и обеспечивает более стабильную реализацию exactly-once.
  • AUTO: Если версия Selena поддерживает транзакционный Stream Load, автоматически выберет V2, иначе выберет V1

sink.label-prefix

Обязательно: Нет
Значение по умолчанию: NONE
Описание: Префикс метки, используемый Stream Load. Рекомендуется настроить его, если вы используете exactly-once с коннектором версии 1.2.8 и позже. См. примечания по использованию exactly-once.

sink.semantic

Обязательно: Нет
Значение по умолчанию: at-least-once
Описание: Семантика, гарантируемая sink. Допустимые значения: at-least-once и exactly-once.

sink.buffer-flush.max-bytes

Обязательно: Нет
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки. Этот параметр действует только когда sink.semantic установлен в at-least-once. Если sink.semantic установлен в exactly-once, данные в памяти сбрасываются при срабатывании checkpoint Flink. В этом случае этот параметр не действует.

sink.buffer-flush.max-rows

Обязательно: Нет
Значение по умолчанию: 500000
Описание: Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Этот параметр доступен только когда sink.version равен V1 и sink.semantic равен at-least-once. Допустимые значения: от 64000 до 5000000.

sink.buffer-flush.interval-ms

Обязательно: Нет
Значение по умолчанию: 300000
Описание: Интервал, с которым данные сбрасываются. Этот параметр доступен только когда sink.semantic равен at-least-once. Допустимые значения: от 1000 до 3600000. Единица: мс.

sink.max-retries

Обязательно: Нет
Значение по умолчанию: 3
Описание: Количество попыток повтора выполнения задания Stream Load. Этот параметр доступен только когда вы устанавливаете sink.version в V1. Допустимые значения: от 0 до 10.

sink.connect.timeout-ms

Обязательно: Нет
Значение по умолчанию: 30000
Описание: Тайм-аут для установления HTTP соединения. Допустимые значения: от 100 до 60000. Единица: мс. До версии Flink коннектора v1.2.9 значение по умолчанию было 1000.

sink.socket.timeout-ms

Обязательно: Нет
Значение по умолчанию: -1
Описание: Поддерживается с версии 1.2.10. Время ожидания данных HTTP клиентом. Единица: мс. Значение по умолчанию -1 означает отсутствие тайм-аута.

sink.sanitize-error-log

Обязательно: Нет
Значение по умолчанию: false
Описание: Поддерживается с версии 1.2.12. Следует ли очищать конфиденциальные данные в журнале ошибок для безопасности в продакшене. Когда этот параметр установлен в true, конфиденциальные данные строк и значения столбцов в журналах ошибок Stream Load редактируются как в коннекторе, так и в журналах SDK. Значение по умолчанию false для обратной совместимости.

sink.wait-for-continue.timeout-ms

Обязательно: Нет
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.2.7. Тайм-аут ожидания ответа HTTP 100-continue от FE. Допустимые значения: от 3000 до 60000. Единица: мс

sink.ignore.update-before

Обязательно: Нет
Значение по умолчанию: true
Описание: Поддерживается с версии 1.2.8. Следует ли игнорировать записи UPDATE_BEFORE от Flink при загрузке данных в таблицы Primary Key. Если этот параметр установлен в false, запись обрабатывается как операция удаления в таблице Selena.

sink.parallelism

Обязательно: Нет
Значение по умолчанию: NONE
Описание: Параллелизм загрузки. Доступно только для Flink SQL. Если этот параметр не указан, планировщик Flink решает параллелизм. В сценарии мульти-параллелизма пользователи должны гарантировать, что данные записываются в правильном порядке.

sink.properties.*

Обязательно: Нет
Значение по умолчанию: NONE
Описание: Параметры, используемые для управления поведением Stream Load. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в STREAM LOAD.

sink.properties.format

Обязательно: Нет
Значение по умолчанию: csv
Описание: Формат, используемый для Stream Load. Flink коннектор преобразует каждую партию данных в указанный формат перед отправкой их в Selena. Допустимые значения: csv и json.

sink.properties.column_separator

Обязательно: Нет
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.

sink.properties.row_delimiter

Обязательно: Нет
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.

sink.properties.max_filter_ratio

Обязательно: Нет
Значение по умолчанию: 0
Описание: Максимальная толерантность к ошибкам Stream Load. Это максимальный процент записей данных, которые могут быть отфильтрованы из-за неадекватного качества данных. Допустимые значения: от 0 до 1. Значение по умолчанию: 0. Подробности см. в Stream Load.

sink.properties.partial_update

Обязательно: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.

sink.properties.partial_update_mode

Обязательно: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими партиями.
  • Значение column означает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.

sink.properties.strict_mode

Обязательно: Нет
Значение по умолчанию: false
Описание: Указывает, следует ли включить строгий режим для Stream Load. Это влияет на поведение загрузки при наличии неквалифицированных строк, таких как несогласованные значения столбцов. Допустимые значения: true и false. Значение по умолчанию: false. Подробности см. в Stream Load.

sink.properties.compression

Обязательно: Нет
Значение по умолчанию: NONE
Описание: Алгоритм сжатия, используемый для Stream Load. Допустимые значения: lz4_frame. Сжатие для формата JSON требует Flink коннектор 1.2.10+ и Selena v3.2.7+. Сжатие для формата CSV требует только Flink коннектор 1.2.11+.

sink.properties.prepared_timeout

Обязательно: Нет
Значение по умолчанию: NONE
Описание: Поддерживается с версии 1.2.12 и действует только когда sink.version установлен в V2. Требует Selena 3.5.4 или позже. Устанавливает тайм-аут в секундах для фазы Transaction Stream Load от PREPARED до COMMITTED. Обычно нужен только для exactly-once; at-least-once обычно не требует установки этого параметра (коннектор по умолчанию использует 300с). Если не установлен в exactly-once, применяется конфигурация Selena FE prepared_transaction_default_timeout_second (по умолчанию 86400с). См. Управление тайм-аутами транзакций Selena.

Тип данных FlinkТип данных Selena
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARSTRING
VARCHARSTRING
STRINGSTRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON STRING
ROW<arg T...>JSON STRING

Примечания по использованию

Exactly Once

  • Если вы хотите, чтобы sink гарантировал семантику exactly-once, мы рекомендуем обновить Selena до версии 2.5 или позже, а Flink коннектор до версии 1.2.4 или позже

    • Начиная с Flink коннектора 1.2.4, exactly-once переработан на основе интерфейса транзакций Stream Load, предоставляемого Selena с версии 2.4. По сравнению с предыдущей реализацией на основе нетранзакционного интерфейса Stream Load, новая реализация снижает использование памяти и накладные расходы checkpoint, тем самым повышая производительность в реальном времени и стабильность загрузки.

    • Если версия Selena ранее 2.4 или версия Flink коннектора ранее 1.2.4, sink автоматически выберет реализацию на основе нетранзакционного интерфейса Stream Load.

  • Конфигурации для гарантии exactly-once

    • Значение sink.semantic должно быть exactly-once.

    • Если версия Flink коннектора 1.2.8 и позже, рекомендуется указать значение sink.label-prefix. Обратите внимание, что префикс метки должен быть уникальным среди всех типов загрузки в Selena, таких как задания Flink, Routine Load и Broker Load.

      • Если префикс метки указан, Flink коннектор будет использовать префикс метки для очистки зависших транзакций, которые могут быть созданы в некоторых сценариях сбоя Flink, например, когда задание Flink завершается неудачно во время выполнения checkpoint. Эти зависшие транзакции обычно находятся в статусе PREPARED, если вы используете SHOW PROC '/transactions/<db_id>/running'; для их просмотра в Selena. Когда задание Flink восстанавливается из checkpoint, Flink коннектор найдет эти зависшие транзакции согласно префиксу метки и некоторой информации в checkpoint и прервет их. Flink коннектор не может прервать их при выходе из задания Flink из-за механизма двухфазной фиксации для реализации exactly-once. Когда задание Flink завершается, Flink коннектор не получил уведомления от координатора checkpoint Flink о том, должны ли транзакции быть включены в успешный checkpoint, и это может привести к потере данных, если эти транзакции будут прерваны в любом случае. Вы можете получить обзор о том, как достичь сквозного exactly-once в Flink в этом блогпосте.

      • Если префикс метки не указан, зависшие транзакции будут очищены Selena только после истечения времени ожидания. Однако количество выполняющихся транзакций может достичь ограничения Selena max_running_txn_num_per_db, если задания Flink часто завершаются неудачно до истечения времени ожидания транзакций. Вы можете установить меньший тайм-аут для транзакций PREPARED, чтобы они истекали быстрее, когда префикс метки не указан. См. ниже о том, как установить тайм-аут prepared.

  • Если вы уверены, что задание Flink в конечном итоге восстановится из checkpoint или savepoint после длительного простоя из-за остановки или непрерывных сбоев, пожалуйста, соответственно настройте следующие конфигурации Selena, чтобы избежать потери данных.

    • Настройте тайм-аут транзакций PREPARED. См. ниже о том, как установить тайм-аут.

      Тайм-аут должен быть больше времени простоя задания Flink. В противном случае зависшие транзакции, которые включены в успешный checkpoint, могут быть прерваны из-за тайм-аута до перезапуска задания Flink, что приведет к потере данных.

      Обратите внимание, что когда вы устанавливаете большее значение для этой конфигурации, лучше указать значение sink.label-prefix, чтобы зависшие транзакции могли быть очищены согласно префиксу метки и некоторой информации в checkpoint, а не из-за тайм-аута (что может привести к потере данных).

    • label_keep_max_second и label_keep_max_num: конфигурации Selena FE, значения по умолчанию 259200 и 1000 соответственно. Подробности см. в конфигурациях FE. Значение label_keep_max_second должно быть больше времени простоя задания Flink. В противном случае Flink коннектор не сможет проверить состояние транзакций в Selena, используя метки транзакций, сохраненные в savepoint или checkpoint Flink, и выяснить, зафиксированы ли эти транзакции или нет, что в конечном итоге может привести к потере данных.

  • Как установить тайм-аут для транзакций PREPARED

    • Для коннектора 1.2.12+ и Selena 3.5.4+ вы можете установить тайм-аут, настроив параметр коннектора sink.properties.prepared_timeout. По умолчанию значение не установлено, и оно возвращается к глобальной конфигурации Selena FE prepared_transaction_default_timeout_second (значение по умолчанию 86400).

    • Для других версий коннектора или Selena вы можете установить тайм-аут, настроив глобальную конфигурацию Selena FE prepared_transaction_default_timeout_second (значение по умолчанию 86400).

Политика сброса

Flink коннектор будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Способ запуска сброса различается между at-least-once и exactly-once.

Для at-least-once сброс будет запущен при выполнении любого из следующих условий:

  • байты буферизованных строк достигают лимита sink.buffer-flush.max-bytes
  • количество буферизованных строк достигает лимита sink.buffer-flush.max-rows. (Действительно только для версии sink V1)
  • время, прошедшее с последнего сброса, достигает лимита sink.buffer-flush.interval-ms
  • запускается checkpoint

Для exactly-once сброс происходит только при запуске checkpoint.

Мониторинг метрик загрузки

Flink коннектор предоставляет следующие метрики для мониторинга загрузки.

МетрикаТипОписание
totalFlushBytescounterуспешно сброшенные байты.
totalFlushRowscounterколичество успешно сброшенных строк.
totalFlushSucceededTimescounterколичество раз успешного сброса данных.
totalFlushFailedTimescounterколичество раз неудачного сброса данных.
totalFilteredRowscounterколичество отфильтрованных строк, которое также включено в totalFlushRows.

Примеры

Следующие примеры показывают, как использовать Flink коннектор для загрузки данных в таблицу Selena с помощью Flink SQL или Flink DataStream.

Подготовка

Создание таблицы Selena

Создайте базу данных test и создайте таблицу Primary Key score_board.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
  • Загрузите бинарный файл Flink Flink 1.15.2 и распакуйте его в директорию flink-1.15.2.

  • Загрузите Flink коннектор 1.2.7 и поместите его в директорию flink-1.15.2/lib.

  • Выполните следующие команды для запуска кластера Flink:

    cd flink-1.15.2
    ./bin/start-cluster.sh

Конфигурация сети

Убедитесь, что машина, на которой расположен Flink, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_http_port (по умолчанию: 8040).

  • Выполните следующую команду для запуска клиента Flink SQL.

    ./bin/sql-client.sh
  • Создайте таблицу Flink score_board и вставьте значения в таблицу через клиент Flink SQL. Обратите внимание, что вы должны определить первичный ключ в DDL Flink, если хотите загрузить данные в таблицу Primary Key Selena. Это необязательно для других типов таблиц Selena.

    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',

    'table-name' = 'score_board',
    'username' = 'root',
    'password' = ''
    );

    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

Существует несколько способов реализации задания Flink DataStream в зависимости от типа входных записей, таких как CSV Java String, JSON Java String или пользовательский Java объект.

  • Входные записи представляют собой String в формате CSV. См. LoadCsvRecords для полного примера.

    /**
    * Генерация записей в формате CSV. Каждая запись имеет три значения, разделенных "\t".
    * Эти значения будут загружены в столбцы `id`, `name` и `score` в таблице Selena.
    */
    String[] records = new String[]{
    "1\tstarrocks-csv\t100",
    "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Настройте коннектор с необходимыми свойствами.
    * Вам также нужно добавить свойства "sink.properties.format" и "sink.properties.column_separator"
    * чтобы сообщить коннектору, что входные записи в формате CSV, а разделитель столбцов "\t".
    * Вы также можете использовать другие разделители столбцов в записях формата CSV,
    * но не забудьте соответственно изменить "sink.properties.column_separator".
    */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "csv")
    .withProperty("sink.properties.column_separator", "\t")
    .build();
    // Создайте sink с опциями.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Входные записи представляют собой String в формате JSON. См. LoadJsonRecords для полного примера.

    /**
    * Генерация записей в формате JSON.
    * Каждая запись имеет три пары ключ-значение, соответствующие столбцам `id`, `name` и `score` в таблице Selena.
    */
    String[] records = new String[]{
    "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
    "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Настройте коннектор с необходимыми свойствами.
    * Вам также нужно добавить свойства "sink.properties.format" и "sink.properties.strip_outer_array"
    * чтобы сообщить коннектору, что входные записи в формате JSON и удалить внешнюю структуру массива.
    */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    .build();
    // Создайте sink с опциями.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Входные записи представляют собой пользовательские Java объекты. См. LoadCustomJavaRecords для полного примера.

    • В этом примере входная запись представляет собой простой POJO RowData.

      public static class RowData {
      public int id;
      public String name;
      public int score;

      public RowData() {}

      public RowData(int id, String name, int score) {
      this.id = id;
      this.name = name;
      this.score = score;
      }
      }
    • Основная программа выглядит следующим образом:

      // Генерация записей, которые используют RowData в качестве контейнера.
      RowData[] records = new RowData[]{
      new RowData(1, "starrocks-rowdata", 100),
      new RowData(2, "flink-rowdata", 100),
      };
      DataStream<RowData> source = env.fromElements(records);

      // Настройте коннектор с необходимыми свойствами.
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
      .withProperty("jdbc-url", jdbcUrl)
      .withProperty("load-url", loadUrl)
      .withProperty("database-name", "test")
      .withProperty("table-name", "score_board")
      .withProperty("username", "root")
      .withProperty("password", "")
      .build();

      /**
      * Flink коннектор будет использовать массив Java объектов (Object[]) для представления строки, загружаемой в таблицу Selena,
      * и каждый элемент является значением для столбца.
      * Вам нужно определить схему Object[], которая соответствует схеме таблицы Selena.
      */
      TableSchema schema = TableSchema.builder()
      .field("id", DataTypes.INT().notNull())
      .field("name", DataTypes.STRING())
      .field("score", DataTypes.INT())
      // Когда таблица Selena является таблицей Primary Key, вы должны указать notNull(), например, DataTypes.INT().notNull(), для первичного ключа `id`.
      .primaryKey("id")
      .build();
      // Преобразуйте RowData в Object[] согласно схеме.
      RowDataTransformer transformer = new RowDataTransformer();
      // Создайте sink со схемой, опциями и трансформером.
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
    • RowDataTransformer в основной программе определен следующим образом:

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

      /**
      * Установите каждый элемент массива объектов согласно входному RowData.
      * Схема массива соответствует схеме таблицы Selena.
      */
      @Override
      public void accept(Object[] internalRow, RowData rowData) {
      internalRow[0] = rowData.id;
      internalRow[1] = rowData.name;
      internalRow[2] = rowData.score;
      // Когда таблица Selena является таблицей Primary Key, вам нужно установить последний элемент, чтобы указать, является ли загрузка данных операцией UPSERT или DELETE.
      internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
      }
      }

Фреймворк Flink CDC 3.0 может использоваться для легкого построения потокового ELT пайплайна из источников CDC (таких как MySQL и Kafka) в Selena. Пайплайн может синхронизировать целые базы данных, объединенные шардированные таблицы и изменения схемы из источников в Selena.

Начиная с версии 1.5.0, Flink коннектор для Selena интегрирован в этот фреймворк как Selena Pipeline Connector. Selena Pipeline Connector поддерживает:

  • Автоматическое создание баз данных и таблиц
  • Синхронизацию изменений схемы
  • Полную и инкрементальную синхронизацию данных

Для быстрого старта см. Потоковый ELT из MySQL в Selena с использованием Flink CDC 3.0 с Selena Pipeline Connector.

Рекомендуется использовать Selena версии v3.2.1 и позже для включения fast_schema_evolution. Это улучшит скорость добавления или удаления столбцов и снизит использование ресурсов.

Лучшие практики

Загрузка данных в таблицу Primary Key

Этот раздел покажет, как загружать данные в таблицу Primary Key Selena для достижения частичных обновлений и условных обновлений. Вы можете посмотреть Изменение данных через загрузку для введения в эти функции. Эти примеры используют Flink SQL.

Подготовка

Создайте базу данных test и создайте таблицу Primary Key score_board в Selena.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Частичное обновление

Этот пример покажет, как загружать данные только в столбцы id и name.

  1. Вставьте две строки данных в таблицу Selena score_board в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Flink score_board в клиенте Flink SQL.

    • Определите DDL, который включает только столбцы id и name.
    • Установите опцию sink.properties.partial_update в true, что сообщает Flink коннектору выполнять частичные обновления.
    • Если версия Flink коннектора <= 1.2.7, вам также нужно установить опцию sink.properties.columns в id,name,__op, чтобы сообщить Flink коннектору, какие столбцы нужно обновить. Обратите внимание, что вам нужно добавить поле __op в конце. Поле __op указывает, что загрузка данных является операцией UPSERT или DELETE, и его значения устанавливаются коннектором автоматически.
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.partial_update' = 'true',
    -- только для версии Flink коннектора <= 1.2.7
    'sink.properties.columns' = 'id,name,__op'
    );
  3. Вставьте две строки данных в таблицу Flink. Первичные ключи строк данных такие же, как у строк в таблице Selena, но значения в столбце name изменены.

    INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update');
  4. Запросите таблицу Selena в клиенте MySQL.

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | starrocks-update | 100 |
    | 2 | flink-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

    Вы можете видеть, что изменяются только значения для name, а значения для score не изменяются.

Условное обновление

Этот пример покажет, как выполнить условное обновление согласно значению столбца score. Обновление для id вступает в силу только когда новое значение для score больше или равно старому значению.

  1. Вставьте две строки данных в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Flink score_board следующими способами:

    • Определите DDL, включающий все столбцы.
    • Установите опцию sink.properties.merge_condition в score, чтобы сообщить коннектору использовать столбец score в качестве условия.
    • Установите опцию sink.version в V1, что сообщает коннектору использовать Stream Load.
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.merge_condition' = 'score',
    'sink.version' = 'V1'
    );
  3. Вставьте две строки данных в таблицу Flink. Первичные ключи строк данных такие же, как у строк в таблице Selena. Первая строка данных имеет меньшее значение в столбце score, а вторая строка данных имеет большее значение в столбце score.

    INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
  4. Запросите таблицу Selena в клиенте MySQL.

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

    Вы можете видеть, что изменяются только значения второй строки данных, а значения первой строки данных не изменяются.

Загрузка данных в столбцы типа BITMAP

BITMAP часто используется для ускорения подсчета уникальных значений, например, подсчета UV, см. Использование Bitmap для точного Count Distinct. Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа BITMAP.

  1. Создайте таблицу Aggregate Selena в клиенте MySQL.

    В базе данных test создайте таблицу Aggregate page_uv, где столбец visit_users определен как тип BITMAP и настроен с агрегатной функцией BITMAP_UNION.

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Flink в клиенте Flink SQL.

    Столбец visit_user_id в таблице Flink имеет тип BIGINT, и мы хотим загрузить этот столбец в столбец visit_users типа BITMAP в таблице Selena. Поэтому при определении DDL таблицы Flink обратите внимание, что:

    • Поскольку Flink не поддерживает BITMAP, вам нужно определить столбец visit_user_id как тип BIGINT для представления столбца visit_users типа BITMAP в таблице Selena.
    • Вам нужно установить опцию sink.properties.columns в page_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id), что сообщает коннектору сопоставление столбцов между таблицей Flink и таблицей Selena. Также вам нужно использовать функцию to_bitmap чтобы сообщить коннектору преобразовать данные типа BIGINT в тип BITMAP.
    CREATE TABLE `page_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'page_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Загрузите данные в таблицу Flink в клиенте Flink SQL.

    INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Вычислите UV страниц из таблицы Selena в клиенте MySQL.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 2 | 1 |
    | 1 | 3 |
    +---------+-----------------------------+
    2 rows in set (0.05 sec)

Загрузка данных в столбцы типа HLL

HLL может использоваться для приблизительного подсчета уникальных значений, см. Использование HLL для приблизительного подсчета уникальных значений.

Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа HLL.

  1. Создайте таблицу Aggregate Selena

    В базе данных test создайте таблицу Aggregate hll_uv, где столбец visit_users определен как тип HLL и настроен с агрегатной функцией HLL_UNION.

    CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Flink в клиенте Flink SQL.

    Столбец visit_user_id в таблице Flink имеет тип BIGINT, и мы хотим загрузить этот столбец в столбец visit_users типа HLL в таблице Selena. Поэтому при определении DDL таблицы Flink обратите внимание, что:

    • Поскольку Flink не поддерживает BITMAP, вам нужно определить столбец visit_user_id как тип BIGINT для представления столбца visit_users типа HLL в таблице Selena.
    • Вам нужно установить опцию sink.properties.columns в page_id,visit_date,user_id,visit_users=hll_hash(visit_user_id), что сообщает коннектору сопоставление столбцов между таблицей Flink и таблицей Selena. Также вам нужно использовать функцию hll_hash чтобы сообщить коннектору преобразовать данные типа BIGINT в тип HLL.
    CREATE TABLE `hll_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'hll_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. Загрузите данные в таблицу Flink в клиенте Flink SQL.

    INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Вычислите UV страниц из таблицы Selena в клиенте MySQL.

    mysql> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    **+---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 3 | 2 |
    | 4 | 1 |
    +---------+-----------------------------+
    2 rows in set (0.04 sec)