Перейти к основному содержимому
Версия: 2.0.x

Apache Flink

Непрерывная загрузка данных из Apache Flink®

Selena предоставляет самостоятельно разработанный connector под названием Selena Connector for Apache Flink® (сокращённо Flink connector) для загрузки данных в таблицу Selena с использованием Flink. Основной принцип заключается в накоплении данных с последующей загрузкой всех данных за один раз в Selena через STREAM LOAD.

Flink connector поддерживает DataStream API, Table API & SQL и Python API. Он имеет более высокую и стабильную производительность по сравнению с flink-connector-jdbc, предоставляемым Apache Flink®.

ВНИМАНИЕ

Загрузка данных в таблицы Selena с помощью Flink connector требует привилегий SELECT и INSERT на целевую таблицу Selena. Если у вас нет этих привилегий, следуйте инструкциям, представленным в GRANT, чтобы предоставить эти привилегии пользователю, которого вы используете для подключения к вашему cluster Selena.

Требования к версиям

ConnectorFlinkSelenaJavaScala
1.2.111.15,1.16,1.17,1.18,1.19,1.202.1 и позже82.11,2.12
1.2.101.15,1.16,1.17,1.18,1.192.1 и позже82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 и позже82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 и позже82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 и позже82.11,2.12

Вы можете получить JAR-файл Flink connector следующими способами:

  • Напрямую скачать скомпилированный JAR-файл Flink connector.
  • Добавить Flink connector в качестве зависимости в ваш проект Maven, а затем скачать JAR-файл.
  • Скомпилировать исходный код Flink connector в JAR-файл самостоятельно.

Формат наименования JAR-файла Flink connector следующий:

  • Начиная с Flink 1.15, это flink-connector-selena-${connector_version}_flink-${flink_version}.jar. Например, если вы установили Flink 1.15 и хотите использовать Flink connector 1.2.7, вы можете использовать flink-connector-selena-1.2.7_flink-1.15.jar.

  • До Flink 1.15 это flink-connector-selena-${connector_version}_flink-${flink_version}_${scala_version}.jar. Например, если вы установили Flink 1.14 и Scala 2.12 в вашей среде, и хотите использовать Flink connector 1.2.7, вы можете использовать flink-connector-selena-1.2.7_flink-1.14_2.12.jar.

ВНИМАНИЕ

В общем случае последняя версия Flink connector поддерживает совместимость только с тремя последними версиями Flink.

Скачивание скомпилированного Jar-файла

Напрямую скачайте соответствующую версию JAR-файла Flink connector из Maven Central Repository.

Maven Dependency

В файле pom.xml вашего проекта Maven добавьте Flink connector в качестве зависимости в соответствии со следующим форматом. Замените flink_version, scala_version и connector_version на соответствующие версии.

  • В Flink 1.15 и позже

    <dependency>
    <groupId>com.selena</groupId>
    <artifactId>flink-connector-selena</artifactId>
    <version>${connector_version}_flink-${flink_version}</version>
    </dependency>
  • В версиях до Flink 1.15

    <dependency>
    <groupId>com.selena</groupId>
    <artifactId>flink-connector-selena</artifactId>
    <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
    </dependency>

Компиляция самостоятельно

  1. Скачайте исходный код Flink connector.

  2. Выполните следующую команду для компиляции исходного кода Flink connector в JAR-файл. Обратите внимание, что flink_version заменяется на соответствующую версию Flink.

    sh build.sh <flink_version>

    Например, если версия Flink в вашей среде 1.15, вам нужно выполнить следующую команду:

    sh build.sh 1.15
  3. Перейдите в каталог target/, чтобы найти JAR-файл Flink connector, такой как flink-connector-selena-1.2.7_flink-1.15-SNAPSHOT.jar, сгенерированный после компиляции.

ПРИМЕЧАНИЕ

Имя Flink connector, который не выпущен официально, содержит суффикс SNAPSHOT.

Параметры

connector

Обязательный: Да
Значение по умолчанию: NONE
Описание: Connector, который вы хотите использовать. Значение должно быть "selena".

jdbc-url

Обязательный: Да
Значение по умолчанию: NONE
Описание: Адрес, который используется для подключения к серверу MySQL FE. Вы можете указать несколько адресов, которые должны быть разделены запятой (,). Формат: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.

load-url

Обязательный: Да
Значение по умолчанию: NONE
Описание: Адрес, который используется для подключения к HTTP-серверу FE. Вы можете указать несколько адресов, которые должны быть разделены точкой с запятой (;). Формат: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.

database-name

Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя базы данных Selena, в которую вы хотите загрузить данные.

table-name

Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя таблицы, которую вы хотите использовать для загрузки данных в Selena.

username

Обязательный: Да
Значение по умолчанию: NONE
Описание: Имя пользователя учетной записи, которую вы хотите использовать для загрузки данных в Selena. Учетная запись должна иметь привилегии SELECT и INSERT на целевую таблицу Selena.

password

Обязательный: Да
Значение по умолчанию: NONE
Описание: Пароль предыдущей учетной записи.

sink.version

Обязательный: Нет
Значение по умолчанию: AUTO
Описание: Интерфейс, используемый для загрузки данных. Этот параметр поддерживается начиная с версии Flink connector 1.2.4.

  • V1: Использовать интерфейс Stream Load для загрузки данных. Connectors до 1.2.4 поддерживают только этот режим.
  • V2: Использовать интерфейс Stream Load transaction для загрузки данных. Требуется Selena версии не ниже 2.4. Рекомендуется V2, так как он оптимизирует использование памяти и обеспечивает более стабильную реализацию exactly-once.
  • AUTO: Если версия Selena поддерживает транзакционный Stream Load, автоматически выберет V2, иначе выберет V1

sink.label-prefix

Обязательный: Нет
Значение по умолчанию: NONE
Описание: Префикс метки, используемый Stream Load. Рекомендуется настроить его, если вы используете exactly-once с connector 1.2.8 и позже. См. примечания по использованию exactly-once.

sink.semantic

Обязательный: Нет
Значение по умолчанию: at-least-once
Описание: Семантика, гарантируемая sink. Допустимые значения: at-least-once и exactly-once.

sink.buffer-flush.max-bytes

Обязательный: Нет
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки. Этот параметр вступает в силу только тогда, когда sink.semantic установлен на at-least-once. Если sink.semantic установлен на exactly-once, данные в памяти сбрасываются при запуске checkpoint Flink. В этом случае этот параметр не действует.

sink.buffer-flush.max-rows

Обязательный: Нет
Значение по умолчанию: 500000
Описание: Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Этот параметр доступен только тогда, когда sink.version равен V1, а sink.semantic равен at-least-once. Допустимые значения: от 64000 до 5000000.

sink.buffer-flush.interval-ms

Обязательный: Нет
Значение по умолчанию: 300000
Описание: Интервал сброса данных. Этот параметр доступен только тогда, когда sink.semantic равен at-least-once. Допустимые значения: от 1000 до 3600000. Единица измерения: мс.

sink.max-retries

Обязательный: Нет
Значение по умолчанию: 3
Описание: Количество раз, когда система повторяет попытку выполнить задачу Stream Load. Этот параметр доступен только тогда, когда вы устанавливаете sink.version в V1. Допустимые значения: от 0 до 10.

sink.connect.timeout-ms

Обязательный: Нет
Значение по умолчанию: 30000
Описание: Тайм-аут для установки HTTP-соединения. Допустимые значения: от 100 до 60000. Единица измерения: мс. До Flink connector v1.2.9 значение по умолчанию было 1000.

sink.socket.timeout-ms

Обязательный: Нет
Значение по умолчанию: -1
Описание: Поддерживается с версии 1.2.10. Продолжительность времени, в течение которого HTTP-клиент ожидает данных. Единица измерения: мс. Значение по умолчанию -1 означает, что тайм-аута нет.

sink.sanitize-error-log

Обязательный: Нет
Значение по умолчанию: false
Описание: Поддерживается с версии 1.2.12. Следует ли очищать конфиденциальные данные в журнале ошибок для производственной безопасности. Когда этот элемент установлен в true, конфиденциальные данные строк и значения столбцов в журналах ошибок Stream Load редактируются как в журналах connector, так и в журналах SDK. Значение по умолчанию - false для обратной совместимости.

sink.wait-for-continue.timeout-ms

Обязательный: Нет
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.2.7. Тайм-аут ожидания ответа HTTP 100-continue от FE. Допустимые значения: от 3000 до 60000. Единица измерения: мс

sink.ignore.update-before

Обязательный: Нет
Значение по умолчанию: true
Описание: Поддерживается с версии 1.2.8. Следует ли игнорировать записи UPDATE_BEFORE из Flink при загрузке данных в таблицы Primary Key. Если этот параметр установлен в false, запись рассматривается как операция удаления в таблице Selena.

sink.parallelism

Обязательный: Нет
Значение по умолчанию: NONE
Описание: Параллелизм загрузки. Доступно только для Flink SQL. Если этот параметр не указан, планировщик Flink определяет параллелизм. В сценарии с мультипараллелизмом пользователи должны гарантировать, что данные записываются в правильном порядке.

sink.properties.*

Обязательный: Нет
Значение по умолчанию: NONE
Описание: Параметры, которые используются для управления поведением Stream Load. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в STREAM LOAD.

sink.properties.format

Обязательный: Нет
Значение по умолчанию: csv
Описание: Формат, используемый для Stream Load. Flink connector преобразует каждый пакет данных в формат перед их отправкой в Selena. Допустимые значения: csv и json.

sink.properties.column_separator

Обязательный: Нет
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.

sink.properties.row_delimiter

Обязательный: Нет
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.

sink.properties.max_filter_ratio

Обязательный: Нет
Значение по умолчанию: 0
Описание: Максимальная толерантность к ошибкам Stream Load. Это максимальный процент записей данных, которые могут быть отфильтрованы из-за недостаточного качества данных. Допустимые значения: от 0 до 1. Значение по умолчанию: 0. Подробности см. в Stream Load.

sink.properties.partial_update

Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Следует ли использовать частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.

sink.properties.partial_update_mode

Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с большим количеством столбцов и небольшими пакетами.
  • Значение column означает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и большим количеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице с 100 столбцами, если обновляются только 10 столбцов (10% от общего числа) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.

sink.properties.strict_mode

Обязательный: Нет
Значение по умолчанию: false
Описание: Указывает, следует ли включать строгий режим для Stream Load. Это влияет на поведение загрузки при наличии неквалифицированных строк, таких как несогласованные значения столбцов. Допустимые значения: true и false. Значение по умолчанию: false. Подробности см. в Stream Load.

sink.properties.compression

Обязательный: Нет
Значение по умолчанию: NONE
Описание: Алгоритм сжатия, используемый для Stream Load. Допустимые значения: lz4_frame. Сжатие для формата JSON требует Flink connector 1.2.10+ и Selena v1.5.2+. Сжатие для формата CSV требует только Flink connector 1.2.11+.

sink.properties.prepared_timeout

Обязательный: Нет
Значение по умолчанию: NONE
Описание: Поддерживается с версии 1.2.12 и действует только тогда, когда sink.version установлен в V2. Требуется Selena 3.5.4 или позже. Устанавливает тайм-аут в секундах для фазы Transaction Stream Load от PREPARED до COMMITTED. Обычно требуется только для exactly-once; at-least-once обычно не требует установки этого (connector по умолчанию использует 300 с). Если не установлено в exactly-once, применяется конфигурация Selena FE prepared_transaction_default_timeout_second (по умолчанию 86400 с). См. управление тайм-аутом транзакций Selena.

Тип данных FlinkТип данных Selena
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARSTRING
VARCHARSTRING
STRINGSTRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON STRING
ROW<arg T...>JSON STRING

Примечания по использованию

Exactly Once

  • Если вы хотите, чтобы sink гарантировал семантику exactly-once, мы рекомендуем вам обновить Selena до версии 2.5 или позже, а Flink connector до версии 1.2.4 или позже

    • Начиная с Flink connector 1.2.4, exactly-once был переработан на основе интерфейса транзакций Stream Load, предоставленного Selena с версии 2.4. По сравнению с предыдущей реализацией на основе нетранзакционного интерфейса Stream Load, новая реализация снижает использование памяти и накладные расходы на checkpoint, тем самым повышая производительность в реальном времени и стабильность загрузки.

    • Если версия Selena ранее 2.4 или версия Flink connector ранее 1.2.4, sink автоматически выберет реализацию на основе нетранзакционного интерфейса Stream Load.

  • Конфигурации для гарантии exactly-once

    • Значение sink.semantic должно быть exactly-once.

    • Если версия Flink connector 1.2.8 и позже, рекомендуется указать значение sink.label-prefix. Обратите внимание, что префикс метки должен быть уникальным среди всех типов загрузки в Selena, таких как задачи Flink, Routine Load и Broker Load.

      • Если префикс метки указан, Flink connector будет использовать префикс метки для очистки оставшихся транзакций, которые могут быть сгенерированы в некоторых сценариях сбоя Flink, таких как сбой задачи Flink, когда checkpoint все еще выполняется. Эти оставшиеся транзакции обычно находятся в статусе PREPARED, если вы используете SHOW PROC '/transactions/<db_id>/running'; для их просмотра в Selena. Когда задача Flink восстанавливается из checkpoint, Flink connector найдет эти оставшиеся транзакции в соответствии с префиксом метки и некоторой информацией в checkpoint и прервет их. Flink connector не может прервать их при выходе задачи Flink из-за механизма двухфазной фиксации для реализации exactly-once. Когда задача Flink завершается, Flink connector еще не получил уведомление от координатора checkpoint Flink о том, должны ли транзакции быть включены в успешный checkpoint, и это может привести к потере данных, если эти транзакции будут прерваны в любом случае. Вы можете получить обзор о том, как достичь сквозного exactly-once в Flink в этом blogpost.

      • Если префикс метки не указан, оставшиеся транзакции будут очищены Selena только после их истечения. Однако количество выполняющихся транзакций может достичь ограничения Selena max_running_txn_num_per_db, если задачи Flink часто терпят неудачу до истечения времени ожидания транзакций. Вы можете установить меньший тайм-аут для транзакций PREPARED чтобы они истекали быстрее, когда префикс метки не указан. См. ниже о том, как установить подготовленный тайм-аут.

  • Если вы уверены, что задача Flink в конечном итоге восстановится из checkpoint или savepoint после длительного простоя из-за остановки или непрерывного отказа, пожалуйста, соответствующим образом настройте следующие конфигурации Selena, чтобы избежать потери данных.

    • Настройте тайм-аут транзакций PREPARED. См. ниже о том, как установить тайм-аут.

      Тайм-аут должен быть больше времени простоя задачи Flink. В противном случае оставшиеся транзакции, которые включены в успешный checkpoint, могут быть прерваны из-за тайм-аута до перезапуска задачи Flink, что приводит к потере данных.

      Обратите внимание, что когда вы устанавливаете большее значение для этой конфигурации, лучше указать значение sink.label-prefix, чтобы оставшиеся транзакции могли быть очищены в соответствии с префиксом метки и некоторой информацией в checkpoint, вместо того чтобы из-за тайм-аута (что может привести к потере данных).

    • label_keep_max_second и label_keep_max_num: конфигурации Selena FE, значения по умолчанию - 259200 и 1000 соответственно. Подробности см. в конфигурациях FE. Значение label_keep_max_second должно быть больше времени простоя задачи Flink. В противном случае Flink connector не сможет проверить состояние транзакций в Selena, используя метки транзакций, сохраненные в savepoint или checkpoint Flink, и выяснить, зафиксированы ли эти транзакции или нет, что в конечном итоге может привести к потере данных.

  • Как установить тайм-аут для транзакций PREPARED

    • Для Connector 1.2.12+ и Selena 3.5.4+ вы можете установить тайм-аут, настроив параметр connector sink.properties.prepared_timeout. По умолчанию значение не установлено, и оно возвращается к глобальной конфигурации Selena FE prepared_transaction_default_timeout_second (значение по умолчанию - 86400).

    • Для других версий Connector или Selena вы можете установить тайм-аут, настроив глобальную конфигурацию Selena FE prepared_transaction_default_timeout_second (значение по умолчанию - 86400).

Политика сброса

Flink connector будет буферизовать данные в памяти и сбрасывать их пакетами в Selena через Stream Load. Способ запуска сброса отличается между at-least-once и exactly-once.

Для at-least-once сброс будет запущен, когда выполняется любое из следующих условий:

  • количество байтов буферизованных строк достигает предела sink.buffer-flush.max-bytes
  • количество буферизованных строк достигает предела sink.buffer-flush.max-rows. (Действительно только для версии sink V1)
  • время, прошедшее с момента последнего сброса, достигает предела sink.buffer-flush.interval-ms
  • запущен checkpoint

Для exactly-once сброс происходит только при запуске checkpoint.

Мониторинг метрик загрузки

Flink connector предоставляет следующие метрики для мониторинга загрузки.

МетрикаТипОписание
totalFlushBytescounterуспешно сброшенные байты.
totalFlushRowscounterколичество успешно сброшенных строк.
totalFlushSucceededTimescounterколичество раз, когда данные были успешно сброшены.
totalFlushFailedTimescounterколичество раз, когда сброс данных не удался.
totalFilteredRowscounterколичество отфильтрованных строк, которое также включено в totalFlushRows.

Примеры

Следующие примеры показывают, как использовать Flink connector для загрузки данных в таблицу Selena с помощью Flink SQL или Flink DataStream.

Подготовка

Создание таблицы Selena

Создайте базу данных test и создайте таблицу Primary Key score_board.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
  • Скачайте бинарный файл Flink Flink 1.15.2 и распакуйте его в каталог flink-1.15.2.

  • Скачайте Flink connector 1.2.7 и поместите его в каталог flink-1.15.2/lib.

  • Выполните следующие команды для запуска cluster Flink:

    cd flink-1.15.2
    ./bin/start-cluster.sh

Конфигурация сети

Убедитесь, что машина, на которой находится Flink, может получить доступ к узлам FE cluster Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а к узлам BE через be_http_port (по умолчанию: 8040).

  • Выполните следующую команду для запуска клиента Flink SQL.

    ./bin/sql-client.sh
  • Создайте таблицу Flink score_board и вставьте значения в таблицу через Flink SQL Client. Обратите внимание, что вы должны определить первичный ключ в Flink DDL, если хотите загрузить данные в таблицу Primary Key Selena. Это необязательно для других типов таблиц Selena.

    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'selena',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',

    'table-name' = 'score_board',
    'username' = 'root',
    'password' = ''
    );

    INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'flink', 100);

Существует несколько способов реализации задачи Flink DataStream в зависимости от типа входных записей, таких как CSV Java String, JSON Java String или пользовательский объект Java.

  • Входные записи представляют собой String в формате CSV. Полный пример см. в LoadCsvRecords.

    /**
    * Генерация записей в формате CSV. Каждая запись имеет три значения, разделенных "\t".
    * Эти значения будут загружены в столбцы `id`, `name` и `score` в таблице Selena.
    */
    String[] records = new String[]{
    "1\tselena-csv\t100",
    "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Настройте connector с необходимыми свойствами.
    * Вам также необходимо добавить свойства "sink.properties.format" и "sink.properties.column_separator"
    * чтобы сообщить connector, что входные записи в формате CSV, а разделитель столбцов - "\t".
    * Вы также можете использовать другие разделители столбцов в записях формата CSV,
    * но не забудьте соответственно изменить "sink.properties.column_separator".
    */
    SelenaSinkOptions options = SelenaSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "csv")
    .withProperty("sink.properties.column_separator", "\t")
    .build();
    // Создайте sink с параметрами.
    SinkFunction<String> selenaink = SelenaSink.sink(options);
    source.addSink(selenaink);
  • Входные записи представляют собой String в формате JSON. Полный пример см. в LoadJsonRecords.

    /**
    * Генерация записей в формате JSON.
    * Каждая запись имеет три пары ключ-значение, соответствующие столбцам `id`, `name` и `score` в таблице Selena.
    */
    String[] records = new String[]{
    "{\"id\":1, \"name\":\"selena-json\", \"score\":100}",
    "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Настройте connector с необходимыми свойствами.
    * Вам также необходимо добавить свойства "sink.properties.format" и "sink.properties.strip_outer_array"
    * чтобы сообщить connector, что входные записи в формате JSON и нужно удалить внешнюю структуру массива.
    */
    SelenaSinkOptions options = SelenaSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    .build();
    // Создайте sink с параметрами.
    SinkFunction<String> selenaink = SelenaSink.sink(options);
    source.addSink(selenaink);
  • Входные записи представляют собой пользовательские объекты Java. Полный пример см. в LoadCustomJavaRecords.

    • В этом примере входная запись представляет собой простой POJO RowData.

      public static class RowData {
      public int id;
      public String name;
      public int score;

      public RowData() {}

      public RowData(int id, String name, int score) {
      this.id = id;
      this.name = name;
      this.score = score;
      }
      }
    • Основная программа выглядит следующим образом:

      // Генерация записей, которые используют RowData в качестве контейнера.
      RowData[] records = new RowData[]{
      new RowData(1, "selena-rowdata", 100),
      new RowData(2, "flink-rowdata", 100),
      };
      DataStream<RowData> source = env.fromElements(records);

      // Настройте connector с необходимыми свойствами.
      SelenaSinkOptions options = SelenaSinkOptions.builder()
      .withProperty("jdbc-url", jdbcUrl)
      .withProperty("load-url", loadUrl)
      .withProperty("database-name", "test")
      .withProperty("table-name", "score_board")
      .withProperty("username", "root")
      .withProperty("password", "")
      .build();

      /**
      * Flink connector будет использовать массив объектов Java (Object[]) для представления строки, которая будет загружена в таблицу Selena,
      * и каждый элемент является значением для столбца.
      * Вам необходимо определить схему Object[], которая соответствует схеме таблицы Selena.
      */
      TableSchema schema = TableSchema.builder()
      .field("id", DataTypes.INT().notNull())
      .field("name", DataTypes.STRING())
      .field("score", DataTypes.INT())
      // Когда таблица Selena является таблицей Primary Key, вы должны указать notNull(), например, DataTypes.INT().notNull(), для первичного ключа `id`.
      .primaryKey("id")
      .build();
      // Преобразуйте RowData в Object[] в соответствии со схемой.
      RowDataTransformer transformer = new RowDataTransformer();
      // Создайте sink со схемой, параметрами и трансформером.
      SinkFunction<RowData> selenaink = SelenaSink.sink(schema, options, transformer);
      source.addSink(selenaink);
    • RowDataTransformer в основной программе определен следующим образом:

      private static class RowDataTransformer implements SelenaSinkRowBuilder<RowData> {

      /**
      * Установите каждый элемент массива объектов в соответствии с входными RowData.
      * Схема массива соответствует схеме таблицы Selena.
      */
      @Override
      public void accept(Object[] internalRow, RowData rowData) {
      internalRow[0] = rowData.id;
      internalRow[1] = rowData.name;
      internalRow[2] = rowData.score;
      // Когда таблица Selena является таблицей Primary Key, вам нужно установить последний элемент, чтобы указать, является ли загрузка данных операцией UPSERT или DELETE.
      internalRow[internalRow.length - 1] = SelenaSinkOP.UPSERT.ordinal();
      }
      }

Фреймворк Flink CDC 3.0 можно использовать для легкого создания потокового конвейера ELT из источников CDC (таких как MySQL и Kafka) в Selena. Конвейер может синхронизировать всю базу данных, объединенные таблицы шардинга и изменения схемы из источников в Selena.

Начиная с v1.2.9, Flink connector для Selena интегрирован в этот фреймворк как Selena Pipeline Connector. Selena Pipeline Connector поддерживает:

  • Автоматическое создание баз данных и таблиц
  • Синхронизацию изменений схемы
  • Полную и инкрементальную синхронизацию данных

Для быстрого старта см. Streaming ELT from MySQL to Selena using Flink CDC 3.0 with Selena Pipeline Connector.

Рекомендуется использовать Selena v1.5.2 и более поздние версии для включения fast_schema_evolution. Это улучшит скорость добавления или удаления столбцов и снизит использование ресурсов.

Лучшие практики

Загрузка данных в таблицу Primary Key

В этом разделе показано, как загружать данные в таблицу Primary Key Selena для достижения частичных обновлений и условных обновлений. Вы можете увидеть Изменение данных через загрузку для введения этих функций. Эти примеры используют Flink SQL.

Подготовка

Создайте базу данных test и создайте таблицу Primary Key score_board в Selena.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Частичное обновление

Этот пример покажет, как загружать данные только в столбцы id и name.

  1. Вставьте две строки данных в таблицу Selena score_board в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | selena | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Flink score_board в клиенте Flink SQL.

    • Определите DDL, который включает только столбцы id и name.
    • Установите опцию sink.properties.partial_update в true, что сообщает Flink connector о необходимости выполнения частичных обновлений.
    • Если версия Flink connector <= 1.2.7, вам также необходимо установить опцию sink.properties.columns в id,name,__op, чтобы сообщить Flink connector, какие столбцы нужно обновить. Обратите внимание, что вам нужно добавить поле __op в конце. Поле __op указывает, что загрузка данных является операцией UPSERT или DELETE, и его значения устанавливаются connector автоматически.
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'selena',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.partial_update' = 'true',
    -- только для версии Flink connector <= 1.2.7
    'sink.properties.columns' = 'id,name,__op'
    );
  3. Вставьте две строки данных в таблицу Flink. Первичные ключи строк данных такие же, как у строк в таблице Selena, но значения в столбце name изменены.

    INSERT INTO `score_board` VALUES (1, 'selena-update'), (2, 'flink-update');
  4. Запросите таблицу Selena в клиенте MySQL.

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | selena-update | 100 |
    | 2 | flink-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

    Вы можете видеть, что изменились только значения для name, а значения для score не изменились.

Условное обновление

Этот пример покажет, как выполнить условное обновление в соответствии со значением столбца score. Обновление для id вступает в силу только тогда, когда новое значение для score больше или равно старому значению.

  1. Вставьте две строки данных в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | selena | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Flink score_board следующими способами:

    • Определите DDL, включающий все столбцы.
    • Установите опцию sink.properties.merge_condition в score, чтобы сообщить connector использовать столбец score в качестве условия.
    • Установите опцию sink.version в V1, что сообщает connector использовать Stream Load.
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'selena',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.merge_condition' = 'score',
    'sink.version' = 'V1'
    );
  3. Вставьте две строки данных в таблицу Flink. Первичные ключи строк данных такие же, как у строк в таблице Selena. Первая строка данных имеет меньшее значение в столбце score, а вторая строка данных имеет большее значение в столбце score.

    INSERT INTO `score_board` VALUES (1, 'selena-update', 99), (2, 'flink-update', 101);
  4. Запросите таблицу Selena в клиенте MySQL.

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | selena | 100 |
    | 2 | flink-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

    Вы можете видеть, что изменились только значения второй строки данных, а значения первой строки данных не изменились.

Загрузка данных в столбцы типа BITMAP

BITMAP часто используется для ускорения count distinct, например, для подсчета UV, см. Использование Bitmap для точного Count Distinct. Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа BITMAP.

  1. Создайте таблицу Aggregate Selena в клиенте MySQL.

    В базе данных test создайте таблицу Aggregate page_uv, где столбец visit_users определен как тип BITMAP и настроен с агрегатной функцией BITMAP_UNION.

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Flink в клиенте Flink SQL.

    Столбец visit_user_id в таблице Flink имеет тип BIGINT, и мы хотим загрузить этот столбец в столбец visit_users типа BITMAP в таблице Selena. Поэтому при определении DDL таблицы Flink обратите внимание, что:

    • Поскольку Flink не поддерживает BITMAP, вам необходимо определить столбец visit_user_id как тип BIGINT для представления столбца visit_users типа BITMAP в таблице Selena.
    • Вам необходимо установить опцию sink.properties.columns в page_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id), что сообщает connector сопоставление столбцов между таблицей Flink и таблицей Selena. Также вам необходимо использовать функцию to_bitmap чтобы сообщить connector преобразовать данные типа BIGINT в тип BITMAP.
    CREATE TABLE `page_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'selena',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'page_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Загрузите данные в таблицу Flink в клиенте Flink SQL.

    INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Вычислите UV страниц из таблицы Selena в клиенте MySQL.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 2 | 1 |
    | 1 | 3 |
    +---------+-----------------------------+
    2 rows in set (0.05 sec)

Загрузка данных в столбцы типа HLL

HLL может использоваться для приблизительного count distinct, см. Использование HLL для приблизительного count distinct.

Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа HLL.

  1. Создайте таблицу Aggregate Selena

    В базе данных test создайте таблицу Aggregate hll_uv, где столбец visit_users определен как тип HLL и настроен с агрегатной функцией HLL_UNION.

    CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Flink в клиенте Flink SQL.

    Столбец visit_user_id в таблице Flink имеет тип BIGINT, и мы хотим загрузить этот столбец в столбец visit_users типа HLL в таблице Selena. Поэтому при определении DDL таблицы Flink обратите внимание, что:

    • Поскольку Flink не поддерживает BITMAP, вам необходимо определить столбец visit_user_id как тип BIGINT для представления столбца visit_users типа HLL в таблице Selena.
    • Вам необходимо установить опцию sink.properties.columns в page_id,visit_date,user_id,visit_users=hll_hash(visit_user_id), что сообщает connector сопоставление столбцов между таблицей Flink и таблицей Selena. Также вам необходимо использовать функцию hll_hash чтобы сообщить connector преобразовать данные типа BIGINT в тип HLL.
    CREATE TABLE `hll_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'selena',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'hll_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. Загрузите данные в таблицу Flink в клиенте Flink SQL.

    INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Вычислите UV страниц из таблицы Selena в клиенте MySQL.

    mysql> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    **+---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 3 | 2 |
    | 4 | 1 |
    +---------+-----------------------------+
    2 rows in set (0.04 sec)