Перейти к основному содержимому

Чтение данных из Selena с помощью Spark connector

Selena предоставляет собственный коннектор под названием Selena Connector for Apache Spark™ (сокращенно Spark connector), который помогает читать данные из таблицы Selena с помощью Spark. Вы можете использовать Spark для сложной обработки и машинного обучения на данных, прочитанных из Selena.

Spark connector поддерживает три метода чтения: Spark SQL, Spark DataFrame и Spark RDD.

Вы можете использовать Spark SQL для создания временного представления на таблице Selena, а затем напрямую читать данные из таблицы Selena, используя это временное представление.

Вы также можете сопоставить таблицу Selena с Spark DataFrame или Spark RDD, а затем читать данные из Spark DataFrame или Spark RDD. Мы рекомендуем использовать Spark DataFrame.

ВНИМАНИЕ

Только пользователи с привилегией SELECT на таблицу Selena могут читать данные из этой таблицы. Вы можете следовать инструкциям, предоставленным в GRANT, чтобы предоставить привилегию пользователю.

Примечания по использованию

  • Вы можете фильтровать данные в Selena перед их чтением, тем самым уменьшая объем передаваемых данных.
  • Если накладные расходы на чтение данных существенны, вы можете использовать соответствующий дизайн таблицы и условия фильтрации, чтобы предотвратить чтение Spark чрезмерного количества данных за раз. Таким образом, вы можете снизить нагрузку на ввод-вывод диска и сетевое соединение, обеспечивая правильное выполнение обычных запросов.

Требования к версиям

Spark connectorSparkSelenaJavaScala
1.1.23.2, 3.3, 3.4, 3.52.5 и позже82.12
1.1.13.2, 3.3, 3.42.5 и позже82.12
1.1.03.2, 3.3, 3.42.5 и позже82.12
1.0.03.x1.18 и позже82.12
1.0.02.x1.18 и позже82.11

ВНИМАНИЕ

  • Пожалуйста, смотрите Обновление Spark connector для изменений поведения между различными версиями коннектора.
  • Коннектор не предоставляет драйвер MySQL JDBC начиная с версии 1.5.0, и вам необходимо импортировать драйвер в classpath Spark вручную. Вы можете найти драйвер на Maven Central.
  • В версии 1.0.0 Spark connector поддерживает только чтение данных из Selena. Начиная с версии 1.5.0, Spark connector поддерживает как чтение данных из Selena, так и запись данных в Selena.
  • Версия 1.0.0 отличается от версии 1.1.0 по параметрам и сопоставлению типов данных. См. Обновление Spark connector.
  • В общих случаях новые функции не будут добавляться в версию 1.0.0. Мы рекомендуем обновить ваш Spark connector при первой возможности.

Получение Spark connector

Используйте один из следующих методов для получения пакета .jar Spark connector, который подходит для ваших бизнес-потребностей:

  • Загрузите скомпилированный пакет.
  • Используйте Maven для добавления зависимостей, требуемых Spark connector. (Этот метод поддерживается только для Spark connector 1.1.0 и позже.)
  • Вручную скомпилируйте пакет.

Spark connector 1.1.0 и позже

Пакеты .jar Spark connector именуются в следующем формате:

starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar

Например, если вы хотите использовать Spark connector 1.1.0 с Spark 3.2 и Scala 2.12, вы можете выбрать starrocks-spark-connector-3.2_2.12-1.1.0.jar.

ВНИМАНИЕ

В обычных случаях последняя версия Spark connector может использоваться с тремя самыми последними версиями Spark.

Загрузка скомпилированного пакета

Вы можете получить пакеты .jar Spark connector различных версий в Maven Central Repository.

Добавление зависимостей Maven

Настройте зависимости, требуемые Spark connector, следующим образом:

ВНИМАНИЕ

Вы должны заменить spark_version, scala_version и connector_version на версию Spark, версию Scala и версию Spark connector, которые вы используете.

<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>

Например, если вы хотите использовать Spark connector 1.1.0 с Spark 3.2 и Scala 2.12, настройте зависимости следующим образом:

<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>

Ручная компиляция пакета

  1. Загрузите код Spark connector.

  2. Используйте следующую команду для компиляции Spark connector:

    ВНИМАНИЕ

    Вы должны заменить spark_version на версию Spark, которую вы используете.

    sh build.sh <spark_version>

    Например, если вы хотите использовать Spark connector с Spark 3.2, скомпилируйте Spark connector следующим образом:

    sh build.sh 3.2
  3. Перейдите в путь target/, в котором генерируется пакет .jar Spark connector типа starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar после компиляции.

    ВНИМАНИЕ

    Если вы используете версию Spark connector, которая не выпущена официально, имя сгенерированного пакета .jar Spark connector содержит SNAPSHOT в качестве суффикса.

Spark connector 1.0.0

Загрузка скомпилированного пакета

Ручная компиляция пакета

  1. Загрузите код Spark connector.

    ВНИМАНИЕ

    Вы должны переключиться на spark-1.0.

  2. Выполните одно из следующих действий для компиляции Spark connector:

    • Если вы используете Spark 2.x, выполните следующую команду, которая компилирует Spark connector для Spark 2.3.4 по умолчанию:

      sh build.sh 2
    • Если вы используете Spark 3.x, выполните следующую команду, которая компилирует Spark connector для Spark 3.1.2 по умолчанию:

      sh build.sh 3
  3. Перейдите в путь output/, в котором генерируется файл starrocks-spark2_2.11-1.0.0.jar после компиляции. Затем скопируйте файл в classpath Spark:

    • Если ваш кластер Spark работает в режиме Local, поместите файл в путь jars/.
    • Если ваш кластер Spark работает в режиме Yarn, поместите файл в пакет предварительного развертывания.

Вы можете использовать Spark connector для чтения данных из Selena только после того, как поместите файл в указанное место.

Параметры

В этом разделе описываются параметры, которые необходимо настроить при использовании Spark connector для чтения данных из Selena.

Общие параметры

Следующие параметры применяются ко всем трем методам чтения: Spark SQL, Spark DataFrame и Spark RDD.

ПараметрЗначение по умолчаниюОписание
starrocks.fenodesNoneHTTP URL FE в вашем кластере Selena. Формат <fe_host>:<fe_http_port>. Вы можете указать несколько URL, которые должны быть разделены запятой (,).
starrocks.table.identifierNoneИмя таблицы Selena. Формат: <database_name>.<table_name>.
starrocks.request.retries3Максимальное количество раз, которое Spark может повторить отправку запроса на чтение в Selena.
starrocks.request.connect.timeout.ms30000Максимальное время, после которого запрос на чтение, отправленный в Selena, истекает.
starrocks.request.read.timeout.ms30000Максимальное время, после которого чтение для запроса, отправленного в Selena, истекает.
starrocks.request.query.timeout.s3600Максимальное время, после которого запрос данных из Selena истекает. Время ожидания по умолчанию составляет 1 час. -1 означает, что время ожидания не указано.
starrocks.request.tablet.sizeInteger.MAX_VALUEКоличество tablet Selena, сгруппированных в каждый раздел Spark RDD. Меньшее значение этого параметра указывает на то, что будет сгенерировано большее количество разделов Spark RDD. Большее количество разделов Spark RDD означает более высокий параллелизм в Spark, но большее давление на Selena.
starrocks.batch.size4096Максимальное количество строк, которые можно прочитать из BE за раз. Увеличение значения этого параметра может уменьшить количество соединений, установленных между Spark и Selena, тем самым смягчая дополнительные временные накладные расходы, вызванные сетевой задержкой.
starrocks.exec.mem.limit2147483648Максимальный объем памяти, разрешенный для каждого запроса. Единица: байты. Лимит памяти по умолчанию составляет 2 ГБ.
starrocks.deserialize.arrow.asyncfalseУказывает, поддерживать ли асинхронное преобразование формата памяти Arrow в RowBatches, необходимые для итерации Spark connector.
starrocks.deserialize.queue.size64Размер внутренней очереди, которая содержит задачи для асинхронного преобразования формата памяти Arrow в RowBatches. Этот параметр действителен, когда starrocks.deserialize.arrow.async установлен в true.
starrocks.filter.queryNoneУсловие, на основе которого вы хотите фильтровать данные в Selena. Вы можете указать несколько условий фильтрации, которые должны быть соединены с помощью and. Selena фильтрует данные из таблицы Selena на основе указанных условий фильтрации перед тем, как данные будут прочитаны Spark.
starrocks.timezoneЧасовой пояс JVM по умолчаниюПоддерживается с версии 1.1.1. Часовой пояс, используемый для преобразования DATETIME Selena в TimestampType Spark. По умолчанию это часовой пояс JVM, возвращаемый ZoneId#systemDefault(). Формат может быть именем часового пояса, таким как Asia/Shanghai, или смещением зоны, таким как +08:00.

Параметры для Spark SQL и Spark DataFrame

Следующие параметры применяются только к методам чтения Spark SQL и Spark DataFrame.

ПараметрЗначение по умолчаниюОписание
starrocks.fe.http.urlNoneHTTP IP-адрес FE. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен starrocks.fenodes. Вам нужно настроить только один из них. В Spark connector 1.1.0 и позже мы рекомендуем использовать starrocks.fe.http.url, поскольку starrocks.fenodes может быть устаревшим.
starrocks.fe.jdbc.urlNoneАдрес, который используется для подключения к серверу MySQL FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>.
ВНИМАНИЕ
В Spark connector 1.1.0 и позже этот параметр является обязательным.
userNoneИмя пользователя вашей учетной записи кластера Selena. Пользователю нужна привилегия SELECT на таблицу Selena.
starrocks.userNoneИмя пользователя вашей учетной записи кластера Selena. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен user. Вам нужно настроить только один из них. В Spark connector 1.1.0 и позже мы рекомендуем использовать starrocks.user, поскольку user может быть устаревшим.
passwordNoneПароль вашей учетной записи кластера Selena.
starrocks.passwordNoneПароль вашей учетной записи кластера Selena. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен password. Вам нужно настроить только один из них. В Spark connector 1.1.0 и позже мы рекомендуем использовать starrocks.password, поскольку password может быть устаревшим.
starrocks.filter.query.in.max.count100Максимальное количество значений, поддерживаемых выражением IN во время проталкивания предикатов. Если количество значений, указанных в выражении IN, превышает этот лимит, условия фильтрации, указанные в выражении IN, обрабатываются в Spark.

Параметры для Spark RDD

Следующие параметры применяются только к методу чтения Spark RDD.

ПараметрЗначение по умолчаниюОписание
starrocks.request.auth.userNoneИмя пользователя вашей учетной записи кластера Selena.
starrocks.request.auth.passwordNoneПароль вашей учетной записи кластера Selena.
starrocks.read.fieldNoneСтолбец таблицы Selena, из которого вы хотите читать данные. Вы можете указать несколько столбцов, которые должны быть разделены запятой (,).

Сопоставление типов данных между Selena и Spark

Spark connector 1.1.0 и позже

Тип данных SelenaТип данных Spark
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
JSONDataTypes.StringType
ПРИМЕЧАНИЕ:
Это сопоставление типов данных поддерживается начиная с Spark connector v1.1.2 и требует версии Selena не менее 2.5.13, 3.0.3, 3.1.0 или позже.
ARRAYНеподдерживаемый тип данных
HLLНеподдерживаемый тип данных
BITMAPНеподдерживаемый тип данных

Spark connector 1.0.0

Тип данных SelenaТип данных Spark
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
DATEDataTypes.StringType
DATETIMEDataTypes.StringType
ARRAYНеподдерживаемый тип данных
HLLНеподдерживаемый тип данных
BITMAPНеподдерживаемый тип данных

Логика обработки базового движка хранения, используемого Selena, не может покрыть ожидаемый временной диапазон при прямом использовании типов данных DATE и DATETIME. Поэтому Spark connector сопоставляет типы данных DATE и DATETIME из Selena с типом данных STRING из Spark и генерирует читаемые строковые тексты, соответствующие данным даты и времени, прочитанным из Selena.

Обновление Spark connector

Обновление с версии 1.0.0 до версии 1.1.0

  • Начиная с версии 1.5.0, Spark connector не предоставляет mysql-connector-java, который является официальным драйвером JDBC для MySQL, из-за ограничений лицензии GPL, используемой mysql-connector-java. Однако Spark connector все еще нуждается в mysql-connector-java для подключения к Selena для метаданных таблицы, поэтому вам нужно добавить драйвер в classpath Spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central.

  • В версии 1.1.0 Spark connector использует JDBC для доступа к Selena для получения более подробной информации о таблице. Поэтому вы должны настроить starrocks.fe.jdbc.url.

  • В версии 1.1.0 некоторые параметры переименованы. Как старые, так и новые параметры сохраняются пока. Для каждой пары эквивалентных параметров вам нужно настроить только один из них, но мы рекомендуем использовать новый, поскольку старый может быть устаревшим.

    • starrocks.fenodes переименован в starrocks.fe.http.url.
    • user переименован в starrocks.user.
    • password переименован в starrocks.password.
  • В версии 1.1.0 сопоставления некоторых типов данных корректируются на основе Spark 3.x:

    • DATE в Selena сопоставляется с DataTypes.DateType (изначально DataTypes.StringType) в Spark.
    • DATETIME в Selena сопоставляется с DataTypes.TimestampType (изначально DataTypes.StringType) в Spark.

Примеры

Следующие примеры предполагают, что вы создали базу данных с именем test в вашем кластере Selena и у вас есть разрешения пользователя root. Настройки параметров в примерах основаны на Spark Connector 1.1.0.

Конфигурация сети

Убедитесь, что машина, на которой расположен Spark, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_port (по умолчанию: 9060).

Пример данных

Выполните следующие действия для подготовки образца таблицы:

  1. Перейдите в базу данных test и создайте таблицу с именем score_board.

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. Вставьте данные в таблицу score_board.

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. Запросите таблицу score_board.

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.01 sec)

Чтение данных с помощью Spark SQL

  1. Выполните следующую команду в каталоге Spark для запуска Spark SQL:

    sh spark-sql
  2. Выполните следующую команду для создания временного представления с именем spark_starrocks на таблице score_board, которая принадлежит базе данных test:

    spark-sql> CREATE TEMPORARY VIEW spark_starrocks
    USING starrocks
    OPTIONS
    (
    "starrocks.table.identifier" = "test.score_board",
    "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
    "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
    "starrocks.user" = "root",
    "starrocks.password" = ""
    );
  3. Выполните следующую команду для чтения данных из временного представления:

    spark-sql> SELECT * FROM spark_starrocks;

    Spark возвращает следующие данные:

    1        Bob        21
    2 Stan 21
    3 Sam 22
    4 Tony 22
    5 Alice 22
    6 Lucy 23
    7 Polly 23
    8 Tom 23
    9 Rose 24
    10 Jerry 24
    11 Jason 24
    12 Lily 25
    13 Stephen 25
    14 David 25
    15 Eddie 26
    16 Kate 27
    17 Cathy 27
    18 Judy 27
    19 Julia 28
    20 Robert 28
    21 Jack 29
    Time taken: 1.883 seconds, Fetched 21 row(s)
    22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)

Чтение данных с помощью Spark DataFrame

  1. Выполните следующую команду в каталоге Spark для запуска Spark Shell:

    sh spark-shell
  2. Выполните следующую команду для создания DataFrame с именем starrocksSparkDF на таблице score_board, которая принадлежит базе данных test:

    scala> val starrocksSparkDF = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.score_board")
    .option("starrocks.fe.http.url", s"<fe_host>:<fe_http_port>")
    .option("starrocks.fe.jdbc.url", s"jdbc:mysql://<fe_host>:<fe_query_port>")
    .option("starrocks.user", s"root")
    .option("starrocks.password", s"")
    .load()
  3. Читайте данные из DataFrame. Например, если вы хотите прочитать первые 10 строк, выполните следующую команду:

    scala> starrocksSparkDF.show(10)

    Spark возвращает следующие данные:

    +---+-----+-----+
    | id| name|score|
    +---+-----+-----+
    | 1| Bob| 21|
    | 2| Stan| 21|
    | 3| Sam| 22|
    | 4| Tony| 22|
    | 5|Alice| 22|
    | 6| Lucy| 23|
    | 7|Polly| 23|
    | 8| Tom| 23|
    | 9| Rose| 24|
    | 10|Jerry| 24|
    +---+-----+-----+
    only showing top 10 rows

    ПРИМЕЧАНИЕ

    По умолчанию, если вы не указываете количество строк, которые хотите прочитать, Spark возвращает первые 20 строк.

Чтение данных с помощью Spark RDD

  1. Выполните следующую команду в каталоге Spark для запуска Spark Shell:

    sh spark-shell
  2. Выполните следующую команду для создания RDD с именем starrocksSparkRDD на таблице score_board, которая принадлежит базе данных test.

    scala> import com.starrocks.connector.spark._
    scala> val starrocksSparkRDD = sc.starrocksRDD
    (
    tableIdentifier = Some("test.score_board"),
    cfg = Some(Map(
    "starrocks.fenodes" -> "<fe_host>:<fe_http_port>",
    "starrocks.request.auth.user" -> "root",
    "starrocks.request.auth.password" -> ""
    ))
    )
  3. Читайте данные из RDD. Например, если вы хотите прочитать первые 10 элементов, выполните следующую команду:

    scala> starrocksSparkRDD.take(10)

    Spark возвращает следующие данные:

    res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])

    Чтобы прочитать весь RDD, выполните следующую команду:

    scala> starrocksSparkRDD.collect()

    Spark возвращает следующие данные:

    res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])

Лучшие практики

Когда вы читаете данные из Selena с помощью Spark connector, вы можете использовать параметр starrocks.filter.query для указания условий фильтрации, на основе которых Spark обрезает разделы, корзины и префиксные индексы для снижения стоимости извлечения данных. В этом разделе используется Spark DataFrame в качестве примера, чтобы показать, как это достигается.

Настройка среды

КомпонентВерсия
SparkSpark 2.4.4 и Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Selena2.2.0
Spark connectorstarrocks-spark2_2.11-1.0.0.jar

Пример данных

Выполните следующие действия для подготовки образца таблицы:

  1. Перейдите в базу данных test и создайте таблицу с именем mytable.

    MySQL [test]> CREATE TABLE `mytable`
    (
    `k` int(11) NULL COMMENT "bucket",
    `b` int(11) NULL COMMENT "",
    `dt` datetime NULL COMMENT "",
    `v` int(11) NULL COMMENT ""
    )
    ENGINE=OLAP
    DUPLICATE KEY(`k`,`b`, `dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
    PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')),
    PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')),
    PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00'))
    )
    DISTRIBUTED BY HASH(`k`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. Вставьте данные в mytable.

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, '2022-01-02 08:00:00', 111),
    (2, 22, '2022-02-02 08:00:00', 222),
    (3, 33, '2022-03-02 08:00:00', 333);
  3. Запросите таблицу mytable.

    MySQL [test]> select * from mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 2 | 22 | 2022-02-02 08:00:00 | 222 |
    | 3 | 33 | 2022-03-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    3 rows in set (0.01 sec)

Полное сканирование таблицы

  1. Выполните следующую команду в каталоге Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala>  val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable:

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`;
    +-----------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=9/9 |
    | tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 |
    | cardinality=3 |
    | avgRowSize=4.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------+
    26 rows in set (0.00 sec)

В этом примере обрезка не выполняется. Поэтому Spark сканирует все три раздела (как указано в partitions=3/3), которые содержат данные, и сканирует все 9 tablet (как указано в tabletRatio=9/9) в этих трех разделах.

Обрезка разделов

  1. Выполните следующую команду, в которой вы используете параметр starrocks.filter.query для указания условия фильтрации dt='2022-01-02 08:00:00 для обрезки разделов, в каталоге Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where dt='2022-01-02 08:00:00':

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00';
    +------------------------------------------------+
    | Explain String |
    +------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=3/3 |
    | tabletList=41297,41299,41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------------+
    27 rows in set (0.01 sec)

В этом примере выполняется только обрезка разделов, тогда как обрезка корзин не выполняется. Поэтому Spark сканирует один из трех разделов (как указано в partitions=1/3) и все tablet (как указано в tabletRatio=3/3) в этом разделе.

Обрезка корзин

  1. Выполните следующую команду, в которой вы используете параметр starrocks.filter.query для указания условия фильтрации k=1 для обрезки корзин, в каталоге Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "k=1")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where k=1:

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1;
    +------------------------------------------+
    | Explain String |
    +------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 1 |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=3/9 |
    | tabletList=41299,41305,41311 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------+
    27 rows in set (0.01 sec)

В этом примере выполняется только обрезка корзин, тогда как обрезка разделов не выполняется. Поэтому Spark сканирует все три раздела (как указано в partitions=3/3), которые содержат данные, и сканирует все три tablet (как указано в tabletRatio=3/9) для получения значений Hash, которые соответствуют условию фильтрации k = 1 в этих трех разделах.

Обрезка разделов и обрезка корзин

  1. Выполните следующую команду, в которой вы используете параметр starrocks.filter.query для указания двух условий фильтрации k=7 и dt='2022-01-02 08:00:00' для обрезки корзин и обрезки разделов, в каталоге Spark для создания DataFrame с именем df на таблице mytable в базе данных test:

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"")
    .option("password", s"")
    .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t
    able [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where k=7 and dt='2022-01-02 08:00:00':

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00';
    +----------------------------------------------------------+
    | Explain String |
    +----------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: RANDOM |
    | |
    | RESULT SINK |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=1/3 |
    | tabletList=41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +----------------------------------------------------------+
    17 rows in set (0.00 sec)

В этом примере выполняются как обрезка разделов, так и обрезка корзин. Поэтому Spark сканирует только один из трех разделов (как указано в partitions=1/3) и только один tablet (как указано в tabletRatio=1/3) в этом разделе.

Фильтрация префиксного индекса

  1. Вставьте больше записей данных в раздел таблицы mytable, которая принадлежит базе данных test:

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, "2022-01-02 08:00:00", 111),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333);
  2. Запросите таблицу mytable:

    MySQL [test]> SELECT * FROM mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 2 | 22 | 2022-02-02 08:00:00 | 222 |
    | 3 | 33 | 2022-03-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    7 rows in set (0.01 sec)
  3. Выполните следующую команду, в которой вы используете параметр starrocks.filter.query для указания условия фильтрации k=1 для фильтрации префиксного индекса, в каталоге Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "k=1")
    .load()
  4. В базе данных test установите is_report_success в true для включения отчетности профиля:

    MySQL [test]> SET is_report_success = true;
    Query OK, 0 rows affected (0.00 sec)
  5. Используйте браузер для открытия страницы http://<fe_host>:<http_http_port>/query и просмотрите профиль оператора SELECT * FROM mytable where k=1. Пример:

    OLAP_SCAN (plan_node_id=0):
    CommonMetrics:
    - CloseTime: 1.255ms
    - OperatorTotalTime: 1.404ms
    - PeakMemoryUsage: 0.00
    - PullChunkNum: 8
    - PullRowNum: 2
    - __MAX_OF_PullRowNum: 2
    - __MIN_OF_PullRowNum: 0
    - PullTotalTime: 148.60us
    - PushChunkNum: 0
    - PushRowNum: 0
    - PushTotalTime: 0ns
    - SetFinishedTime: 136ns
    - SetFinishingTime: 129ns
    UniqueMetrics:
    - Predicates: 1: k = 1
    - Rollup: mytable
    - Table: mytable
    - BytesRead: 88.00 B
    - __MAX_OF_BytesRead: 88.00 B
    - __MIN_OF_BytesRead: 0.00
    - CachedPagesNum: 0
    - CompressedBytesRead: 844.00 B
    - __MAX_OF_CompressedBytesRead: 844.00 B
    - __MIN_OF_CompressedBytesRead: 0.00
    - CreateSegmentIter: 18.582us
    - IOTime: 4.425us
    - LateMaterialize: 17.385us
    - PushdownPredicates: 3
    - RawRowsRead: 2
    - __MAX_OF_RawRowsRead: 2
    - __MIN_OF_RawRowsRead: 0
    - ReadPagesNum: 12
    - __MAX_OF_ReadPagesNum: 12
    - __MIN_OF_ReadPagesNum: 0
    - RowsRead: 2
    - __MAX_OF_RowsRead: 2
    - __MIN_OF_RowsRead: 0
    - ScanTime: 154.367us
    - SegmentInit: 95.903us
    - BitmapIndexFilter: 0ns
    - BitmapIndexFilterRows: 0
    - BloomFilterFilterRows: 0
    - ShortKeyFilterRows: 3
    - __MAX_OF_ShortKeyFilterRows: 3
    - __MIN_OF_ShortKeyFilterRows: 0
    - ZoneMapIndexFilterRows: 0
    - SegmentRead: 2.559us
    - BlockFetch: 2.187us
    - BlockFetchCount: 2
    - __MAX_OF_BlockFetchCount: 2
    - __MIN_OF_BlockFetchCount: 0
    - BlockSeek: 7.789us
    - BlockSeekCount: 2
    - __MAX_OF_BlockSeekCount: 2
    - __MIN_OF_BlockSeekCount: 0
    - ChunkCopy: 25ns
    - DecompressT: 0ns
    - DelVecFilterRows: 0
    - IndexLoad: 0ns
    - PredFilter: 353ns
    - PredFilterRows: 0
    - RowsetsReadCount: 7
    - SegmentsReadCount: 3
    - __MAX_OF_SegmentsReadCount: 2
    - __MIN_OF_SegmentsReadCount: 0
    - TotalColumnsDataPageCount: 8
    - __MAX_OF_TotalColumnsDataPageCount: 8
    - __MIN_OF_TotalColumnsDataPageCount: 0
    - UncompressedBytesRead: 508.00 B
    - __MAX_OF_UncompressedBytesRead: 508.00 B
    - __MIN_OF_UncompressedBytesRead: 0.00

В этом примере условие фильтрации k = 1 может попасть в префиксный индекс. Поэтому Spark может отфильтровать три строки (как указано в ShortKeyFilterRows: 3).