Непрерывная загрузка данных из Apache Flink®
Selena предоставляет собственный коннектор под названием Selena Connector for Apache Flink® (сокращенно Flink коннектор), который помогает загружать данные в таблицу Selena с помощью Flink. Основной принцип заключается в накоплении данных и последующей их загрузке за один раз в Selena через STREAM LOAD.
Flink коннектор поддерживает DataStream API, Table API & SQL и Python API. Он обладает более высокой и стабильной производительностью по сравнению с flink-connector-jdbc, предоставляемым Apache Flink®.
УВЕДОМЛЕНИЕ
Для загрузки данных в таблицы Selena с помощью Flink коннектора требуются привилегии SELECT и INSERT на целевую таблицу Selena. Если у вас нет этих привилегий, следуйте инструкциям в GRANT для предоставления этих привилегий пользователю, которого вы используете для подключения к кластеру Selena.
Требования к версиям
| Коннектор | Flink | Selena | Java | Scala |
|---|---|---|---|---|
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 и позже | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 и позже | 8 | 2.11,2.12 |
Получение Flink коннектора
Вы можете получить JAR-файл Flink коннектора следующими способами:
- Прямая загрузка скомпилированного JAR-файла Flink коннектора.
- Добавление Flink коннектора как зависимости в ваш Maven проект с последующей загрузкой JAR-файла.
- Самостоятельная компиляция исходного кода Flink коннектора в JAR-файл.
Формат именования JAR-файла Flink коннектора следующий:
-
Начиная с Flink 1.15, это
flink-connector-selena-${connector_version}_flink-${flink_version}.jar. Например, если у вас установлен Flink 1.15 и вы хотите использовать Flink коннектор 1.2.7, вы можете использоватьflink-connector-selena-1.2.7_flink-1.15.jar. -
До Flink 1.15, это
flink-connector-selena-${connector_version}_flink-${flink_version}_${scala_version}.jar. Например, если в вашей среде установлены Flink 1.14 и Scala 2.12, и вы хотите испол ьзовать Flink коннектор 1.2.7, вы можете использоватьflink-connector-selena-1.2.7_flink-1.14_2.12.jar.
УВЕДОМЛЕНИЕ
В общем случае последняя версия Flink коннектора поддерживает совместимость только с тремя последними версиями Flink.
Загрузка скомпилированного Jar-файла
Прямая загрузка соответствующей версии JAR-файла Flink коннектора из Maven Central Repository.
Maven зависимость
В файле pom.xml вашего Maven проекта добавьте Flink коннектор как зависимость согласно следующему формату. Замените flink_version, scala_version и connector_version на соответствующие версии.
-
В Flink 1.15 и позже
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-selena</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
В версиях до Flink 1.15
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-selena</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
Самостоятельная компиляция
-
Загрузите исходный код Flink коннектора.
-
Выполните следующую команду д ля компиляции исходного кода Flink коннектора в JAR-файл. Обратите внимание, что
flink_versionзаменяется на соответствующую версию Flink.sh build.sh <flink_version>Например, если версия Flink в вашей среде 1.15, вам нужно выполнить следующую команду:
sh build.sh 1.15 -
Перейдите в директорию
target/, чтобы найти JAR-файл Flink коннектора, такой какflink-connector-selena-1.2.7_flink-1.15-SNAPSHOT.jar, созданный при компиляции.
ПРИМЕЧАНИЕ
Имя Flink коннекто ра, который не был официально выпущен, содержит суффикс
SNAPSHOT.
Опции
connector
Обязательно: Да
Значение по умолчанию: NONE
Описание: Коннектор, который вы хотите использовать. Значение должно быть "starrocks".
jdbc-url
Обязательно: Да
Значение по умолчанию: NONE
Описание: Адрес, используемый для подключения к MySQL серверу FE. Вы можете указать несколько адресов, которые должны быть разделены запятой (,). Формат: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>.
load-url
Обязательно: Да
Значение по умолчанию: NONE
Описание: Адрес, используемый для подключения к HTTP серверу FE. Вы можете указать несколько адресов, которые должны быть разделены точкой с запятой (;). Формат: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>.
database-name
Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя базы данных Selena, в которую вы хотите загрузить данные.
table-name
Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя таблицы, которую вы хотите использовать для загрузки данных в Selena.
username
Обязательно: Да
Значение по умолчанию: NONE
Описание: Имя пользователя учетной записи, которую вы хотите использовать для загрузки данных в Selena. Учетная запись должна иметь привилегии SELECT и INSERT на целевую таблицу Selena.
password
Обязательно: Да
Значение по умолчанию: NONE
Описание: Пароль указанной учетной записи.
sink.version
Обязательно: Нет
Значение по умолчанию: AUTO
Описание: Интерфейс, используемый для загрузки данных. Этот параметр поддерживается начиная с версии Flink коннектора 1.2.4.
V1: Использует интерфейс Stream Load для загрузки данных. Коннекторы до версии 1.2.4 поддерживают только этот режим.V2: Использует интерфейс Stream Load transaction для загрузки данных. Требует Selena версии не менее 2.4. РекомендуетсяV2, поскольку он оптимизирует использование памяти и обеспечивает более стабильную реализацию exactly-once.AUTO: Если версия Selena поддерживает транзакционный Stream Load, автоматически выберетV2, иначе выберетV1
sink.label-prefix
Обязательно: Нет
Значение по умолчанию: NONE
Описание: Префикс метки, используемый Stream Load. Рекомендуется настроить его, если вы используете exactly-once с коннектором версии 1.2.8 и позже. См. примечания по использованию exactly-once.
sink.semantic
Обязательно: Нет
Значение по умолчанию: at-least-once
Описание: Семантика, гарантируемая sink. Допустимые значения: at-least-once и exactly-once.
sink.buffer-flush.max-bytes
Обязательно: Нет
Значение по умолчанию: 94371840(90M)
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Максимальное значение варьируется от 64 МБ до 10 ГБ. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки. Этот параметр действует только когда sink.semantic установлен в at-least-once. Если sink.semantic установлен в exactly-once, данные в памяти сбрасываются при срабатывании checkpoint Flink. В этом случае этот параметр не действует.
sink.buffer-flush.max-rows
Обязательно: Нет
Значение по умолчанию: 500000
Описание: Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Этот параметр доступен только когда sink.version равен V1 и sink.semantic равен at-least-once. Допустимые значения: от 64000 до 5000000.
sink.buffer-flush.interval-ms
Обязательно: Нет
Значение по умолчанию: 300000
Описание: Интервал, с которым данные сбрасываются. Этот параметр доступен только когда sink.semantic равен at-least-once. Допустимые значения: от 1000 до 3600000. Единица: мс.
sink.max-retries
Обязательно: Нет
Значение по умолчанию: 3
Описание: Количество попыток повтора выполнения задания Stream Load. Этот параметр доступен только когда вы устанавливаете sink.version в V1. Допустимые значения: от 0 до 10.
sink.connect.timeout-ms
Обязательно: Нет
Значение по умолчанию: 30000
Описание: Тайм-аут для установления HTTP соединения. Допустимые значения: от 100 до 60000. Единица: мс. До версии Flink к оннектора v1.2.9 значение по умолчанию было 1000.
sink.socket.timeout-ms
Обязательно: Нет
Значение по умолчанию: -1
Описание: Поддерживается с версии 1.2.10. Время ожидания данных HTTP клиентом. Единица: мс. Значение по умолчанию -1 означает отсутствие тайм-аута.
sink.sanitize-error-log
Обязательно: Нет
Значение по умолчанию: false
Описание: Поддерживается с версии 1.2.12. Следует ли очищать конфиденциальные данные в журнале ошибок для безопасности в продакшене. Когда этот параметр установлен в true, конфиденциальные данные строк и значения столбцов в журналах ошибок Stream Load редактируются как в коннекторе, так и в журналах SDK. Значение по умолчанию false для обратной совместимости.
sink.wait-for-continue.timeout-ms
Обязательно: Нет
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.2.7. Тайм-аут ожидания ответа HTTP 100-continue от FE. Допустимые значения: от 3000 до 60000. Единица: мс
sink.ignore.update-before
Обязательно: Нет
Значение по умолчанию: true
Описание: Поддерживается с версии 1.2.8. Следует ли игнорировать записи UPDATE_BEFORE от Flink при загрузке данных в таблицы Primary Key. Если этот параметр установлен в false, запись обрабатывается как операция удаления в таблице Selena.
sink.parallelism
Обязательно: Нет
Значение по умолчанию: NONE
Описание: Параллелизм загрузки. Доступно только для Flink SQL. Если этот параметр не указан, планировщик Flink решает параллелизм. В сценарии мульти-параллелизма пользователи должны гарантировать, что данные записываются в правильном порядке.
sink.properties.*
Обязательно: Нет
Значение по умолчанию: NONE
Описание: Параметры, используемые для управления поведением Stream Load. Например, параметр sink.properties.format указывает формат, используемый для Stream Load, такой как CSV или JSON. Список поддерживаемых параметров и их описания см. в STREAM LOAD.
sink.properties.format
Обязательно: Нет
Значение по умолчанию: csv
Описание: Формат, используемый для Stream Load. Flink коннектор преобразует каждую партию данных в указанный формат перед отправкой их в Selena. Допустимые значения: csv и json.
sink.properties.column_separator
Обязательно: Нет
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.
sink.properties.row_delimiter
Обязательно: Нет
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.
sink.properties.max_filter_ratio
Обязательно: Нет
Значение по умолчанию: 0
Описание: Максимальная толерантность к ошибкам Stream Load. Это максимальный процент записей данных, которые могут быть отфильтрованы из-за неадекватного качества данных. Допустимые значения: от 0 до 1. Значение по умолчанию: 0. Подробности см. в Stream Load.
sink.properties.partial_update
Обязательно: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
sink.properties.partial_update_mode
Обязательно: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.
- Значение
row(по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими партиями. - Значение
columnозначает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.
sink.properties.strict_mode
Обязательно: Нет
Значение по умолчанию: false
Описание: Указывает, следует ли включить строгий режим для Stream Load. Это влияет на поведение загрузки при наличии неквалифицированных строк, таких как несогласованные значения столбцов. Допустимые значения: true и false. Значение по умолчанию: false. Подробности см. в Stream Load.