Apache Flink
Непрерывная загрузка данных из Apache Flink®
Selena предоставляет самостоятельно разработанный connector под названием Selena Connector for Apache Flink® (сокращённо Flink connector) для загрузки данных в таблицу Selena с использованием Flink. Основной принцип заключается в накоплении данных с последующей загрузкой всех данных за один раз в Selena через STREAM LOAD.
Flink connector поддерживает DataStream API, Table API & SQL и Python API. Он имеет более высокую и стабильную производительность по сравнению с flink-connector-jdbc, предоставляемым Apache Flink®.
ВНИМАНИЕ
Загрузка данных в таблицы Selena с помощью Flink connector требует привилегий SELECT и INSERT на целевую таблицу Selena. Если у вас нет этих привилегий, следуйте инструкциям, представленным в GRANT, чтобы предоставить эти привилегии пользователю, которого вы используете для подключения к вашему cluster Selena.
Требования к версиям
| Connector | Flink | Selena | Java | Scala |
|---|---|---|---|---|
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 и позже | 8 | 2.11,2.12 |
Получение Flink connector
Вы можете получить JAR-файл Flink connector следующими способами:
- Напрямую скачать скомпилированный JAR-файл Flink connector.
- Добавить Flink connector в качестве зависимости в ваш проект Maven, а затем скачать JAR-файл.
- Скомпилировать исходный код Flink connector в JAR-файл самостоятельно.
Формат наименования JAR-файла Flink connector следующий:
-
Начиная с Flink 1.15, это
flink-connector-selena-${connector_version}_flink-${flink_version}.jar. Например, если вы установили Flink 1.15 и хотите использовать Flink connector 1.2.7, вы можете использоватьflink-connector-selena-1.2.7_flink-1.15.jar. -
До Flink 1.15 это
flink-connector-selena-${connector_version}_flink-${flink_version}_${scala_version}.jar. Например, если вы установили Flink 1.14 и Scala 2.12 в вашей среде, и хотите использ овать Flink connector 1.2.7, вы можете использоватьflink-connector-selena-1.2.7_flink-1.14_2.12.jar.
ВНИМАНИЕ
В общем случае последняя версия Flink connector поддерживает совместимость только с тремя последними версиями Flink.
Скачивание скомпилированного Jar-файла
Напрямую скачайте соответствующую версию JAR-файла Flink connector из Maven Central Repository.
Maven Dependency
В файле pom.xml вашего проекта Maven добавьте Flink connector в качестве зависимости в соответствии со следующим форматом. Замените flink_version, scala_version и connector_version на соответствующие версии.
-
В Flink 1.15 и позже
<dependency>
<groupId>com.selena</groupId>
<artifactId>flink-connector-selena</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
В версиях до Flink 1.15
<dependency>
<groupId>com.selena</groupId>
<artifactId>flink-connector-selena</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
Компиляция самостоятельно
-
Скачайте исходный код Flink connector.
-
Выполните следующую команду для компиляции исходного кода Flink connector в JAR-файл. Обра тите внимание, что
flink_versionзаменяется на соответствующую версию Flink.sh build.sh <flink_version>Например, если версия Flink в вашей среде 1.15, вам нужно выполнить следующую команду:
sh build.sh 1.15 -
Перейдите в каталог
target/, чтобы найти JAR-файл Flink connector, такой какflink-connector-selena-1.2.7_flink-1.15-SNAPSHOT.jar, сгенерированный после компиляции.
ПРИМЕЧАНИЕ
Имя Flink connector, который не выпущен официально, содержит суффикс
SNAPSHOT.
Параметры
connector
Обязательный: Да
Значение по умолчанию: NONE
Описание: Connector, который вы хотите использовать. Значение должно быть "selena".
jdbc-url
Обязательный: Да
Значение по умолчанию: NONE
Описание: Адрес, который используется для подключения к серверу MySQL FE. Вы можете указать несколько адресов, которые должны быть разделены запятой (,). Формат: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.
load-url
Обязательный: Да
Значение по умолчанию: NONE
Описание: Адрес, который используется для подключения к HTTP-серверу FE. Вы можете указать несколько адресов, которые должны быть разделены точкой с запятой (;). Формат: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.
database-name
Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя базы данных Selena, в которую вы хотите загрузить данные.
table-name
Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя таблицы, которую вы хотите использовать для загрузки данных в Selena.
username
Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя пользователя учетной записи, которую вы хотите использовать для загрузки данных в Selena. Учетная запись должна иметь привилегии SELECT и INSERT на целевую таблицу Selena.
password
Обязательный: Да
Значение по умолчанию: NONE
Описание: Пароль предыдущей учетной записи.
sink.version
Обязательный: Нет
Значение по умолчанию: AUTO
Описание: Интерфейс, используемый для загрузки данных. Этот параметр поддерживается начиная с версии Flink connector 1.2.4.
V1: Использовать интерфейс Stream Load для загрузки данных. Connectors до 1.2.4 поддерживают только этот режим.V2: Использовать интерфейс Stream Load transaction для загрузки данных. Требуется Selena версии не ниже 2.4. РекомендуетсяV2, так как он оптимизирует использование памяти и обеспечивает более стабильную реализацию exactly-once.AUTO: Если версия Selena поддерживает транзакционный Stream Load, автоматически выберетV2, иначе выберетV1
sink.label-prefix
Обязательный: Нет
Значение по умолчанию: NONE
Описание: Префикс метки, используемый Stream Load. Рекомендуется настроить его, если вы используете exactly-once с connector 1.2.8 и позже. См. примечания по использованию exactly-once.
sink.semantic
Обязательный: Нет
Значение по умолчанию: at-least-once
Описание: Семантика, гарантируемая sink. Допустимые значения: at-least-once и exactly-once.
sink.buffer-flush.max-bytes
Обязательный: Нет
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки. Этот параметр вступает в силу только тогда, когда sink.semantic установлен на at-least-once. Если sink.semantic установлен на exactly-once, данные в памяти сбрасываются при запуске checkpoint Flink. В этом случае этот параметр не действует.
sink.buffer-flush.max-rows
Обязательный: Нет
Значение по умолчанию: 500000
Описание: Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Этот параметр доступен только тогда, когда sink.version равен V1, а sink.semantic равен at-least-once. Допустимые значения: от 64000 до 5000000.
sink.buffer-flush.interval-ms
Обязательный: Нет
Значение по умолчанию: 300000
Описание: Интервал сброса данных. Этот параметр доступен только тогда, когда sink.semantic равен at-least-once. Допустимые значения: от 1000 до 3600000. Единица измерения: мс.
sink.max-retries
Обязательный: Нет
Значение по умолчанию: 3
Описание: Количество раз, когда система повторяет попытку выполнить задачу Stream Load. Этот параметр доступен только тогда, когда вы устанавливаете sink.version в V1. Допустимые значения: от 0 до 10.
sink.connect.timeout-ms
Обязательный: Нет
Значение по умолчанию: 30000
Описание: Тайм-аут для установки HTTP-соединения. Допустимые значения: от 100 до 60000. Единица измерения: мс. До Flink connector v1.2.9 значение по умолчанию было 1000.
sink.socket.timeout-ms
Обязательный: Нет
Значение по умолчанию: -1
Описание: Поддерживается с версии 1.2.10. Продолжительность времени, в течение которого HTTP-клиент ожидает данных. Единица измерения: мс. Значение по умолчанию -1 означает, что тайм-аута нет.
sink.sanitize-error-log
Обязательный: Нет
Значение по умолчанию: false
Описание: Поддерживается с версии 1.2.12. Следует ли очищать конфиденциальные данные в журнале ошибок для производственной безопасности. Когда этот элемент установлен в true, конфиденциальные данные строк и значения столбцов в журналах ошибок Stream Load редактируются как в журналах connector, так и в журналах SDK. Значение по умолчанию - false для обратной совместимости.
sink.wait-for-continue.timeout-ms
Обязательный: Нет
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.2.7. Тайм-аут ожидания ответа HTTP 100-continue от FE. Допустимые значения: от 3000 до 60000. Единица измерения: мс
sink.ignore.update-before
Обязательный: Нет
Значение по умолчанию: true
Описание: Поддерживается с версии 1.2.8. Следует ли игнорировать записи UPDATE_BEFORE из Flink при загрузке данных в таблицы Primary Key. Если этот параметр установлен в false, запись рассматривается как операция удаления в таблице Selena.
sink.parallelism
Обязательный: Нет
Значение по умолчанию: NONE
Описание: Параллелизм загрузки. Доступно только для Flink SQL. Если этот параметр не указан, планировщик Flink определяет параллелизм. В сценарии с мультипараллелизмом пользователи должны гарантировать, что данные записываются в правильном порядке.
sink.properties.*
Обязательный: Нет
Значение по умолчанию: NONE
Описание: Параметры, которые используются для управления поведением Stream Load. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в STREAM LOAD.
sink.properties.format
Обязательный: Нет
Значение по умолчанию: csv
Описание: Формат, используемый для Stream Load. Flink connector преобразует каждый пакет данных в формат перед их отправкой в Selena. Допустимые значения: csv и json.
sink.properties.column_separator
Обязательный: Нет
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.
sink.properties.row_delimiter
Обязательный: Нет
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.
sink.properties.max_filter_ratio
Обязательный: Нет
Значение по умолчанию: 0
Описание: Максимальная толерантность к ошибкам Stream Load. Это максимальный процент записей данных, которые могут быть отфильтрованы из-за недостаточного качества данных. Допустимые значения: от 0 до 1. Значение по умолчанию: 0. Подробности см. в Stream Load.
sink.properties.partial_update
Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Следует ли использовать частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
sink.properties.partial_update_mode
Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим частичных обновлений. Допустимые значения: row и column.
- Значение
row(по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с большим количеством столбцов и небольшими пакетами. - Значение
columnозначает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и большим количеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице с 100 столбцами, если обновляются только 10 столбцов (10% от общего числа) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.
sink.properties.strict_mode
Обязательный: Нет
Значение по умолчанию: false
Описание: Указывает, следует ли включать строгий режим для Stream Load. Это влияет на поведение загрузки при наличии неквалифицированных строк, таких как несогласованные значения столбцов. Допустимые значения: true и false. Значение по умолчанию: false. Подробности см. в Stream Load.
sink.properties.compression
Обязательный: Нет
Значение по умолчанию: NONE
Описание: Алгоритм сжатия, используемый для Stream Load. Допустимые значения: lz4_frame. Сжатие для формата JSON требует Flink connector 1.2.10+ и Selena v1.5.2+. Сжатие для формата CSV требует только Flink connector 1.2.11+.
sink.properties.prepared_timeout
Обязательный: Нет
Значение по умолчанию: NONE
Описание: Поддерживается с версии 1.2.12 и действует только тогда, когда sink.version установлен в V2. Требуется Selena 3.5.4 или позже. Устанавливает тайм-аут в секундах для фазы Transaction Stream Load от PREPARED до COMMITTED. Обычно требуется только для exactly-once; at-least-once обычно не требует установки этого (connector по умолчанию использует 300 с). Если не установлено в exactly-once, применяется конфигурация Selena FE prepared_transaction_default_timeout_second (по умолчанию 86400 с). См. управление тайм-аутом транзакций Selena.
Сопоставление типов данных между Flink и Selena
| Тип данных Flink | Тип данных Selena |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
Примечания по использованию
Exactly Once
-
Если вы хотите, чтобы sink гарантировал семантику exactly-once, мы рекомендуем вам обновить Selena до версии 2.5 или позже, а Flink connector до версии 1.2.4 или позже
-
Начиная с Flink connector 1.2.4, exactly-once был переработан на основе интерфейса транзакций Stream Load, предоставленного Selena с версии 2.4. По сравнению с предыдущей реализацией на основе нетранзакционного интерфейса Stream Load, новая реализация снижает использование памяти и накладные расходы на checkpoint, тем самым повышая производительность в реальном времени и стабильность загрузки.
-
Если версия Selena ранее 2.4 или версия Flink connector ранее 1.2.4, sink автоматически выберет реализацию на основе нетранзакционного интерфейса Stream Load.
-
-
Конфигурации для гарантии exactly-once
-
Значение
sink.semanticдолжно бытьexactly-once. -
Если версия Flink connector 1.2.8 и позже, рекомендуется указать значение
sink.label-prefix. Обратите внимание, что префикс метки должен быть уникальным среди всех типов загрузки в Selena, таких как задачи Flink, Routine Load и Broker Load.-
Если префикс метки указан, Flink connector будет использовать префикс метки для очистки оставшихся транзакций, которые могут быть сгенерированы в некоторых сценариях сбоя Flink, таких как сбой задачи Flink, когда checkpoint все еще выполняется. Эти оставшиеся транзакции обычно находятся в статусе
PREPARED, если вы используетеSHOW PROC '/transactions/<db_id>/running';для их просмотра в Selena. Когда задача Flink восстанавливается из checkpoint, Flink connector найдет эти оставшиеся транзакции в соответствии с префиксом метки и некоторой информацией в checkpoint и прервет их. Flink connector не может прервать их при выходе задачи Flink из-за механизма двухфазной фиксации для реализации exactly-once. Когда задача Flink завершается, Flink connector еще не получил уведомление от координатора checkpoint Flink о том, должны ли транзакции быть включены в успешный checkpoint, и это может привести к потере данных, если эти транзакции будут прерваны в любом случае. Вы можете получить обзор о том, как достичь сквозного exactly-once в Flink в этом blogpost. -
Если префикс метки не указан, оставшиеся транзакции будут очищены Selena только после их истечения. Однако количество выполняющихся транзакций может достичь ограничения Selena
max_running_txn_num_per_db, если задачи Flink часто терпят неудачу до истечения времени ожидания транзакций. Вы можете установить меньший тайм-аут для транзакцийPREPAREDчтобы они истекали быстрее, когда префикс метки не указан. См. ниже о том, как установить подготовленный тайм-аут.
-
-
-
Если вы уверены, что задача Flink в конечном итоге восстановится из checkpoint или savepoint после длительного простоя из-за остановки или непрерывного отказа, пожалуйста, соответствующим образом настройте следующие конфигурации Selena, чтобы избежать потери данных.
-
Настройте тайм-аут транзакций
PREPARED. См. ниже о том, как установить тайм-аут.Тайм-аут должен быть больше времени простоя задачи Flink. В противном случае оставшиеся транзакции, которые включены в успешный checkpoint, могут быть прерваны из-за тайм-аута до перезапуска задачи Flink, что приводит к потере данных.
Обратите внимание, что когда вы устанавливаете большее значение для этой конфигурации, лучше указать значение
sink.label-prefix, чтобы оставшиеся транзакции могли быть очищены в соответствии с префиксом метки и некоторой информацией в checkpoint, вместо того чтобы из-за тайм-аута (что может привести к потере данных). -
label_keep_max_secondиlabel_keep_max_num: конфигурации Selena FE, значения по умолчанию -259200и1000соответственно. Подробности см. в конфигурациях FE. Значениеlabel_keep_max_secondдолжно быть больше времени простоя задачи Flink. В противном случае Flink connector не сможет проверить состояние транзакций в Selena, используя метки транзакций, сохраненные в savepoint или checkpoint Flink, и выяснить, зафиксированы ли эти транзакции или нет, что в конечном итоге может привести к потере данных.
-
-
Как установить тайм-аут для транзакций PREPARED
-
Для Connector 1.2.12+ и Selena 3.5.4+ вы можете установить тайм-аут, настроив параметр connector
sink.properties.prepared_timeout. По умолчанию значение не установлено, и оно возвращается к глобальной конфигурации Selena FEprepared_transaction_default_timeout_second(значение по умолчанию -86400). -
Для других версий Connector или Selena вы можете установить тайм-аут, настроив глобальную конфигурацию Selena FE
prepared_transaction_default_timeout_second(значение по умолчанию -86400).
-
Политика сброса
Flink connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Способ запуска сброса отличается между at-least-once и exactly-once.
Для at-least-once сброс будет запущен, когда выполняется любое из следующих условий:
- количество байтов буферизованных строк достигает предела
sink.buffer-flush.max-bytes - количество буферизованных строк достигает предела
sink.buffer-flush.max-rows. (Действительно только для версии sink V1) - время, прошедшее с момента последнего сброса, достигает предела
sink.buffer-flush.interval-ms - запущен checkpoint
Для exactly-once сброс происходит только при запуске checkpoint.
Мониторинг метрик загрузки
Flink connector предоставляет следующие метрики для мониторинга загрузки.
| Метрика | Тип | Описание |
|---|---|---|
| totalFlushBytes | counter | успешно сброшенные байты. |
| totalFlushRows | counter | количество успешно сброшенных строк. |
| totalFlushSucceededTimes | counter | количество раз, когда данные были успешно сброшены. |
| totalFlushFailedTimes | counter | количество раз, когда сброс данных не удался. |
| totalFilteredRows | counter | количество отфильтрованных строк, которое также включено в totalFlushRows. |
Примеры
Следующие примеры показывают, как использовать Flink connector для загрузки данных в таблицу Selena с помощью Flink SQL или Flink DataStream.
Подготовка
Создание таблицы Selena
Создайте базу данных test и создайте таблицу Primary Key score_board.
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
Настройка среды Flink
-
Скачайте бинарный файл Flink Flink 1.15.2 и распакуйте его в каталог
flink-1.15.2. -
Скачайте Flink connector 1.2.7 и поместите его в каталог
flink-1.15.2/lib. -
Выполните следующие команды для запуска cluster Flink:
cd flink-1.15.2
./bin/start-cluster.sh
Конфигурация сети
Убедитесь, что машина, на которой находится Flink, может получить доступ к узлам FE cluster Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а к узлам BE через be_http_port (по умолчанию: 8040).
Запуск с Flink SQL
-
Выполните следующую команду для запуска клиента Flink SQL.
./bin/sql-client.sh -
Создайте таблицу Flink
score_boardи вставьте значения в таблицу через Flink SQL Client. Обратите внимание, что вы должны определить первичный ключ в Flink DDL, если хотите загрузить данные в таблицу Primary Key Selena. Это необязательно для других типов таблиц Selena.CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'selena',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'load-url' = '127.0.0.1:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'root',
'password' = ''
);
INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'flink', 100);
Запуск с Flink DataStream
Существует несколько способов реализации задачи Flink DataStream в зависимости от типа входных записей, таких как CSV Java String, JSON Java String или пользовательский объект Java.
-
Входные записи представляют собой
Stringв формате CSV. Полный пример см. в LoadCsvRecords./**
* Генерация записей в формате CSV. Каждая запись имеет три значения, разделенных "\t".
* Эти значения будут загружены в столбцы `id`, `name` и `score` в таблице Selena.
*/
String[] records = new String[]{
"1\tselena-csv\t100",
"2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);
/**
* Настройте connector с необходимыми свойствами.
* Вам также необходимо добавить свойства "sink.properties.format" и "sink.properties.column_separator"
* чтобы сообщить connector, что входные записи в формате CSV, а разделитель столбцов - "\t".
* Вы также можете использовать другие разделители столбцов в записях формата CSV,
* но не забудьте соответственно изменить "sink.properties.column_separator".
*/
SelenaSinkOptions options = SelenaSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "csv")
.withProperty("sink.properties.column_separator", "\t")
.build();
// Создайте sink с параметрами.
SinkFunction<String> selenaink = SelenaSink.sink(options);
source.addSink(selenaink); -
Входные записи представляют собой
Stringв формате JSON. Полный пример см. в LoadJsonRecords./**
* Генерация записей в формате JSON.
* Каждая запись имеет три пары ключ-значение, соответствующие столбцам `id`, `name` и `score` в таблице Selena.
*/
String[] records = new String[]{
"{\"id\":1, \"name\":\"selena-json\", \"score\":100}",
"{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
};
DataStream<String> source = env.fromElements(records);
/**
* Настройте connector с необходимыми свойствами.
* Вам также необходимо добавить свойства "sink.properties.format" и "sink.properties.strip_outer_array"
* чтобы сообщить connector, что входные записи в формате JSON и нужно удалить внешнюю структуру массива.
*/
SelenaSinkOptions options = SelenaSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build();
// Создайте sink с параметрами.
SinkFunction<String> selenaink = SelenaSink.sink(options);
source.addSink(selenaink); -
Входные записи представляют собой пользовательские объекты Java. Полный пример см. в LoadCustomJavaRecords.
-
В этом примере входная запись представляет собой простой POJO
RowData.public static class RowData {
public int id;
public String name;
public int score;
public RowData() {}
public RowData(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
} -
Основная программа выглядит следующим образом:
// Генерация записей, которые используют RowData в качестве контейнера.
RowData[] records = new RowData[]{
new RowData(1, "selena-rowdata", 100),
new RowData(2, "flink-rowdata", 100),
};
DataStream<RowData> source = env.fromElements(records);
// Настройте connector с необходимыми свойствами.
SelenaSinkOptions options = SelenaSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.build();
/**
* Flink connector будет использовать массив объектов Java (Object[]) для представления строки, которая будет загружена в таблицу Selena,
* и каждый элемент является значением для столбца.
* Вам необходимо определить схему Object[], которая соответствует схеме таблицы Selena.
*/
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT().notNull())
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
// Когда таблица Selena является таблицей Primary Key, вы должны указать notNull(), например, DataTypes.INT().notNull(), для первичного ключа `id`.
.primaryKey("id")
.build();
// Преобразуйте RowData в Object[] в соответствии со схемой.
RowDataTransformer transformer = new RowDataTransformer();
// Создайте sink со схемой, параметрами и трансформером.
SinkFunction<RowData> selenaink = SelenaSink.sink(schema, options, transformer);
source.addSink(selenaink); -
RowDataTransformerв основной программе определен следующим образом:private static class RowDataTransformer implements SelenaSinkRowBuilder<RowData> {
/**
* Установите каждый элемент массива объектов в соответствии с входными RowData.
* Схема массива соответствует схеме таблицы Selena.
*/
@Override
public void accept(Object[] internalRow, RowData rowData) {
internalRow[0] = rowData.id;
internalRow[1] = rowData.name;
internalRow[2] = rowData.score;
// Когда таблица Selena является таблицей Primary Key, вам нужно установить последний элемент, чтобы указать, является ли загрузка данных операцией UPSERT или DELETE.
internalRow[internalRow.length - 1] = SelenaSinkOP.UPSERT.ordinal();
}
}
-
Синхронизация данных с Flink CDC 3.0 (с поддержкой изменения схемы)
Фреймворк Flink CDC 3.0 можно использовать для легкого создания потокового конвейера ELT из источников CDC (таких как MySQL и Kafka) в Selena. Конвейер может синхронизировать всю базу данных, объединенные таблицы шардинга и изменения схемы из источников в Selena.
Начиная с v1.2.9, Flink connector для Selena интегрирован в этот фреймворк как Selena Pipeline Connector. Selena Pipeline Connector поддерживает:
- Автоматическое создание баз данных и таблиц
- Синхронизацию изменений схемы
- Полную и инкрементальную синхронизацию данных
Для быстрого старта см. Streaming ELT from MySQL to Selena using Flink CDC 3.0 with Selena Pipeline Connector.
Рекомендуется использовать Selena v1.5.2 и более поздние версии для включения fast_schema_evolution. Это улучшит скорость добавления или удаления столбцов и снизит использование ресурсов.
Лучшие практики
Загрузка данных в таблицу Primary Key
В этом разделе показано, как загружать данные в таблицу Primary Key Selena для достижения частичных обновлений и условных обновлений. Вы можете увидеть Изменение данных через загрузку для введения этих функций. Эти примеры используют Flink SQL.
Подготовка
Создайте базу данных test и создайте таблицу Primary Key score_board в Selena.
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);