Перейти к основному содержимому
Версия: 2.0.x

Синхронизация данных из MySQL в реальном времени

Задание Flink сообщает об ошибке Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.

Возможная причина заключается в том, что в файле конфигурации SMT config_prod.conf отсутствует необходимая информация о конфигурации в нескольких наборах правил, таких как [table-rule.1] и [table-rule.2].

Вы можете проверить, настроен ли в каждом наборе правил, таком как [table-rule.1] и [table-rule.2], необходимая информация о базе данных, таблице и Flink connector.

Flink автоматически перезапускает неудачные задачи через механизм checkpointing и стратегию перезапуска.

Например, если вам нужно включить механизм checkpointing и использовать стратегию перезапуска по умолчанию, которая является стратегией перезапуска с фиксированной задержкой, вы можете настроить следующую информацию в файле конфигурации flink-conf.yaml:

execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory

Описание параметров:

ПРИМЕЧАНИЕ

Для более подробного описания параметров в документации Flink см. Checkpointing.

  • execution.checkpointing.interval: базовый временной интервал checkpointing. Единица измерения: миллисекунды. Чтобы включить механизм checkpointing, вам нужно установить этот параметр на значение больше 0.
  • state.backend: указывает бэкенд состояния, чтобы определить, как состояние представляется внутренне, и как и где оно сохраняется при checkpointing. Обычные значения — filesystem или rocksdb. После включения механизма checkpointing состояние сохраняется при checkpoints для предотвращения потери данных и обеспечения согласованности данных после восстановления. Для получения дополнительной информации о состоянии см. State Backends.
  • state.checkpoints.dir: каталог, в который записываются checkpoints.

Вы можете вручную инициировать savepoint при остановке задания Flink (savepoint — это согласованный снимок состояния выполнения потокового задания Flink, создаваемый на основе механизма checkpointing). Позже вы можете восстановить задание Flink из указанного savepoint.

  1. Остановите задание Flink с savepoint. Следующая команда автоматически инициирует savepoint для задания Flink jobId и останавливает задание Flink. Кроме того, вы можете указать целевой каталог файловой системы для хранения savepoint.

    bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

    Описание параметров:

    • jobId: Вы можете просмотреть ID задания Flink в Flink WebUI или выполнив flink list -running в командной строке.
    • targetDirectory: Вы можете указать state.savepoints.dir в качестве каталога по умолчанию для хранения savepoints в файле конфигурации Flink flink-conf.yml. При инициировании savepoint savepoint сохраняется в этом каталоге по умолчанию, и вам не нужно указывать каталог.
    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
  2. Повторно отправьте задание Flink с указанием вышеуказанного savepoint.

    ./flink run -c com.selena.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-selena-xxxx.jar -f flink-create.all.sql