Перейти к основному содержимому
Версия: 2.0.x

Чтение данных из Selena с помощью Spark connector

Selena предоставляет собственный connector под названием Selena Connector for Apache Spark™ (сокращенно Spark connector), который помогает читать данные из таблицы Selena с помощью Spark. Вы можете использовать Spark для сложной обработки и машинного обучения на данных, прочитанных из Selena.

Spark connector поддерживает три метода чтения: Spark SQL, Spark DataFrame и Spark RDD.

Вы можете использовать Spark SQL для создания временного представления на таблице Selena, а затем напрямую читать данные из таблицы Selena, используя это временное представление.

Вы также можете сопоставить таблицу Selena с Spark DataFrame или Spark RDD, а затем читать данные из Spark DataFrame или Spark RDD. Мы рекомендуем использовать Spark DataFrame.

NOTICE

Только пользователи с привилегией SELECT на таблицу Selena могут читать данные из этой таблицы. Вы можете следовать инструкциям, предоставленным в GRANT, чтобы предоставить привилегию пользователю.

Примечания по использованию

  • Вы можете фильтровать данные в Selena перед чтением, тем самым уменьшая объем передаваемых данных.
  • Если накладные расходы на чтение данных существенны, вы можете использовать соответствующий дизайн таблицы и условия фильтрации, чтобы предотвратить чтение Spark чрезмерного объема данных за раз. Таким образом, вы можете снизить нагрузку на ввод-вывод диска и сетевое подключение, тем самым обеспечивая правильное выполнение обычных запросов.

Требования к версиям

Spark connectorSparkSelenaJavaScala
1.1.23.2, 3.3, 3.4, 3.52.5 и новее82.12
1.1.13.2, 3.3, 3.42.5 и новее82.12
1.1.03.2, 3.3, 3.42.5 и новее82.12
1.0.03.x1.18 и новее82.12
1.0.02.x1.18 и новее82.11

NOTICE

  • Пожалуйста, см. Обновление Spark connector для изменений в поведении между различными версиями connector.
  • Connector не предоставляет MySQL JDBC driver начиная с версии 1.1.1, и вам необходимо импортировать driver в classpath Spark вручную. Вы можете найти driver на Maven Central.
  • В версии 1.0.0 Spark connector поддерживает только чтение данных из Selena. Начиная с версии 1.1.0, Spark connector поддерживает как чтение данных из Selena, так и запись данных в Selena.
  • Версия 1.0.0 отличается от версии 1.1.0 по параметрам и сопоставлению типов данных. См. Обновление Spark connector.
  • В общих случаях новые функции не будут добавляться в версию 1.0.0. Мы рекомендуем обновить ваш Spark connector при первой возможности.

Получение Spark connector

Используйте один из следующих методов для получения пакета .jar Spark connector, который соответствует вашим бизнес-потребностям:

  • Загрузить скомпилированный пакет.
  • Использовать Maven для добавления зависимостей, требуемых Spark connector. (Этот метод поддерживается только для Spark connector 1.1.0 и новее.)
  • Вручную скомпилировать пакет.

Spark connector 1.1.0 и новее

Пакеты .jar Spark connector названы в следующем формате:

selena-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar

Например, если вы хотите использовать Spark connector 1.1.0 со Spark 3.2 и Scala 2.12, вы можете выбрать selena-spark-connector-3.2_2.12-1.1.0.jar.

NOTICE

В обычных случаях последняя версия Spark connector может использоваться с тремя последними версиями Spark.

Загрузка скомпилированного пакета

Вы можете получить пакеты .jar Spark connector различных версий на Maven Central Repository.

Добавление зависимостей Maven

Настройте зависимости, требуемые Spark connector, следующим образом:

NOTICE

Вы должны заменить spark_version, scala_version и connector_version на версию Spark, версию Scala и версию Spark connector, которые вы используете.

<dependency>
<groupId>com.selena</groupId>
<artifactId>selena-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>

Например, если вы хотите использовать Spark connector 1.1.0 со Spark 3.2 и Scala 2.12, настройте зависимости следующим образом:

<dependency>
<groupId>com.selena</groupId>
<artifactId>selena-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>

Ручная компиляция пакета

  1. Загрузите код Spark connector.

  2. Используйте следующую команду для компиляции Spark connector:

    NOTICE

    Вы должны заменить spark_version на версию Spark, которую вы используете.

    sh build.sh <spark_version>

    Например, если вы хотите использовать Spark connector со Spark 3.2, скомпилируйте Spark connector следующим образом:

    sh build.sh 3.2
  3. Перейдите по пути target/, где создается пакет .jar Spark connector, такой как selena-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar, после компиляции.

    NOTICE

    Если вы используете версию Spark connector, которая не выпущена официально, имя созданного пакета .jar Spark connector содержит SNAPSHOT в качестве суффикса.

Spark connector 1.0.0

Загрузка скомпилированного пакета

Ручная компиляция пакета

  1. Загрузите код Spark connector.

    NOTICE

    Вы должны переключиться на spark-1.0.

  2. Выполните одно из следующих действий для компиляции Spark connector:

    • Если вы используете Spark 2.x, выполните следующую команду, которая компилирует Spark connector для Spark 2.3.4 по умолчанию:

      sh build.sh 2
    • Если вы используете Spark 3.x, выполните следующую команду, которая компилирует Spark connector для Spark 3.1.2 по умолчанию:

      sh build.sh 3
  3. Перейдите по пути output/, где создается файл selena-spark2_2.11-1.0.0.jar после компиляции. Затем скопируйте файл в classpath Spark:

    • Если ваш кластер Spark работает в режиме Local, поместите файл в путь jars/.
    • Если ваш кластер Spark работает в режиме Yarn, поместите файл в пакет для предварительного развертывания.

Вы можете использовать Spark connector для чтения данных из Selena только после того, как поместите файл в указанное место.

Параметры

В этом разделе описаны параметры, которые вам необходимо настроить при использовании Spark connector для чтения данных из Selena.

Общие параметры

Следующие параметры применяются ко всем трем методам чтения: Spark SQL, Spark DataFrame и Spark RDD.

ПараметрЗначение по умолчаниюОписание
selena.fenodesNoneHTTP URL FE в вашем кластере Selena. Формат <fe_host>:<fe_http_port>. Вы можете указать несколько URL, которые должны быть разделены запятой (,).
selena.table.identifierNoneИмя таблицы Selena. Формат: <database_name>.<table_name>.
selena.request.retries3Максимальное количество повторных попыток отправки запроса на чтение в Selena.
selena.request.connect.timeout.ms30000Максимальное время, после которого запрос на чтение, отправленный в Selena, истекает.
selena.request.read.timeout.ms30000Максимальное время, после которого чтение для запроса, отправленного в Selena, истекает.
selena.request.query.timeout.s3600Максимальное время, после которого запрос данных из Selena истекает. Период тайм-аута по умолчанию - 1 час. -1 означает, что период тайм-аута не указан.
selena.request.tablet.sizeInteger.MAX_VALUEКоличество tablet Selena, сгруппированных в каждый partition Spark RDD. Меньшее значение этого параметра означает, что будет создано большее количество partition Spark RDD. Большее количество partition Spark RDD означает более высокий параллелизм в Spark, но большее давление на Selena.
selena.batch.size4096Максимальное количество строк, которые могут быть прочитаны из BE за раз. Увеличение значения этого параметра может уменьшить количество соединений, установленных между Spark и Selena, тем самым смягчая дополнительные временные издержки, вызванные задержкой сети.
selena.exec.mem.limit2147483648Максимальный объем памяти, разрешенный для одного запроса. Единица: байты. Ограничение памяти по умолчанию - 2 ГБ.
selena.deserialize.arrow.asyncfalseУказывает, поддерживается ли асинхронное преобразование формата памяти Arrow в RowBatches, необходимые для итерации Spark connector.
selena.deserialize.queue.size64Размер внутренней очереди, которая содержит задачи для асинхронного преобразования формата памяти Arrow в RowBatches. Этот параметр действителен, когда selena.deserialize.arrow.async установлен в true.
selena.filter.queryNoneУсловие, на основе которого вы хотите фильтровать данные в Selena. Вы можете указать несколько условий фильтрации, которые должны быть объединены с помощью and. Selena фильтрует данные из таблицы Selena на основе указанных условий фильтрации перед тем, как данные будут прочитаны Spark.
selena.timezoneЧасовой пояс JVM по умолчаниюПоддерживается с версии 1.1.1. Часовой пояс, используемый для преобразования Selena DATETIME в Spark TimestampType. По умолчанию используется часовой пояс JVM, возвращаемый ZoneId#systemDefault(). Формат может быть именем часового пояса, таким как Asia/Shanghai, или смещением зоны, таким как +08:00.

Параметры для Spark SQL и Spark DataFrame

Следующие параметры применяются только к методам чтения Spark SQL и Spark DataFrame.

ПараметрЗначение по умолчаниюОписание
selena.fe.http.urlNoneHTTP IP-адрес FE. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен selena.fenodes. Вам нужно настроить только один из них. В Spark connector 1.1.0 и новее мы рекомендуем использовать selena.fe.http.url, потому что selena.fenodes может быть устаревшим.
selena.fe.jdbc.urlNoneАдрес, используемый для подключения к MySQL серверу FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>.
NOTICE
В Spark connector 1.1.0 и новее этот параметр обязателен.
userNoneИмя пользователя учетной записи кластера Selena. Пользователю нужна привилегия SELECT на таблицу Selena.
selena.userNoneИмя пользователя учетной записи кластера Selena. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен user. Вам нужно настроить только один из них. В Spark connector 1.1.0 и новее мы рекомендуем использовать selena.user, потому что user может быть устаревшим.
passwordNoneПароль учетной записи кластера Selena.
selena.passwordNoneПароль учетной записи кластера Selena. Этот параметр поддерживается начиная с Spark connector 1.1.0. Этот параметр эквивалентен password. Вам нужно настроить только один из них. В Spark connector 1.1.0 и новее мы рекомендуем использовать selena.password, потому что password может быть устаревшим.
selena.filter.query.in.max.count100Максимальное количество значений, поддерживаемых выражением IN во время проталкивания предиката. Если количество значений, указанных в выражении IN, превышает этот лимит, условия фильтрации, указанные в выражении IN, обрабатываются в Spark.

Параметры для Spark RDD

Следующие параметры применяются только к методу чтения Spark RDD.

ПараметрЗначение по умолчаниюОписание
selena.request.auth.userNoneИмя пользователя учетной записи кластера Selena.
selena.request.auth.passwordNoneПароль учетной записи кластера Selena.
selena.read.fieldNoneСтолбец таблицы Selena, из которого вы хотите читать данные. Вы можете указать несколько столбцов, которые должны быть разделены запятой (,).

Сопоставление типов данных между Selena и Spark

Spark connector 1.1.0 и новее

Тип данных SelenaТип данных Spark
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
JSONDataTypes.StringType
NOTE:
Это сопоставление типов данных поддерживается с Spark connector v1.1.2 и требует версии Selena не менее 2.5.13, 3.0.3, 3.1.0 или новее.
ARRAYНеподдерживаемый тип данных
HLLНеподдерживаемый тип данных
BITMAPНеподдерживаемый тип данных

Spark connector 1.0.0

Тип данных SelenaТип данных Spark
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
DATEDataTypes.StringType
DATETIMEDataTypes.StringType
ARRAYНеподдерживаемый тип данных
HLLНеподдерживаемый тип данных
BITMAPНеподдерживаемый тип данных

Логика обработки базового движка хранения, используемого Selena, не может покрыть ожидаемый диапазон времени при прямом использовании типов данных DATE и DATETIME. Поэтому Spark connector сопоставляет типы данных DATE и DATETIME из Selena с типом данных STRING из Spark и генерирует читаемые текстовые строки, соответствующие данным даты и времени, прочитанным из Selena.

Обновление Spark connector

Обновление с версии 1.0.0 до версии 1.1.0

  • Начиная с 1.1.1, Spark connector не предоставляет mysql-connector-java, который является официальным JDBC драйвером для MySQL, из-за ограничений лицензии GPL, используемой mysql-connector-java. Однако Spark connector все еще нуждается в mysql-connector-java для подключения к Selena для получения метаданных таблицы, поэтому вам нужно добавить драйвер в classpath Spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central.

  • В версии 1.1.0 Spark connector использует JDBC для доступа к Selena для получения более детальной информации о таблице. Поэтому вы должны настроить selena.fe.jdbc.url.

  • В версии 1.1.0 некоторые параметры переименованы. Оба старых и новых параметра сохранены на данный момент. Для каждой пары эквивалентных параметров вам нужно настроить только один из них, но мы рекомендуем использовать новый, потому что старый может быть устаревшим.

    • selena.fenodes переименован в selena.fe.http.url.
    • user переименован в selena.user.
    • password переименован в selena.password.
  • В версии 1.1.0 сопоставления некоторых типов данных скорректированы на основе Spark 3.x:

    • DATE в Selena сопоставлен с DataTypes.DateType (изначально DataTypes.StringType) в Spark.
    • DATETIME в Selena сопоставлен с DataTypes.TimestampType (изначально DataTypes.StringType) в Spark.

Примеры

Следующие примеры предполагают, что вы создали базу данных с именем test в вашем кластере Selena и у вас есть разрешения пользователя root. Настройки параметров в примерах основаны на Spark Connector 1.1.0.

Настройка сети

Убедитесь, что машина, на которой расположен Spark, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а также к узлам BE через be_port (по умолчанию: 9060).

Пример данных

Выполните следующие действия для подготовки примера таблицы:

  1. Перейдите в базу данных test и создайте таблицу с именем score_board.

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. Вставьте данные в таблицу score_board.

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. Запросите таблицу score_board.

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.01 sec)

Чтение данных с помощью Spark SQL

  1. Выполните следующую команду в директории Spark для запуска Spark SQL:

    sh spark-sql
  2. Выполните следующую команду для создания временного представления с именем spark_selena на таблице score_board, которая принадлежит базе данных test:

    spark-sql> CREATE TEMPORARY VIEW spark_selena
    USING selena
    OPTIONS
    (
    "selena.table.identifier" = "test.score_board",
    "selena.fe.http.url" = "<fe_host>:<fe_http_port>",
    "selena.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
    "selena.user" = "root",
    "selena.password" = ""
    );
  3. Выполните следующую команду для чтения данных из временного представления:

    spark-sql> SELECT * FROM spark_selena;

    Spark возвращает следующие данные:

    1        Bob        21
    2 Stan 21
    3 Sam 22
    4 Tony 22
    5 Alice 22
    6 Lucy 23
    7 Polly 23
    8 Tom 23
    9 Rose 24
    10 Jerry 24
    11 Jason 24
    12 Lily 25
    13 Stephen 25
    14 David 25
    15 Eddie 26
    16 Kate 27
    17 Cathy 27
    18 Judy 27
    19 Julia 28
    20 Robert 28
    21 Jack 29
    Time taken: 1.883 seconds, Fetched 21 row(s)
    22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)

Чтение данных с помощью Spark DataFrame

  1. Выполните следующую команду в директории Spark для запуска Spark Shell:

    sh spark-shell
  2. Выполните следующую команду для создания DataFrame с именем selenaSparkDF на таблице score_board, которая принадлежит базе данных test:

    scala> val selenaSparkDF = spark.read.format("selena")
    .option("selena.table.identifier", s"test.score_board")
    .option("selena.fe.http.url", s"<fe_host>:<fe_http_port>")
    .option("selena.fe.jdbc.url", s"jdbc:mysql://<fe_host>:<fe_query_port>")
    .option("selena.user", s"root")
    .option("selena.password", s"")
    .load()
  3. Прочитайте данные из DataFrame. Например, если вы хотите прочитать первые 10 строк, выполните следующую команду:

    scala> selenaSparkDF.show(10)

    Spark возвращает следующие данные:

    +---+-----+-----+
    | id| name|score|
    +---+-----+-----+
    | 1| Bob| 21|
    | 2| Stan| 21|
    | 3| Sam| 22|
    | 4| Tony| 22|
    | 5|Alice| 22|
    | 6| Lucy| 23|
    | 7|Polly| 23|
    | 8| Tom| 23|
    | 9| Rose| 24|
    | 10|Jerry| 24|
    +---+-----+-----+
    only showing top 10 rows

    NOTE

    По умолчанию, если вы не указываете количество строк, которые хотите прочитать, Spark возвращает первые 20 строк.

Чтение данных с помощью Spark RDD

  1. Выполните следующую команду в директории Spark для запуска Spark Shell:

    sh spark-shell
  2. Выполните следующую команду для создания RDD с именем selenaSparkRDD на таблице score_board, которая принадлежит базе данных test.

    scala> import com.selena.connector.spark._
    scala> val selenaSparkRDD = sc.selenaRDD
    (
    tableIdentifier = Some("test.score_board"),
    cfg = Some(Map(
    "selena.fenodes" -> "<fe_host>:<fe_http_port>",
    "selena.request.auth.user" -> "root",
    "selena.request.auth.password" -> ""
    ))
    )
  3. Прочитайте данные из RDD. Например, если вы хотите прочитать первые 10 элементов, выполните следующую команду:

    scala> selenaSparkRDD.take(10)

    Spark возвращает следующие данные:

    res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])

    Чтобы прочитать весь RDD, выполните следующую команду:

    scala> selenaSparkRDD.collect()

    Spark возвращает следующие данные:

    res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])

Лучшие практики

При чтении данных из Selena с помощью Spark connector вы можете использовать параметр selena.filter.query для указания условий фильтрации, на основе которых Spark обрезает partition, bucket и префиксные индексы, чтобы уменьшить стоимость извлечения данных. В этом разделе Spark DataFrame используется в качестве примера, чтобы показать, как это достигается.

Настройка среды

КомпонентВерсия
SparkSpark 2.4.4 и Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
Selena2.2.0
Spark connectorselena-spark2_2.11-1.0.0.jar

Пример данных

Выполните следующие действия для подготовки примера таблицы:

  1. Перейдите в базу данных test и создайте таблицу с именем mytable.

    MySQL [test]> CREATE TABLE `mytable`
    (
    `k` int(11) NULL COMMENT "bucket",
    `b` int(11) NULL COMMENT "",
    `dt` datetime NULL COMMENT "",
    `v` int(11) NULL COMMENT ""
    )
    ENGINE=OLAP
    DUPLICATE KEY(`k`,`b`, `dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
    PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')),
    PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')),
    PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00'))
    )
    DISTRIBUTED BY HASH(`k`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. Вставьте данные в mytable.

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, '2022-01-02 08:00:00', 111),
    (2, 22, '2022-02-02 08:00:00', 222),
    (3, 33, '2022-03-02 08:00:00', 333);
  3. Запросите таблицу mytable.

    MySQL [test]> select * from mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 2 | 22 | 2022-02-02 08:00:00 | 222 |
    | 3 | 33 | 2022-03-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    3 rows in set (0.01 sec)

Полное сканирование таблицы

  1. Выполните следующую команду в директории Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala>  val df = spark.read.format("selena")
    .option("selena.table.identifier", s"test.mytable")
    .option("selena.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable:

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`;
    +-----------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=9/9 |
    | tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 |
    | cardinality=3 |
    | avgRowSize=4.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------+
    26 rows in set (0.00 sec)

В этом примере обрезка не выполняется. Поэтому Spark сканирует все три partition (как указано partitions=3/3), которые содержат данные, и сканирует все 9 tablet (как указано tabletRatio=9/9) в этих трех partition.

Обрезка partition

  1. Выполните следующую команду, в которой вы используете параметр selena.filter.query для указания условия фильтрации dt='2022-01-02 08:00:00 для обрезки partition, в директории Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("selena")
    .option("selena.table.identifier", s"test.mytable")
    .option("selena.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("selena.filter.query", "dt='2022-01-02 08:00:00'")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where dt='2022-01-02 08:00:00':

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00';
    +------------------------------------------------+
    | Explain String |
    +------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=3/3 |
    | tabletList=41297,41299,41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------------+
    27 rows in set (0.01 sec)

В этом примере выполняется только обрезка partition, в то время как обрезка bucket не выполняется. Поэтому Spark сканирует один из трех partition (как указано partitions=1/3) и все tablet (как указано tabletRatio=3/3) в этом partition.

Обрезка bucket

  1. Выполните следующую команду, в которой вы используете параметр selena.filter.query для указания условия фильтрации k=1 для обрезки bucket, в директории Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("selena")
    .option("selena.table.identifier", s"test.mytable")
    .option("selena.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("selena.filter.query", "k=1")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where k=1:

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1;
    +------------------------------------------+
    | Explain String |
    +------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 1 |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=3/9 |
    | tabletList=41299,41305,41311 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------+
    27 rows in set (0.01 sec)

В этом примере выполняется только обрезка bucket, в то время как обрезка partition не выполняется. Поэтому Spark сканирует все три partition (как указано partitions=3/3), которые содержат данные, и сканирует все три tablet (как указано tabletRatio=3/9) для получения значений Hash, которые соответствуют условию фильтрации k = 1 в этих трех partition.

Обрезка partition и bucket

  1. Выполните следующую команду, в которой вы используете параметр selena.filter.query для указания двух условий фильтрации k=7 и dt='2022-01-02 08:00:00' для обрезки bucket и partition, в директории Spark для создания DataFrame с именем df на таблице mytable в базе данных test:

    scala> val df = spark.read.format("selena")
    .option("selena.table.identifier", s"test.mytable")
    .option("selena.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"")
    .option("password", s"")
    .option("selena.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
    .load()
  2. Просмотрите файл журнала FE fe.log вашего кластера Selena и найдите SQL-оператор, выполненный для чтения данных. Пример:

    2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t
    able [mytable]
  3. В базе данных test используйте EXPLAIN для получения плана выполнения оператора SELECT k,b,dt,v from test.mytable where k=7 and dt='2022-01-02 08:00:00':

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00';
    +----------------------------------------------------------+
    | Explain String |
    +----------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: RANDOM |
    | |
    | RESULT SINK |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=1/3 |
    | tabletList=41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +----------------------------------------------------------+
    17 rows in set (0.00 sec)

В этом примере выполняется как обрезка partition, так и обрезка bucket. Поэтому Spark сканирует только один из трех partition (как указано partitions=1/3) и только один tablet (как указано tabletRatio=1/3) в этом partition.

Фильтрация по префиксному индексу

  1. Вставьте больше записей данных в partition таблицы mytable, которая принадлежит базе данных test:

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, "2022-01-02 08:00:00", 111),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333);
  2. Запросите таблицу mytable:

    MySQL [test]> SELECT * FROM mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    4 rows in set (0.01 sec)
  3. Выполните следующую команду, в которой вы используете параметр selena.filter.query для указания условия фильтрации k=1 для фильтрации по префиксному индексу, в директории Spark для создания DataFrame с именем df на таблице mytable, которая принадлежит базе данных test:

    scala> val df = spark.read.format("selena")
    .option("selena.table.identifier", s"test.mytable")
    .option("selena.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("selena.filter.query", "k=1")
    .load()
  4. В базе данных test установите is_report_success в true, чтобы включить отчет профиля:

    MySQL [test]> SET is_report_success = true;
    Query OK, 0 rows affected (0.00 sec)
  5. Используйте браузер для открытия страницы http://<fe_host>:<http_http_port>/query и просмотрите профиль оператора SELECT * FROM mytable where k=1. Пример:

    OLAP_SCAN (plan_node_id=0):
    CommonMetrics:
    - CloseTime: 1.255ms
    - OperatorTotalTime: 1.404ms
    - PeakMemoryUsage: 0.00
    - PullChunkNum: 8
    - PullRowNum: 2
    - __MAX_OF_PullRowNum: 2
    - __MIN_OF_PullRowNum: 0
    - PullTotalTime: 148.60us
    - PushChunkNum: 0
    - PushRowNum: 0
    - PushTotalTime: 0ns
    - SetFinishedTime: 136ns
    - SetFinishingTime: 129ns
    UniqueMetrics:
    - Predicates: 1: k = 1
    - Rollup: mytable
    - Table: mytable
    - BytesRead: 88.00 B
    - __MAX_OF_BytesRead: 88.00 B
    - __MIN_OF_BytesRead: 0.00
    - CachedPagesNum: 0
    - CompressedBytesRead: 844.00 B
    - __MAX_OF_CompressedBytesRead: 844.00 B
    - __MIN_OF_CompressedBytesRead: 0.00
    - CreateSegmentIter: 18.582us
    - IOTime: 4.425us
    - LateMaterialize: 17.385us
    - PushdownPredicates: 3
    - RawRowsRead: 2
    - __MAX_OF_RawRowsRead: 2
    - __MIN_OF_RawRowsRead: 0
    - ReadPagesNum: 12
    - __MAX_OF_ReadPagesNum: 12
    - __MIN_OF_ReadPagesNum: 0
    - RowsRead: 2
    - __MAX_OF_RowsRead: 2
    - __MIN_OF_RowsRead: 0
    - ScanTime: 154.367us
    - SegmentInit: 95.903us
    - BitmapIndexFilter: 0ns
    - BitmapIndexFilterRows: 0
    - BloomFilterFilterRows: 0
    - ShortKeyFilterRows: 3
    - __MAX_OF_ShortKeyFilterRows: 3
    - __MIN_OF_ShortKeyFilterRows: 0
    - ZoneMapIndexFilterRows: 0
    - SegmentRead: 2.559us
    - BlockFetch: 2.187us
    - BlockFetchCount: 2
    - __MAX_OF_BlockFetchCount: 2
    - __MIN_OF_BlockFetchCount: 0
    - BlockSeek: 7.789us
    - BlockSeekCount: 2
    - __MAX_OF_BlockSeekCount: 2
    - __MIN_OF_BlockSeekCount: 0
    - ChunkCopy: 25ns
    - DecompressT: 0ns
    - DelVecFilterRows: 0
    - IndexLoad: 0ns
    - PredFilter: 353ns
    - PredFilterRows: 0
    - RowsetsReadCount: 7
    - SegmentsReadCount: 3
    - __MAX_OF_SegmentsReadCount: 2
    - __MIN_OF_SegmentsReadCount: 0
    - TotalColumnsDataPageCount: 8
    - __MAX_OF_TotalColumnsDataPageCount: 8
    - __MIN_OF_TotalColumnsDataPageCount: 0
    - UncompressedBytesRead: 508.00 B
    - __MAX_OF_UncompressedBytesRead: 508.00 B
    - __MIN_OF_UncompressedBytesRead: 0.00

В этом примере условие фильтрации k = 1 может попасть в префиксный индекс. Поэтому Spark может отфильтровать три строки (как указано ShortKeyFilterRows: 3).