Синхронизация в реальном времени из MySQL
Selena поддерживает несколько методов синхронизации данных из MySQL в Selena в реальном времени, обеспечивая аналитику массивных данных в реальном времени с низкой задержкой.
Эта тема описывает, как синхронизировать данные из MySQL в Selena в реальном времени (в течение секунд) через Apache Flink®.
Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к вашему кластеру Selena. Синтаксис: GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.
Как это работает
Flink CDC используется при синхронизации из MySQL в Flink. В этой теме используется Flink CDC версии менее 3.0, поэтому SMT используется для синхронизации схем таблиц. Однако, если используется Flink CDC 3.0, нет необходимости использовать SMT для синхронизации схем таблиц в Selena. Flink CDC 3.0 может даже синхронизировать схемы всей базы данных MySQL, шардированных баз данных и таблиц, а также поддерживает синхронизацию изменений схем. Для подробного использования см. Streaming ELT from MySQL to StarRocks.
Следующая диаграмма иллюстрирует весь процесс синхронизации.

Синхронизация в реальном времени из MySQL через Flink в Selena реализуется в два этапа: синхронизация схемы базы данных и таблиц и синхронизация данных. Сначала SMT преобразует схему базы данных и таблиц MySQL в операторы создания таблиц для Selena. Затем кластер Flink запускает задания Flink для синхронизации полных и инкрементальных данных MySQL в Selena.
Процесс синхронизации гарантирует семантику exactly-once.
Процесс синхронизации:
-
Синхронизация схемы базы данных и таблиц.
SMT читает схему базы данных и таблиц MySQL, которые нужно синхронизировать, и генерирует SQL-файлы для создания целевой базы данных и таблиц в Selena. Эта операция основана на информации MySQL и Selena в конфигурационном файле SMT.
-
Синхронизация данных.
a. SQL-клиент Flink выполняет оператор загрузки данных
INSERT INTO SELECTдля отправки одного или нескольких заданий Flink в кластер Flink.b. Кластер Flink запускает задания Flink для получения данных. Коннектор Flink CDC сначала читает полные исторические данные из исходной базы данных, затем плавно переключается на инкрементальное чтение и отправляет данные в flink-connector-selena.
c. flink-connector-selena накапливает данные в мини-пакетах и синхронизирует каждый пакет данных в Selena.
к сведениюВ Selena могут быть синхронизированы только операции языка манипулирования данными (DML) из MySQL. Операции языка определения данных (DDL) не могут быть синхронизированы.
Сценарии использования
Синхронизация в реальном времени из MySQL имеет широкий спектр случаев использования, где данные постоянно изменяются. Возьмем в качестве примера реальный случай использования "рейтинг продаж товаров в реальном времени".
Flink вычисляет рейтинг продаж товаров в реальном времени на основе исходной таблицы заказов в MySQL и синхронизирует рейтинг в Primary Key таблицу Selena в реальном времени. Пользователи могут подключить инструмент визуализации к Selena для просмотра рейтинга в реальном времени и получения оперативных аналитических данных по требованию.
Подготовка
Загрузка и установка инструментов синхронизации
Для синхронизации данных из MySQL необходимо установить следующие инструменты: SMT, Flink, коннектор Flink CDC и flink-connector-selena.
-
Загрузите и установите Flink, запустите кластер Flink. Вы также можете выполнить этот шаг, следуя инструкциям в официальной документации Flink.
a. Установите Java 8 или Java 11 в вашей операционной системе перед запуском Flink. Вы можете выполнить следующую команду для проверки установленной версии Java.
# Просмотр версии Java.
java -version
# Java 8 установлена, если возвращается следующий вывод.
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)b. Загрузите установочный пакет Flink и распакуйте его. Мы рекомендуем использовать Flink 1.14 или более поздней версии. Минимально допустимая версия - Flink 1.11. В этой теме используется Flink 1.14.5.
# Загрузка Flink.
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Распаковка Flink.
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Переход в директорию Flink.
cd flink-1.14.5c. Запуск кластера Flink.
# Запуск кластера Flink.
./bin/start-cluster.sh
# Кластер Flink запущен, если возвращается следующий вывод.
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host. -
Загрузите коннектор Flink CDC. В этой теме используется MySQL в качестве источника данных, поэтому загружается
flink-sql-connector-mysql-cdc-x.x.x.jar. Версия коннектора должна соответствовать версии Flink. В этой теме используется Flink 1.14.5, и вы можете загрузитьflink-sql-connector-mysql-cdc-2.2.0.jar.wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.2.0.jar -
Загрузите flink-connector-selena. Версия должна соответствовать версии Flink.
Пакет flink-connector-selena
x.x.x_flink-y.yy _ z.zz.jarсодержит три номера версий:x.x.x- номер версии flink-connector-selena.y.yy- поддерживаемая версия Flink.z.zz- версия Scala, поддерживаемая Flink. Если версия Flink 1.14.x или более ранняя, необходимо загрузить пакет с версией Scala.
В этой теме используется Flink 1.14.5 и Scala 2.11. Поэтому вы можете загрузить следующий пакет:
1.2.3_flink-14_2.11.jar. -
Переместите JAR-пакеты коннектора Flink CDC (
flink-sql-connector-mysql-cdc-2.2.0.jar) и flink-connector-selena (1.2.3_flink-1.14_2.11.jar) в директориюlibFlink.Примечание
Если кластер Flink уже запущен в вашей системе, необходимо остановить кластер Flink и перезапустить его для загрузки и валидации JAR-пакетов.
$ ./bin/stop-cluster.sh
$ ./bin/start-cluster.sh -
Загрузите и распакуйте пакет SMT и поместите его в директорию
flink-1.14.5. Selena предоставляет пакеты SMT для Linux x86 и macOS ARM64. Вы можете выбрать один в зависимости от вашей операционной системы и процессора.# для Linux x86
wget https://releases.starrocks.io/resources/smt.tar.gz
# для macOS ARM64
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz