Перейти к основному содержимому

Синхронизация данных из MySQL в реальном времени

Задача Flink сообщает об ошибке Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.

Возможная причина заключается в том, что в нескольких наборах правил, таких как [table-rule.1] и [table-rule.2], отсутствует необходимая информация конфигурации в файле конфигурации SMT config_prod.conf.

Вы можете проверить, настроен ли каждый набор правил, такой как [table-rule.1] и [table-rule.2], с необходимой информацией о базе данных, таблице и Flink connector.

Flink автоматически перезапускает неудачные задачи через механизм checkpointing и стратегию перезапуска.

Например, если вам нужно включить механизм checkpointing и использовать стратегию перезапуска по умолчанию, которая является стратегией перезапуска с фиксированной задержкой, вы можете настроить следующую информацию в файле конфигурации flink-conf.yaml:

execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory

Описание параметров:

ПРИМЕЧАНИЕ

Для более подробного описания параметров в документации Flink см. Checkpointing.

  • execution.checkpointing.interval: базовый интервал времени для checkpointing. Единица измерения: миллисекунда. Чтобы включить механизм checkpointing, необходимо установить этот параметр на значение больше 0.
  • state.backend: указывает backend состояния для определения того, как состояние представлено внутренне, и как и где оно сохраняется при checkpointing. Обычные значения: filesystem или rocksdb. После включения механизма checkpointing состояние сохраняется при checkpoint'ах для предотвращения потери данных и обеспечения согласованности данных после восстановления. Для получения дополнительной информации о состоянии см. State Backends.
  • state.checkpoints.dir: директория, в которую записываются checkpoint'ы.

Вы можете вручную запустить savepoint при остановке задачи Flink (savepoint — это согласованный образ состояния выполнения потоковой задачи Flink, который создается на основе механизма checkpointing). Позже вы можете восстановить задачу Flink из указанного savepoint.

  1. Остановите задачу Flink с savepoint. Следующая команда автоматически запускает savepoint для задачи Flink jobId и останавливает задачу Flink. Дополнительно вы можете указать целевую директорию файловой системы для хранения savepoint.

    bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

    Описание параметров:

    • jobId: Вы можете просмотреть ID задачи Flink в Flink WebUI или выполнив flink list -running в командной строке.
    • targetDirectory: Вы можете указать state.savepoints.dir как директорию по умолчанию для хранения savepoint'ов в файле конфигурации Flink flink-conf.yml. Когда запускается savepoint, он сохраняется в этой директории по умолчанию, и вам не нужно указывать директорию.
    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
  2. Повторно отправьте задачу Flink с указанным предыдущим savepoint.

    ./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-selena-xxxx.jar -f flink-create.all.sql