Перейти к основному содержимому

Чтение данных из Selena с помощью Flink коннектора

Selena предоставляет собственный коннектор под названием Selena Connector for Apache Flink® (сокращенно Flink коннектор), который помогает читать данные в пакетном режиме из кластера Selena с использованием Flink.

Flink коннектор поддерживает два метода чтения: Flink SQL и Flink DataStream. Рекомендуется использовать Flink SQL.

ПРИМЕЧАНИЕ

Flink коннектор также поддерживает запись данных, прочитанных Flink, в другой кластер Selena или систему хранения. См. Непрерывная загрузка данных из Apache Flink®.

Справочная информация

В отличие от JDBC коннектора, предоставляемого Flink, Flink коннектор Selena поддерживает параллельное чтение данных из нескольких BE вашего кластера Selena, значительно ускоряя задачи чтения. Следующее сравнение показывает разницу в реализации между двумя коннекторами.

  • Flink коннектор Selena

    С помощью Flink коннектора Selena, Flink может сначала получить план запроса от ответственного FE, затем распределить полученный план запроса в качестве параметров всем задействованным BE, и наконец получить данные, возвращенные BE.

    - Flink коннектор Selena

  • JDBC коннектор Flink

    С помощью JDBC коннектора Flink, Flink может читать данные только из отдельных FE, по одному за раз. Чтение данных происходит медленно.

    JDBC коннектор Flink

Требования к версиям

КоннекторFlinkSelenaJavaScala
1.2.101.15,1.16,1.17,1.18,1.192.1 и позднее82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 и позднее82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 и позднее82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 и позднее82.11,2.12

Предварительные требования

Flink должен быть развернут. Если Flink не развернут, выполните следующие шаги для его развертывания:

  1. Установите Java 8 или Java 11 в вашей операционной системе, чтобы обеспечить правильную работу Flink. Вы можете использовать следующую команду для проверки версии установленной Java:

    java -version

    Например, если возвращается следующая информация, значит установлена Java 8:

    openjdk version "1.8.0_322"
    OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
    OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
  2. Скачайте и распакуйте пакет Flink по вашему выбору.

    ПРИМЕЧАНИЕ

    Мы рекомендуем использовать Flink v1.14 или позднее. Минимальная поддерживаемая версия Flink — v1.11.

    # Скачайте пакет Flink.
    wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Распакуйте пакет Flink.
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Перейдите в директорию Flink.
    cd flink-1.14.5
  3. Запустите ваш кластер Flink.

    # Запустите ваш кластер Flink.
    ./bin/start-cluster.sh

    # Когда отображается следующая информация, ваш кластер Flink успешно запущен:
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.

Вы также можете развернуть Flink, следуя инструкциям в документации Flink.

Перед началом работы

Выполните следующие шаги для развертывания Flink коннектора:

  1. Выберите и скачайте JAR-пакет flink-connector-selena, соответствующий используемой вами версии Flink. Если требуется отладка кода, скомпилируйте пакет Flink коннектора в соответствии с вашими бизнес-требованиями.

    ВНИМАНИЕ

    Мы рекомендуем скачивать пакет Flink коннектора версии 1.2.x или позднее, у которого соответствующая версия Flink имеет те же первые две цифры, что и используемая вами версия Flink. Например, если вы используете Flink v1.14.x, вы можете скачать flink-connector-selena-1.2.4_flink-1.14_x.yy.jar.

  2. Поместите скачанный или скомпилированный пакет Flink коннектора в директорию lib Flink.

  3. Перезапустите ваш кластер Flink.

Настройка сети

Убедитесь, что машина, на которой расположен Flink, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_port (по умолчанию: 9060).

Параметры

Общие параметры

Следующие параметры применяются как к методу чтения Flink SQL, так и к методу чтения Flink DataStream.

ПараметрОбязательныйТип данныхОписание
connectorДаSTRINGТип коннектора, который вы хотите использовать для чтения данных. Установите значение starrocks.
scan-urlДаSTRINGАдрес, используемый для подключения FE с веб-сервера. Формат: <fe_host>:<fe_http_port>. Порт по умолчанию — 8030. Вы можете указать несколько адресов, которые должны быть разделены запятой (,). Пример: 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030.
jdbc-urlДаSTRINGАдрес, используемый для подключения MySQL-клиента FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>. Номер порта по умолчанию — 9030.
usernameДаSTRINGИмя пользователя вашей учетной записи кластера Selena. Учетная запись должна иметь права на чтение таблицы Selena, которую вы хотите прочитать. См. Привилегии пользователей.
passwordДаSTRINGПароль вашей учетной записи кластера Selena.
database-nameДаSTRINGИмя базы данных Selena, к которой принадлежит таблица Selena, которую вы хотите прочитать.
table-nameДаSTRINGИмя таблицы Selena, которую вы хотите прочитать.
scan.connect.timeout-msНетSTRINGМаксимальное время, после которого соединение от Flink коннектора к вашему кластеру Selena истекает. Единица: миллисекунды. Значение по умолчанию: 1000. Если время, затраченное на установление соединения, превышает этот лимит, задача чтения завершается неудачей.
scan.params.keep-alive-minНетSTRINGМаксимальное время, в течение которого задача чтения остается активной. Время активности проверяется регулярно с помощью механизма опроса. Единица: минуты. Значение по умолчанию: 10. Мы рекомендуем установить этот параметр на значение, большее или равное 5.
scan.params.query-timeout-sНетSTRINGМаксимальное время, после которого задача чтения истекает. Время ожидания проверяется во время выполнения задачи. Единица: секунды. Значение по умолчанию: 600. Если результат чтения не возвращается после истечения времени, задача чтения останавливается.
scan.params.mem-limit-byteНетSTRINGМаксимальный объем памяти, разрешенный для каждого запроса на каждом BE. Единица: байты. Значение по умолчанию: 1073741824, равное 1 ГБ.
scan.max-retriesНетSTRINGМаксимальное количество попыток повтора задачи чтения при сбоях. Значение по умолчанию: 1. Если количество попыток повтора задачи чтения превышает этот лимит, задача чтения возвращает ошибки.

Следующие параметры применяются только к методу чтения Flink DataStream.

ПараметрОбязательныйТип данныхОписание
scan.columnsНетSTRINGСтолбец, который вы хотите прочитать. Вы можете указать несколько столбцов, которые должны быть разделены запятой (,).
scan.filterНетSTRINGУсловие фильтрации, на основе которого вы хотите фильтровать данные.

Предположим, что в Flink вы создаете таблицу, состоящую из трех столбцов: c1, c2, c3. Чтобы прочитать строки, значения которых в столбце c1 этой таблицы Flink равны 100, вы можете указать два условия фильтрации "scan.columns, "c1" и "scan.filter, "c1 = 100".

Следующее сопоставление типов данных действительно только для чтения данных Flink из Selena. Для сопоставления типов данных, используемого при записи данных Flink в Selena, см. Непрерывная загрузка данных из Apache Flink®.

SelenaFlink
NULLNULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
LARGEINTSTRING
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
DECIMALV2DECIMAL
DECIMAL32DECIMAL
DECIMAL64DECIMAL
DECIMAL128DECIMAL
CHARCHAR
VARCHARSTRING
JSONSTRING
ПРИМЕЧАНИЕ:
Поддерживается с версии 1.2.10
ARRAYARRAY
ПРИМЕЧАНИЕ:
Поддерживается с версии 1.2.10, требуется Selena v3.1.12/v3.2.5 или позднее.
STRUCTROW
ПРИМЕЧАНИЕ:
Поддерживается с версии 1.2.10, требуется Selena v3.1.12/v3.2.5 или позднее.
MAPMAP
ПРИМЕЧАНИЕ:
Поддерживается с версии 1.2.10, требуется Selena v3.1.12/v3.2.5 или позднее.

Примеры

Следующие примеры предполагают, что вы создали базу данных с именем test в вашем кластере Selena и у вас есть права пользователя root.

ПРИМЕЧАНИЕ

Если задача чтения завершается неудачей, вы должны пересоздать ее.

Пример данных

  1. Перейдите в базу данных test и создайте таблицу с именем score_board.

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES
    (
    "replication_num" = "3"
    );
  2. Вставьте данные в таблицу score_board.

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. Выполните запрос к таблице score_board.

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.00 sec)
  1. В вашем кластере Flink создайте таблицу с именем flink_test на основе схемы исходной таблицы Selena (которая в данном примере называется score_board). В команде создания таблицы вы должны настроить свойства задачи чтения, включая информацию о Flink коннекторе, исходной базе данных Selena и исходной таблице Selena.

    CREATE TABLE flink_test
    (
    `id` INT,
    `name` STRING,
    `score` INT
    )
    WITH
    (
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board'
    );
  2. Используйте SELECT для чтения данных из Selena.

    SELECT id, name FROM flink_test WHERE score > 20;

При чтении данных с помощью Flink SQL обратите внимание на следующие моменты:

  • Вы можете использовать только SQL-операторы типа SELECT ... FROM <table_name> WHERE ... для чтения данных из Selena. Из всех агрегатных функций поддерживается только count.
  • Поддерживается проталкивание предикатов. Например, если ваш запрос содержит условие фильтрации char_1 <> 'A' and int_1 = -126, условие фильтрации будет проталкиваться в Flink коннектор и преобразовываться в оператор, который может быть выполнен Selena, перед выполнением запроса. Вам не нужно выполнять дополнительные настройки.
  • Оператор LIMIT не поддерживается.
  • Selena не поддерживает механизм контрольных точек. В результате согласованность данных не может быть гарантирована, если задача чтения завершается неудачей.
  1. Добавьте следующие зависимости в файл pom.xml:

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-selena</artifactId>


    <version>x.x.x_flink-1.15</version>


    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>


    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>


    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>


    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
    </dependency>

    Вы должны заменить x.x.x в приведенном выше примере кода на последнюю версию Flink коннектора, которую вы используете. См. Информация о версии.

  2. Вызовите Flink коннектор для чтения данных из Selena:

    import com.starrocks.connector.flink.StarRocksSource;
    import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
    StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "192.168.xxx.xxx:8030")
    .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "score_board")
    .withProperty("database-name", "test")
    .build();
    TableSchema tableSchema = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("score", DataTypes.INT())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
    env.execute("StarRocks flink source");
    }

    }

Что дальше

После того как Flink успешно прочитает данные из Selena, вы можете использовать Flink WebUI для мониторинга задачи чтения. Например, вы можете просмотреть метрику totalScannedRows на странице Metrics WebUI, чтобы получить количество строк, которые были успешно прочитаны. Вы также можете использовать Flink SQL для выполнения вычислений, таких как соединения, с прочитанными данными.