Перейти к основному содержимому

Инструмент миграции Selena (SMT)

Инструмент миграции Selena (SMT) — это инструмент миграции данных, предоставляемый Selena для загрузки данных из исходных баз данных через Flink в Selena. SMT в основном может:

  • Генерировать операторы для создания таблиц в Selena на основе информации об исходной базе данных и целевом кластере Selena.
  • Генерировать SQL-операторы, которые могут быть выполнены в SQL-клиенте Flink для отправки заданий Flink для синхронизации данных, что упрощает полную или инкрементальную синхронизацию данных в конвейере. В настоящее время SMT поддерживает следующие исходные базы данных:
Исходная база данныхГенерация оператора для создания таблицы в SelenaПолная синхронизация данныхИнкрементальная синхронизация данных
MySQLПоддерживаетсяПоддерживаетсяПоддерживается
PostgreSQLПоддерживаетсяПоддерживаетсяПоддерживается
OracleПоддерживаетсяПоддерживаетсяПоддерживается
HiveПоддерживаетсяПоддерживаетсяНе поддерживается
ClickHouseПоддерживаетсяПоддерживаетсяНе поддерживается
SQL ServerПоддерживаетсяПоддерживаетсяПоддерживается
TiDBПоддерживаетсяПоддерживаетсяПоддерживается

Ссылка для скачивания: https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=2

Шаги использования SMT

Общие шаги включают следующее:

  1. Настройте файл conf/config_prod.conf.

  2. Выполните starrocks-migration-tool.

  3. После выполнения SQL-скрипты генерируются в директории result по умолчанию.

    Затем вы можете использовать SQL-скрипты в директории result для синхронизации метаданных или данных.

Конфигурации SMT

  • [db]: информация для подключения к источнику данных. Настройте информацию для подключения к источнику данных, соответствующему типу базы данных, указанному в параметре type.

  • [other]: дополнительные конфигурации. Рекомендуется указать фактическое количество узлов BE в параметре be_num.

  • flink.starrocks.sink.*: конфигурации flink-connector-selena. Для подробных конфигураций и описания см. описание конфигурации.

  • [table-rule.1]: правило для сопоставления таблиц в источнике данных. Оператор CREATE TABLE генерируется на основе регулярных выражений, настроенных в правиле для сопоставления имен баз данных и таблиц в источнике данных. Можно настроить несколько правил, и каждое правило генерирует соответствующий файл результата, например:

    • [table-rule.1] -> result/starrocks-create.1.sql
    • [table-rule.2] -> result/starrocks-create.2.sql

    Каждое правило должно содержать конфигурации базы данных, таблицы и flink-connector-selena.

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^ database1.*$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^.*$
    schema = ^.*$
    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    [table-rule.2]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^database2.*$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^.*$
    schema = ^.*$
    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  • Отдельное правило может быть настроено для большой таблицы, которая разделена на шарды в базах данных. Например, предположим, что две базы данных edu_db_1 и edu_db_2 содержат таблицы course_1 и course_2 соответственно, и эти две таблицы имеют одинаковую структуру. Вы можете использовать следующее правило для загрузки данных из этих двух таблиц в одну таблицу Selena для анализа.

    [table-rule.3]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^edu_db_[0-9]*$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^course_[0-9]*$
    schema = ^.*$

    Это правило автоматически сформирует отношение загрузки "многие к одному". Имя таблицы по умолчанию, которая будет сгенерирована в Selena, — course__auto_shard, и вы также можете изменить имя таблицы в соответствующем SQL-скрипте, таком как result/starrocks-create.3.sql.

Синхронизация MySQL с Selena

Введение

Коннектор Flink CDC и SMT могут синхронизировать данные из MySQL в течение доли секунды.

img

Как показано на изображении, SMT может автоматически генерировать операторы CREATE TABLE для исходных и целевых таблиц Flink на основе информации о кластере и структуре таблиц MySQL и Selena. Коннектор Flink CDC читает MySQL Binlog, а Flink-connector-selena записывает данные в Selena.

Шаги

  1. Скачайте Flink. Поддерживается версия Flink 1.11 или более поздняя.

  2. Скачайте коннектор Flink CDC. Убедитесь, что вы скачали flink-sql-connector-mysql-cdc-xxx.jar, соответствующий версии Flink.

  3. Скачайте Flink-connector-selena.

  4. Скопируйте flink-sql-connector-mysql-cdc-xxx.jar и flink-connector-selena-xxx.jar в flink-xxx/lib/.

  5. Скачайте smt.tar.gz.

  6. Извлеките и измените файл конфигурации SMT.

    [db]
    host = 192.168.1.1
    port = 3306
    user = root
    password =
    type = mysql

    [other]
    # количество backend'ов в Selena
    be_num = 3
    # `decimal_v3` поддерживается начиная с Selena-1.18.1
    use_decimal_v3 = false
    # директория для сохранения преобразованного DDL SQL
    output_dir = ./result

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^db$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^table$
    schema = ^.*$

    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. Выполните starrocks-migrate-tool. Все SQL-скрипты генерируются в директории result.

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. Используйте SQL-скрипт с префиксом starrocks-create для создания таблицы в Selena.

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. Используйте SQL-скрипт с префиксом flink-create для создания исходных и целевых таблиц Flink и запуска задания Flink для синхронизации данных.

    bin/sql-client.sh embedded < flink-create.all.sql

    После успешного выполнения вышеуказанной команды задание Flink для синхронизации данных продолжает работать.

  10. Наблюдайте за статусом задания Flink.

    bin/flink list 

    Если выполнение задания сталкивается с ошибкой, вы можете просмотреть подробную информацию об ошибке в логах Flink. Также вы можете изменить конфигурации Flink в файле conf/flink-conf.yaml, такие как память и слот.

Примечания

  • Как включить MySQL binlog?

    1. Измените /etc/my.cnf:

      # Включить binlog
      log-bin=/var/lib/mysql/mysql-bin

      #log_bin=ON
      ## Базовое имя файлов binlog
      #log_bin_basename=/var/lib/mysql/mysql-bin
      ## Индексный файл для файлов binlog, управляющий всеми файлами binlog
      #log_bin_index=/var/lib/mysql/mysql-bin.index
      # Настроить server id
      server-id=1
      binlog_format = row
    2. Перезапустите mysqld. Вы можете проверить, включен ли MySQL binlog, выполнив SHOW VARIABLES LIKE 'log_bin';.

Синхронизация PostgreSQL с Selena

Введение

Коннектор Flink CDC и SMT могут синхронизировать данные из PostgreSQL в течение доли секунды.

SMT может автоматически генерировать операторы CREATE TABLE для исходных и целевых таблиц Flink на основе информации о кластере и структуре таблиц PostgreSQL и Selena.

Коннектор Flink CDC читает WAL PostgreSQL, а Flink-connector-selena записывает данные в Selena.

Шаги

  1. Скачайте Flink. Поддерживается версия Flink 1.11 или более поздняя.

  2. Скачайте коннектор Flink CDC. Убедитесь, что вы скачали flink-sql-connector-postgres-cdc-xxx.jar, соответствующий версии Flink.

  3. Скачайте коннектор Flink Selena.

  4. Скопируйте flink-sql-connector-postgres-cdc-xxx.jar и flink-connector-selena-xxx.jar в flink-xxx/lib/.

  5. Скачайте smt.tar.gz.

  6. Извлеките и измените файл конфигурации SMT.

    [db]
    host = 192.168.1.1
    port = 5432
    user = xxx
    password = xxx
    type = pgsql

    [other]
    # количество backend'ов в Selena
    be_num = 3
    # `decimal_v3` поддерживается начиная с Selena-1.18.1
    use_decimal_v3 = false
    # директория для сохранения преобразованного DDL SQL
    output_dir = ./result

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^db$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^table$
    # шаблон для сопоставления схем для установки свойств
    schema = ^.*$

    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. Выполните starrocks-migrate-tool. Все SQL-скрипты генерируются в директории result.

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. Используйте SQL-скрипт с префиксом starrocks-create для создания таблицы в Selena.

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. Используйте SQL-скрипт с префиксом flink-create для создания исходных и целевых таблиц Flink и запуска задания Flink для синхронизации данных.

    bin/sql-client.sh embedded < flink-create.all.sql

    После успешного выполнения вышеуказанной команды задание Flink для синхронизации данных продолжает работать.

  10. Наблюдайте за статусом задания Flink.

    bin/flink list 

    Если выполнение задания сталкивается с ошибкой, вы можете просмотреть подробную информацию об ошибке в логах Flink. Также вы можете изменить конфигурации Flink в файле conf/flink-conf.yaml, такие как память и слот.

Примечания

  • Для PostgreSQL v9.* требуется специальная конфигурация flink-cdc, как показано ниже (Рекомендуется использовать PostgreSQL v10.* или более позднюю версию. В противном случае вам нужно установить плагины декодирования WAL):

    ############################################
    ############################################
    ### конфигурация плагина flink-cdc для `postgresql`
    ############################################
    ### для `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
    ### см. https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
    ### и https://debezium.io/documentation/reference/postgres-plugins.html
    ### flink.cdc.decoding.plugin.name = decoderbufs
  • Как включить PostgreSQL WAL?

    # Открыть разрешения на подключение
    echo "host all all 0.0.0.0/32 trust" >> pg_hba.conf
    echo "host replication all 0.0.0.0/32 trust" >> pg_hba.conf
    # Включить логическую репликацию wal
    echo "wal_level = logical" >> postgresql.conf
    echo "max_wal_senders = 2" >> postgresql.conf
    echo "max_replication_slots = 8" >> postgresql.conf

    Укажите replica identity FULL для таблиц, которые нужно синхронизировать.

    ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL

    После внесения этих изменений перезапустите PostgreSQL.

Синхронизация Oracle с Selena

Введение

Коннектор Flink CDC и SMT могут синхронизировать данные из Oracle в течение доли секунды.

SMT может автоматически генерировать операторы CREATE TABLE для исходных и целевых таблиц Flink на основе информации о кластере и структуре таблиц Oracle и Selena.

Коннектор Flink CDC читает logminer Oracle, а Flink-connector-selena записывает данные в Selena.

Шаги

  1. Скачайте Flink. Поддерживается версия Flink 1.11 или более поздняя.

  2. Скачайте коннектор Flink CDC. Убедитесь, что вы скачали flink-sql-connector-oracle-cdc-xxx.jar, соответствующий версии Flink.

  3. Скачайте коннектор Flink Selena.

  4. Скопируйте flink-sql-connector-oracle-cdc-xxx.jar и flink-connector-selena-xxx.jar в flink-xxx/lib/.

  5. Скачайте smt.tar.gz.

  6. Извлеките и измените файл конфигурации SMT.

    [db]
    host = 192.168.1.1
    port = 1521
    user = xxx
    password = xxx
    type = oracle

    [other]
    # количество backend'ов в Selena
    be_num = 3
    # `decimal_v3` поддерживается начиная с Selena-1.18.1
    use_decimal_v3 = false
    # директория для сохранения преобразованного DDL SQL
    output_dir = ./result

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^db$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^table$
    # шаблон для сопоставления схем для установки свойств
    schema = ^.*$

    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. Выполните starrocks-migrate-tool. Все SQL-скрипты генерируются в директории result.

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. Используйте SQL-скрипт с префиксом starrocks-create для создания таблицы в Selena.

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. Используйте SQL-скрипт с префиксом flink-create для создания исходных и целевых таблиц Flink и запуска задания Flink для синхронизации данных.

    bin/sql-client.sh embedded < flink-create.all.sql

    После успешного выполнения вышеуказанной команды задание Flink для синхронизации данных продолжает работать.

  10. Наблюдайте за статусом задания Flink.

    bin/flink list 

    Если выполнение задания сталкивается с ошибкой, вы можете просмотреть подробную информацию об ошибке в логах Flink. Также вы можете изменить конфигурации Flink в файле conf/flink-conf.yaml, такие как память и слот.

Примечания

  • Синхронизация Oracle с использованием logminer:

    # Включить логирование
    alter system set db_recovery_file_dest = '/home/oracle/data' scope=spfile;
    alter system set db_recovery_file_dest_size = 10G;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    # Авторизация создания пользователя и предоставление разрешений
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE TO flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  • Конфигурации базы данных в [table-rule.1] не поддерживают регулярные выражения, поэтому необходимо указать полные имена баз данных.

  • Поскольку Oracle12c поддерживает режим CDB, SMT внутренне автоматически определяет, включен ли CDB, и соответственно изменяет конфигурацию flink-cdc. Однако пользователям нужно обратить внимание на то, нужно ли добавлять префикс c## к конфигурации [db].user, чтобы избежать проблем с недостаточными разрешениями.

Синхронизация Hive с Selena

Введение

Это руководство объясняет, как использовать SMT для синхронизации данных Hive с Selena. Во время синхронизации создается таблица Duplicate в Selena и задание Flink продолжает работать для синхронизации данных.

Шаги

Подготовка

[db]
# IP сервиса hiveserver2
host = 127.0.0.1
# порт сервиса hiveserver2
port = 10000
user = hive/emr-header-1.cluster-49148
password =
type = hive
# действует только с `type = hive`.
# Доступные значения: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
authentication = kerberos

Поддерживаемые методы аутентификации следующие:

  • nosasl, zk: не нужно указывать user и password.
  • none, none_http, ldap: указать user и password.
  • kerberos, kerberos_http: выполните следующие шаги:
    • Выполните kadmin.local на кластере Hive и проверьте list_principals, чтобы найти соответствующее имя принципала. Например, когда имя принципала hive/emr-header-1.cluster-49148@EMR.49148.COM, пользователь должен быть установлен как hive/emr-header-1.cluster-49148, а пароль оставлен пустым.
    • Выполните kinit -kt /path/to/keytab principal на машине, где выполняется SMT, и выполните klist, чтобы увидеть, сгенерирован ли правильный токен.

Синхронизация данных

  1. Выполните starrocks-migrate-tool.

  2. Используйте SQL-скрипт с префиксом starrocks-create для создания таблицы в Selena.

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  3. В flink/conf/ создайте и отредактируйте файл sql-client-defaults.yaml:

    execution:
    planner: blink
    type: batch
    current-catalog: hive-starrocks
    catalogs:
    - name: hive-starrocks
    type: hive
    hive-conf-dir: /path/to/apache-hive-xxxx-bin/conf
  4. Скачайте пакет зависимостей (flink-sql-connector-hive-xxxx) со страницы Hive соответствующей версии Flink и поместите его в директорию flink/lib.

  5. Запустите кластер Flink и выполните flink/bin/sql-client.sh embedded < result/flink-create.all.sql для начала синхронизации данных.

Синхронизация SQL Server с Selena

Введение

Коннектор Flink CDC и SMT могут синхронизировать данные из SQL Server в течение доли секунды.

SMT может автоматически генерировать операторы CREATE TABLE для исходных и целевых таблиц Flink на основе информации о кластере и структуре таблиц SQL Server и Selena.

Коннектор Flink CDC захватывает и записывает изменения на уровне строк, которые происходят в сервере базы данных SQL Server. Принцип заключается в использовании функции CDC, предоставляемой самим SQL Server. Возможность CDC самого SQL Server может архивировать указанные изменения в базе данных в указанные таблицы изменений. Коннектор SQL Server CDC сначала читает исторические данные из таблицы с помощью JDBC, а затем получает инкрементальные изменения из таблиц изменений, тем самым достигая полной инкрементальной синхронизации. Затем Flink-connector-selena записывает данные в Selena.

Шаги

  1. Скачайте Flink. Поддерживается версия Flink 1.11 или более поздняя.

  2. Скачайте коннектор Flink CDC. Убедитесь, что вы скачали flink-sql-connector-sqlserver-cdc-xxx.jar, соответствующий версии Flink.

  3. Скачайте коннектор Flink Selena.

  4. Скопируйте flink-sql-connector-sqlserver-cdc-xxx.jar, flink-connector-selena-xxx.jar в flink-xxx/lib/.

  5. Скачайте smt.tar.gz.

  6. Извлеките и измените файл конфигурации SMT.

    [db]
    host = 127.0.0.1
    port = 1433
    user = xxx
    password = xxx

    # доступные в настоящее время типы: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
    type = sqlserver

    [other]
    # количество backend'ов в Selena
    be_num = 3
    # `decimal_v3` поддерживается начиная с Selena-1.18.1
    use_decimal_v3 = false
    # директория для сохранения преобразованного DDL SQL
    output_dir = ./result

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^db$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^table$
    schema = ^.*$

    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. Выполните starrocks-migrate-tool*. Все SQL-скрипты генерируются в директории result.

​ ```Bash $./starrocks-migrate-tool $ls result flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql


8. Используйте SQL-скрипт с префиксом `starrocks-create` для создания таблицы в Selena.

```Bash
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  1. Используйте SQL-скрипт с префиксом flink-create для создания исходных и целевых таблиц Flink и запуска задания Flink для синхронизации данных.

    bin/sql-client.sh embedded < flink-create.all.sql     

    После успешного выполнения вышеуказанной команды задание Flink для синхронизации данных продолжает работать.

  2. Наблюдайте за статусом задания Flink.

    bin/flink list 

    Если выполнение задания сталкивается с ошибкой, вы можете просмотреть подробную информацию об ошибке в логах Flink. Также вы можете изменить конфигурации Flink в файле conf/flink-conf.yaml, такие как память и слот.

Примечания

  1. Убедитесь, что служба Server Agent включена.

    Проверьте, работает ли служба Server Agent нормально.

    EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
    GO

    Включите службу Server Agent.

    /opt/mssql/bin/mssql-conf set sqlagent.enabled true
  2. Убедитесь, что CDC для соответствующей базы данных включен.

    ​ Проверьте, включен ли CDC для соответствующей базы данных.

    select is_cdc_enabled, name from sys.databases where name = 'XXX_databases'
    GO

    ​ Включите CDC.

    ​ :::note

    ​ При выполнении этой команды убедитесь, что пользователь serverRole является sysadmin.

    ​ :::

    USE XXX_databases
    GO
    EXEC sys.sp_cdc_enable_db
    GO
  3. Убедитесь, что CDC для соответствующей таблицы включен.

    EXEC sys.sp_cdc_enable_table 
    @source_schema = 'XXX_schema',
    @source_name = 'XXX_table',
    @role_name = NULL,
    @supports_net_changes = 0;
    GO

Синхронизация TiDB с Selena

Введение

Коннектор Flink CDC и SMT могут синхронизировать данные из TiDB в течение доли секунды.

SMT может автоматически генерировать DDL-операторы для исходных и целевых таблиц Flink на основе информации о кластере и структуре таблиц TiDB и Selena.

Коннектор Flink CDC захватывает данные, напрямую читая полные и инкрементальные данные из базового хранилища TiKV. Полные данные получаются из диапазонов, разделенных на основе ключей, а инкрементальные данные получаются с использованием CDC Client, предоставляемого TiDB. Впоследствии данные записываются в Selena через Flink-connector-selena.

Шаги

  1. Скачайте Flink. Поддерживается версия Flink 1.11 или более поздняя.

  2. Скачайте коннектор Flink CDC. Убедитесь, что вы скачали flink-sql-connector-tidb-cdc-xxx.jar, соответствующий версии Flink.

  3. Скачайте коннектор Flink Selena.

  4. Скопируйте flink-sql-connector-tidb-cdc-xxx.jar, flink-connector-selena-xxx.jar в flink-xxx/lib/.

  5. Скачайте smt.tar.gz.

  6. Извлеките и измените файл конфигурации SMT.

    [db]
    host = 127.0.0.1
    port = 4000
    user = root
    password =
    # доступные в настоящее время типы: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
    type = tidb
    # # действует только при `type == hive`.
    # # Доступные значения: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos

    [other]
    # количество backend'ов в Selena
    be_num = 3
    # `decimal_v3` поддерживается начиная с Selena-1.18.1
    use_decimal_v3 = false
    # директория для сохранения преобразованного DDL SQL
    output_dir = ./result

    [table-rule.1]
    # шаблон для сопоставления баз данных для установки свойств
    database = ^db$
    # шаблон для сопоставления таблиц для установки свойств
    table = ^table$
    schema = ^.*$

    ############################################
    ### конфигурации flink sink
    ### НЕ устанавливайте `connector`, `table-name`, `database-name`, они генерируются автоматически
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    ############################################
    ### конфигурация flink-cdc для `tidb`
    ############################################
    # # Действует только для TiDB до v4.0.0.
    # # Адрес PD кластера TiKV.
    # flink.cdc.pd-addresses = 127.0.0.1:2379
  7. Выполните starrocks-migrate-tool*. Все SQL-скрипты генерируются в директории result.

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. Используйте SQL-скрипт с префиксом starrocks-create для создания таблицы в Selena.

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. Используйте SQL-скрипт с префиксом flink-create для создания исходных и целевых таблиц Flink и запуска задания Flink для синхронизации данных.

    bin/sql-client.sh embedded < flink-create.all.sql     

​ После успешного выполнения вышеуказанной команды задание Flink для синхронизации данных продолжает работать.

  1. Наблюдайте за статусом задания Flink.

    bin/flink list 

    Если выполнение задания сталкивается с ошибкой, вы можете просмотреть подробную информацию об ошибке в логах Flink. Также вы можете изменить конфигурации Flink в файле conf/flink-conf.yaml, такие как память и слот.

Примечания

Для TiDB версии до v4.0.0 требуется дополнительная конфигурация flink.cdc.pd-addresses.

```Bash
############################################
### конфигурация flink-cdc для `tidb`
############################################
# # Действует только для TiDB до v4.0.0.
# # Адрес PD кластера TiKV.
# flink.cdc.pd-addresses = 127.0.0.1:2379
```