Синхронизация данных из MySQL в реальном времени
Что делать, если задача Flink сообщает об ошибке?
Задача Flink сообщает об ошибке Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.
Возможная причина заключается в том, что в нескольких наборах правил, таких как [table-rule.1] и [table-rule.2], отсутствует необходимая информация конфигурации в файле конфигурации SMT config_prod.conf.
Вы можете проверить, настроен ли каждый набор правил, такой как [table-rule.1] и [table-rule.2], с необходимой информацией о базе данных, таблице и Flink connector.
Как заставить Flink автоматически перезапускать неудачные задачи?
Flink автоматически перезапускает неудачные задачи через механизм checkpointing и стратегию перезапуска.
Например, если вам нужно включить механизм checkpointing и использовать стратегию перезапуска по умолчанию, которая является стратегией перезапуска с фиксированной задержкой, вы можете настроить следующую информацию в файле конфигурации flink-conf.yaml:
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
Описание параметров:
ПРИМЕЧАНИЕ
Для более подробного описания параметров в документации Flink см. Checkpointing.
execution.checkpointing.interval: базовый интервал времени для checkpointing. Единица измерения: миллисекунда. Чтобы включить механизм checkpointing, необходимо установить этот параметр на значение больше0.state.backend: указывает backend состояния для определения того, как состояние представлено внутренне, и как и где оно сохраняется при checkpointing. Обычные значения:filesystemилиrocksdb. После включения механизма checkpointing состояние сохраняется при checkpoint'ах для предотвращения потери данных и обеспечения согласов анности данных после восстановления. Для получения дополнительной информации о состоянии см. State Backends.state.checkpoints.dir: директория, в которую записываются checkpoint'ы.
Как можно вручную остановить задачу Flink и позже восстановить её до состояния перед остановкой?
Вы можете вручную запустить savepoint при остановке задачи Flink (savepoint — это согласованный образ состояния выполнения потоковой задачи Flink, который создается на основе механизма checkpointing). Позже вы можете восстановить задачу Flink из указанного savepoint.
-
Остановите задачу Flink с savepoint. Следующая команда автоматически запускает savepoint для задачи Flink
jobIdи останавливает задачу Flink. Дополнительно вы можете указать целевую директорию файловой системы для хранения savepoint.bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobIdОписание параметров:
jobId: Вы можете просмотреть ID задачи Flink в Flink WebUI или выполнивflink list -runningв командной строке.targetDirectory: Вы можете указатьstate.savepoints.dirкак директорию по умолчанию для хранения savepoint'ов в файле конфигурации Flink flink-conf.yml. Когда запускается savepoint, он сохраняется в этой директории по умолчанию, и вам не нужно указывать директорию.
state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir -
Повторно отправьте задачу Flink с указанным предыдущим savepoint.
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-selena-xxxx.jar -f flink-create.all.sql