Перейти к основному содержимому
Версия: 2.0.x

Загрузка данных с помощью Spark connector (рекомендуется)

Selena предоставляет самостоятельно разработанный коннектор под названием Selena Connector for Apache Spark™ (сокращённо Spark connector), который помогает загружать данные в таблицу Selena с помощью Spark. Основной принцип заключается в накоплении данных и последующей загрузке всех данных сразу в Selena через STREAM LOAD. Spark connector реализован на основе Spark DataSource V2. DataSource можно создать с помощью Spark DataFrames или Spark SQL. Поддерживаются как пакетный режим, так и режим Structured Streaming.

ВНИМАНИЕ

Только пользователи с привилегиями SELECT и INSERT на таблицу Selena могут загружать данные в эту таблицу. Вы можете следовать инструкциям в GRANT, чтобы предоставить эти привилегии пользователю.

Требования к версиям

Spark connectorSparkSelenaJavaScala
1.1.23.2, 3.3, 3.4, 3.52.5 и новее82.12
1.1.13.2, 3.3 или 3.42.5 и новее82.12
1.1.03.2, 3.3 или 3.42.5 и новее82.12

ВНИМАНИЕ

  • См. Обновление Spark connector для получения информации об изменениях поведения между различными версиями Spark connector.
  • Spark connector не предоставляет драйвер MySQL JDBC начиная с версии 1.1.1, и вам нужно вручную импортировать драйвер в classpath Spark. Вы можете найти драйвер на сайте MySQL или Maven Central.

Получение Spark connector

Вы можете получить JAR-файл Spark connector следующими способами:

  • Напрямую загрузить скомпилированный JAR-файл Spark Connector.
  • Добавить Spark connector как зависимость в ваш Maven-проект и затем загрузить JAR-файл.
  • Скомпилировать исходный код Spark Connector в JAR-файл самостоятельно.

Формат именования JAR-файла Spark connector: selena-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar.

Например, если вы установили Spark 3.2 и Scala 2.12 в вашей среде и хотите использовать Spark connector 1.1.0, вы можете использовать selena-spark-connector-3.2_2.12-1.1.0.jar.

ВНИМАНИЕ

В общем случае последняя версия Spark connector поддерживает совместимость только с тремя самыми последними версиями Spark.

Загрузка скомпилированного JAR-файла

Напрямую загрузите соответствующую версию JAR-файла Spark connector из Maven Central Repository.

Maven-зависимость

  1. В файле pom.xml вашего Maven-проекта добавьте Spark connector как зависимость в следующем формате. Замените spark_version, scala_version и connector_version соответствующими версиями.

    <dependency>
    <groupId>com.selena</groupId>
    <artifactId>selena-spark-connector-${spark_version}_${scala_version}</artifactId>
    <version>${connector_version}</version>
    </dependency>
  2. Например, если версия Spark в вашей среде — 3.2, версия Scala — 2.12, и вы выбираете Spark connector 1.1.0, вам нужно добавить следующую зависимость:

    <dependency>
    <groupId>com.selena</groupId>
    <artifactId>selena-spark-connector-3.2_2.12</artifactId>
    <version>1.1.0</version>
    </dependency>

Компиляция самостоятельно

  1. Загрузите пакет Spark connector.

  2. Выполните следующую команду для компиляции исходного кода Spark connector в JAR-файл. Обратите внимание, что spark_version заменяется соответствующей версией Spark.

    sh build.sh <spark_version>

    Например, если версия Spark в вашей среде — 3.2, вам нужно выполнить следующую команду:

    sh build.sh 3.2
  3. Перейдите в каталог target/, чтобы найти JAR-файл Spark connector, например selena-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar, сгенерированный при компиляции.

ПРИМЕЧАНИЕ

Имя Spark connector, который не выпущен официально, содержит суффикс SNAPSHOT.

Параметры

selena.fe.http.url

Обязательный: ДА
Значение по умолчанию: Нет
Описание: HTTP URL FE в вашем cluster Selena. Вы можете указать несколько URL, которые должны быть разделены запятой (,). Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>. Начиная с версии 1.1.1, вы также можете добавить префикс http:// к URL, например http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>.

selena.fe.jdbc.url

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Адрес, используемый для подключения к MySQL-серверу FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>.

selena.table.identifier

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя таблицы Selena. Формат: <database_name>.<table_name>.

selena.user

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя пользователя вашей учётной записи cluster Selena. Пользователю необходимы привилегии SELECT и INSERT на таблицу Selena.

selena.password

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Пароль вашей учётной записи cluster Selena.

selena.write.label.prefix

Обязательный: НЕТ
Значение по умолчанию: spark-
Описание: Префикс метки, используемый Stream Load.

selena.write.enable.transaction-stream-load

Обязательный: НЕТ
Значение по умолчанию: TRUE
Описание: Использовать ли Stream Load transaction interface для загрузки данных. Требуется Selena v1.5.2 или новее. Эта функция может загружать больше данных в транзакции с меньшим использованием памяти и улучшить производительность.
ВНИМАНИЕ: Начиная с версии 1.1.1, этот параметр действует только когда значение selena.write.max.retries не положительное, поскольку Stream Load transaction interface не поддерживает повторы.

selena.write.buffer.size

Обязательный: НЕТ
Значение по умолчанию: 104857600
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки.

selena.write.buffer.rows

Обязательный: НЕТ
Значение по умолчанию: Integer.MAX_VALUE
Описание: Поддерживается начиная с версии 1.1.1. Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз.

selena.write.flush.interval.ms

Обязательный: НЕТ
Значение по умолчанию: 300000
Описание: Интервал, с которым данные отправляются в Selena. Этот параметр используется для контроля задержки загрузки.

selena.write.max.retries

Обязательный: НЕТ
Значение по умолчанию: 3
Описание: Поддерживается начиная с версии 1.1.1. Количество повторов, которые коннектор выполняет для Stream Load для одного и того же пакета данных, если загрузка не удалась.
ВНИМАНИЕ: Поскольку Stream Load transaction interface не поддерживает повторы. Если этот параметр положительный, коннектор всегда использует интерфейс Stream Load и игнорирует значение selena.write.enable.transaction-stream-load.

selena.write.retry.interval.ms

Обязательный: НЕТ
Значение по умолчанию: 10000
Описание: Поддерживается начиная с версии 1.1.1. Интервал для повтора Stream Load для одного и того же пакета данных, если загрузка не удалась.

selena.columns

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбец таблицы Selena, в который вы хотите загрузить данные. Вы можете указать несколько столбцов, которые должны быть разделены запятыми (,), например "col0,col1,col2".

selena.column.types

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Поддерживается начиная с версии 1.1.1. Настройка типов данных столбцов для Spark вместо использования значений по умолчанию, выведенных из таблицы Selena и сопоставления по умолчанию. Значение параметра — это схема в формате DDL, такая же как вывод Spark StructType#toDDL, например col0 INT, col1 STRING, col2 BIGINT. Обратите внимание, что вам нужно указать только столбцы, которые требуют настройки. Один из вариантов использования — загрузка данных в столбцы типа BITMAP или HLL.

selena.write.properties.*

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Параметры, используемые для управления поведением Stream Load. Например, параметр selena.write.properties.format указывает формат данных для загрузки, например CSV или JSON. Для получения списка поддерживаемых параметров и их описаний см. STREAM LOAD.

selena.write.properties.format

Обязательный: НЕТ
Значение по умолчанию: CSV
Описание: Формат файла, на основе которого Spark connector преобразует каждый пакет данных перед отправкой данных в Selena. Допустимые значения: CSV и JSON.

selena.write.properties.row_delimiter

Обязательный: НЕТ
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.

selena.write.properties.column_separator

Обязательный: НЕТ
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.

selena.write.properties.partial_update

Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, что означает отключение этой функции.

selena.write.properties.partial_update_mode

Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в строчном режиме, который более подходит для обновлений в реальном времени с множеством столбцов и небольшими пакетами.
  • Значение column означает частичные обновления в колоночном режиме, который более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение колоночного режима обеспечивает более высокую скорость обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в колоночном режиме в 10 раз выше.

selena.write.num.partitions

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Количество partition, в которые Spark может записывать данные параллельно. Когда объём данных небольшой, вы можете уменьшить количество partition, чтобы снизить параллелизм и частоту загрузки. Значение по умолчанию для этого параметра определяется Spark. Однако этот метод может вызвать накладные расходы на Spark Shuffle.

selena.write.partition.columns

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбцы partition в Spark. Параметр действует только когда указан selena.write.num.partitions. Если этот параметр не указан, все записываемые столбцы используются для партиционирования.

selena.timezone

Обязательный: НЕТ
Значение по умолчанию: Часовой пояс JVM по умолчанию
Описание: Поддерживается начиная с версии 1.1.1. Часовой пояс, используемый для преобразования Spark TimestampType в Selena DATETIME. По умолчанию — часовой пояс JVM, возвращаемый ZoneId#systemDefault(). Формат может быть названием часового пояса, например Asia/Shanghai, или смещением зоны, например +08:00.

Сопоставление типов данных между Spark и Selena

  • Сопоставление типов данных по умолчанию следующее:

    Тип данных SparkТип данных Selena
    BooleanTypeBOOLEAN
    ByteTypeTINYINT
    ShortTypeSMALLINT
    IntegerTypeINT
    LongTypeBIGINT
    StringTypeLARGEINT
    FloatTypeFLOAT
    DoubleTypeDOUBLE
    DecimalTypeDECIMAL
    StringTypeCHAR
    StringTypeVARCHAR
    StringTypeSTRING
    StringTypeJSON
    DateTypeDATE
    TimestampTypeDATETIME
    ArrayTypeARRAY
    ПРИМЕЧАНИЕ:
    Поддерживается начиная с версии 1.1.1. Для получения подробных шагов см. Загрузка данных в столбцы типа ARRAY.
  • Вы также можете настроить сопоставление типов данных.

    Например, таблица Selena содержит столбцы BITMAP и HLL, но Spark не поддерживает эти два типа данных. Вам нужно настроить соответствующие типы данных в Spark. Для получения подробных шагов см. загрузку данных в столбцы BITMAP и HLL. BITMAP и HLL поддерживаются начиная с версии 1.1.1.

Обновление Spark connector

Обновление с версии 1.1.0 до 1.1.1

  • Начиная с версии 1.1.1, Spark connector не предоставляет mysql-connector-java, который является официальным драйвером JDBC для MySQL, из-за ограничений лицензии GPL, используемой mysql-connector-java. Однако Spark connector всё ещё нуждается в драйвере MySQL JDBC для подключения к Selena для получения метаданных таблицы, поэтому вам нужно вручную добавить драйвер в classpath Spark. Вы можете найти драйвер на сайте MySQL или Maven Central.
  • Начиная с версии 1.1.1, коннектор по умолчанию использует интерфейс Stream Load, а не Stream Load transaction interface, как в версии 1.1.0. Если вы всё ещё хотите использовать Stream Load transaction interface, вы можете установить опцию selena.write.max.retries в 0. См. описание selena.write.enable.transaction-stream-load и selena.write.max.retries для подробностей.

Примеры

Следующие примеры показывают, как использовать Spark connector для загрузки данных в таблицу Selena с помощью Spark DataFrames или Spark SQL. Spark DataFrames поддерживает как пакетный режим, так и режим Structured Streaming.

Для получения дополнительных примеров см. Примеры Spark Connector.

Подготовка

Создание таблицы Selena

Создайте базу данных test и создайте таблицу с Primary Key score_board.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Сетевая конфигурация

Убедитесь, что машина, на которой находится Spark, может получить доступ к узлам FE cluster Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), а к узлам BE через be_http_port (по умолчанию: 8040).

Настройка среды Spark

Обратите внимание, что следующие примеры выполняются в Spark 3.2.4 и используют spark-shell, pyspark и spark-sql. Перед запуском примеров убедитесь, что JAR-файл Spark connector размещён в каталоге $SPARK_HOME/jars.

Загрузка данных с помощью Spark DataFrames

Следующие два примера объясняют, как загружать данные с помощью пакетного режима Spark DataFrames или режима Structured Streaming.

Пакетный режим

Создайте данные в памяти и загрузите данные в таблицу Selena.

  1. Вы можете написать приложение Spark, используя Scala или Python.

Для Scala выполните следующий фрагмент кода в spark-shell:

// 1. Создайте DataFrame из последовательности.
val data = Seq((1, "selena", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")

// 2. Запишите в Selena, настроив формат как "selena" и следующие опции.
// Вам нужно изменить опции в соответствии с вашей средой.
df.write.format("selena")
.option("selena.fe.http.url", "127.0.0.1:8030")
.option("selena.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("selena.table.identifier", "test.score_board")
.option("selena.user", "root")
.option("selena.password", "")
.mode("append")
.save()

Для Python выполните следующий фрагмент кода в pyspark:

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Selena Example") \
.getOrCreate()

# 1. Создайте DataFrame из последовательности.
data = [(1, "selena", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])

# 2. Запишите в Selena, настроив формат как "selena" и следующие опции.
# Вам нужно изменить опции в соответствии с вашей средой.
df.write.format("selena") \
.option("selena.fe.http.url", "127.0.0.1:8030") \
.option("selena.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("selena.table.identifier", "test.score_board") \
.option("selena.user", "root") \
.option("selena.password", "") \
.mode("append") \
.save()
  1. Запросите данные в таблице Selena.

    MySQL [test]> SELECT * FROM `score_board`;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | selena | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Structured Streaming

Создайте потоковое чтение данных из CSV-файла и загрузите данные в таблицу Selena.

  1. В каталоге csv-data создайте CSV-файл test.csv со следующими данными:

    3,selena,100
    4,spark,100
  2. Вы можете написать приложение Spark, используя Scala или Python.

Для Scala выполните следующий фрагмент кода в spark-shell:

import org.apache.spark.sql.types.StructType

// 1. Создайте DataFrame из CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Замените на ваш путь к каталогу "csv-data".
.load("/path/to/csv-data")
)

// 2. Запишите в Selena, настроив формат как "selena" и следующие опции.
// Вам нужно изменить опции в соответствии с вашей средой.
val query = (df.writeStream.format("selena")
.option("selena.fe.http.url", "127.0.0.1:8030")
.option("selena.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("selena.table.identifier", "test.score_board")
.option("selena.user", "root")
.option("selena.password", "")
// замените на ваш каталог контрольных точек
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)

Для Python выполните следующий фрагмент кода в pyspark:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

spark = SparkSession \
.builder \
.appName("Selena SS Example") \
.getOrCreate()

# 1. Создайте DataFrame из CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Замените на ваш путь к каталогу "csv-data".
.load("/path/to/csv-data")
)

# 2. Запишите в Selena, настроив формат как "selena" и следующие опции.
# Вам нужно изменить опции в соответствии с вашей средой.
query = (
df.writeStream.format("selena")
.option("selena.fe.http.url", "127.0.0.1:8030")
.option("selena.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("selena.table.identifier", "test.score_board")
.option("selena.user", "root")
.option("selena.password", "")
# замените на ваш каталог контрольных точек
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
  1. Запросите данные в таблице Selena.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 4 | spark | 100 |
    | 3 | selena | 100 |
    +------+-----------+-------+
    2 rows in set (0.67 sec)

Загрузка данных с помощью Spark SQL

Следующий пример объясняет, как загружать данные с помощью Spark SQL, используя оператор INSERT INTO в Spark SQL CLI.

  1. Выполните следующий SQL-оператор в spark-sql:

    -- 1. Создайте таблицу, настроив источник данных как `selena` и следующие опции.
    -- Вам нужно изменить опции в соответствии с вашей средой.
    CREATE TABLE `score_board`
    USING selena
    OPTIONS(
    "selena.fe.http.url"="127.0.0.1:8030",
    "selena.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "selena.table.identifier"="test.score_board",
    "selena.user"="root",
    "selena.password"=""
    );

    -- 2. Вставьте две строки в таблицу.
    INSERT INTO `score_board` VALUES (5, "selena", 100), (6, "spark", 100);
  2. Запросите данные в таблице Selena.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 6 | spark | 100 |
    | 5 | selena | 100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Лучшие практики

Загрузка данных в таблицу с Primary Key

В этом разделе показано, как загружать данные в таблицу Selena с Primary Key для достижения частичных обновлений и условных обновлений. Вы можете посмотреть Изменение данных через загрузку для подробного введения в эти функции. Эти примеры используют Spark SQL.

Подготовка

Создайте базу данных test и создайте таблицу с Primary Key score_board в Selena.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Частичные обновления

Этот пример покажет, как обновить данные только в столбце name через загрузку:

  1. Вставьте начальные данные в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'spark', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | selena | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Spark score_board в клиенте Spark SQL.

    • Установите опцию selena.write.properties.partial_update в true, что указывает коннектору выполнять частичное обновление.
    • Установите опцию selena.columns в "id,name", чтобы указать коннектору, какие столбцы записывать.
    CREATE TABLE `score_board`
    USING selena
    OPTIONS(
    "selena.fe.http.url"="127.0.0.1:8030",
    "selena.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "selena.table.identifier"="test.score_board",
    "selena.user"="root",
    "selena.password"="",
    "selena.write.properties.partial_update"="true",
    "selena.columns"="id,name"
    );
  3. Вставьте данные в таблицу в клиенте Spark SQL и обновите только столбец name.

    INSERT INTO `score_board` VALUES (1, 'selena-update'), (2, 'spark-update');
  4. Запросите таблицу Selena в клиенте MySQL.

    Вы можете видеть, что изменились только значения для name, а значения для score не изменились.

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | selena-update | 100 |
    | 2 | spark-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

Условные обновления

Этот пример покажет, как выполнять условные обновления в соответствии со значениями столбца score. Обновление для id вступает в силу только тогда, когда новое значение для score больше или равно старому значению.

  1. Вставьте начальные данные в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'selena', 100), (2, 'spark', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | selena | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Spark score_board следующим образом.

    • Установите опцию selena.write.properties.merge_condition в score, что указывает коннектору использовать столбец score в качестве условия.
    • Убедитесь, что Spark connector использует интерфейс Stream Load для загрузки данных, а не Stream Load transaction interface, поскольку последний не поддерживает эту функцию.
    CREATE TABLE `score_board`
    USING selena
    OPTIONS(
    "selena.fe.http.url"="127.0.0.1:8030",
    "selena.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "selena.table.identifier"="test.score_board",
    "selena.user"="root",
    "selena.password"="",
    "selena.write.properties.merge_condition"="score"
    );
  3. Вставьте данные в таблицу в клиенте Spark SQL и обновите строку с id равным 1 с меньшим значением score, а строку с id равным 2 с большим значением score.

    INSERT INTO `score_board` VALUES (1, 'selena-update', 99), (2, 'spark-update', 101);
  4. Запросите таблицу Selena в клиенте MySQL.

    Вы можете видеть, что изменилась только строка с id равным 2, а строка с id равным 1 не изменилась.

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | selena | 100 |
    | 2 | spark-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

Загрузка данных в столбцы типа BITMAP

BITMAP часто используется для ускорения подсчёта уникальных значений (count distinct), например подсчёта UV, см. Использование Bitmap для точного Count Distinct. Здесь мы возьмём подсчёт UV в качестве примера, чтобы показать, как загружать данные в столбцы типа BITMAP. BITMAP поддерживается начиная с версии 1.1.1.

  1. Создайте Aggregate-таблицу Selena.

    В базе данных test создайте Aggregate-таблицу page_uv, где столбец visit_users определён как тип BITMAP и настроен с агрегатной функцией BITMAP_UNION.

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Spark.

    Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип BITMAP. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например как BIGINT, настроив опцию "selena.column.types"="visit_users BIGINT". При использовании Stream Load для загрузки данных коннектор использует функцию to_bitmap для преобразования данных типа BIGINT в тип BITMAP.

    Выполните следующий DDL в spark-sql:

    CREATE TABLE `page_uv`
    USING selena
    OPTIONS(
    "selena.fe.http.url"="127.0.0.1:8030",
    "selena.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "selena.table.identifier"="test.page_uv",
    "selena.user"="root",
    "selena.password"="",
    "selena.column.types"="visit_users BIGINT"
    );
  3. Загрузите данные в таблицу Selena.

    Выполните следующий DML в spark-sql:

    INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Вычислите UV страниц из таблицы Selena.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 2 | 1 |
    | 1 | 3 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

ВНИМАНИЕ:

Коннектор использует функцию to_bitmap для преобразования данных типов TINYINT, SMALLINT, INTEGER и BIGINT в Spark в тип BITMAP в Selena, и использует функцию bitmap_hash или bitmap_hash64 для других типов данных Spark.

Загрузка данных в столбцы типа HLL

HLL можно использовать для приблизительного подсчёта уникальных значений (count distinct), см. Использование HLL для приблизительного count distinct.

Здесь мы возьмём подсчёт UV в качестве примера, чтобы показать, как загружать данные в столбцы типа HLL. HLL поддерживается начиная с версии 1.1.1.

  1. Создайте Aggregate-таблицу Selena.

    В базе данных test создайте Aggregate-таблицу hll_uv, где столбец visit_users определён как тип HLL и настроен с агрегатной функцией HLL_UNION.

    CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Spark.

    Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип HLL. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например как BIGINT, настроив опцию "selena.column.types"="visit_users BIGINT". При использовании Stream Load для загрузки данных коннектор использует функцию hll_hash для преобразования данных типа BIGINT в тип HLL.

    Выполните следующий DDL в spark-sql:

    CREATE TABLE `hll_uv`
    USING selena
    OPTIONS(
    "selena.fe.http.url"="127.0.0.1:8030",
    "selena.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "selena.table.identifier"="test.hll_uv",
    "selena.user"="root",
    "selena.password"="",
    "selena.column.types"="visit_users BIGINT"
    );
  3. Загрузите данные в таблицу Selena.

    Выполните следующий DML в spark-sql:

    INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Вычислите UV страниц из таблицы Selena.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 4 | 1 |
    | 3 | 2 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

Загрузка данных в столбцы типа ARRAY

Следующий пример объясняет, как загружать данные в столбцы типа ARRAY.

  1. Создайте таблицу Selena.

    В базе данных test создайте таблицу с Primary Key array_tbl, которая включает один столбец INT и два столбца ARRAY.

    CREATE TABLE `array_tbl` (
    `id` INT NOT NULL,
    `a0` ARRAY<STRING>,
    `a1` ARRAY<ARRAY<INT>>
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`)
    ;
  2. Запишите данные в Selena.

    Поскольку некоторые версии Selena не предоставляют метаданные столбца ARRAY, коннектор не может вывести соответствующий тип данных Spark для этого столбца. Однако вы можете явно указать соответствующий тип данных Spark столбца в опции selena.column.types. В этом примере вы можете настроить опцию как a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>.

    Выполните следующий код в spark-shell:

     val data = Seq(
    | (1, Seq("hello", "selena"), Seq(Seq(1, 2), Seq(3, 4))),
    | (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
    | )
    val df = data.toDF("id", "a0", "a1")
    df.write
    .format("selena")
    .option("selena.fe.http.url", "127.0.0.1:8030")
    .option("selena.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
    .option("selena.table.identifier", "test.array_tbl")
    .option("selena.user", "root")
    .option("selena.password", "")
    .option("selena.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
    .mode("append")
    .save()
  3. Запросите данные в таблице Selena.

    MySQL [test]> SELECT * FROM `array_tbl`;
    +------+-----------------------+--------------------+
    | id | a0 | a1 |
    +------+-----------------------+--------------------+
    | 1 | ["hello","selena"] | [[1,2],[3,4]] |
    | 2 | ["hello","spark"] | [[5,6,7],[8,9,10]] |
    +------+-----------------------+--------------------+
    2 rows in set (0.01 sec)