Перейти к основному содержимому

Загрузка данных с использованием Spark connector (рекомендуется)

Selena предоставляет собственный коннектор под названием Selena Connector for Apache Spark™ (сокращенно Spark connector), который помогает загружать данные в таблицу Selena с помощью Spark. Основной принцип заключается в накоплении данных и последующей загрузке их все сразу в Selena через STREAM LOAD. Spark connector реализован на основе Spark DataSource V2. DataSource может быть создан с использованием Spark DataFrames или Spark SQL. Поддерживаются как пакетный, так и структурированный потоковый режимы.

ВНИМАНИЕ

Только пользователи с привилегиями SELECT и INSERT на таблицу Selena могут загружать данные в эту таблицу. Вы можете следовать инструкциям, предоставленным в GRANT, чтобы предоставить эти привилегии пользователю.

Требования к версиям

Spark connectorSparkSelenaJavaScala
1.1.23.2, 3.3, 3.4, 3.52.5 и позже82.12
1.1.13.2, 3.3, или 3.42.5 и позже82.12
1.1.03.2, 3.3, или 3.42.5 и позже82.12

ВНИМАНИЕ

  • Пожалуйста, смотрите Обновление Spark connector для изменений поведения между различными версиями Spark connector.
  • Spark connector не предоставляет драйвер MySQL JDBC начиная с версии 1.5.0, и вам необходимо импортировать драйвер в classpath spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central.

Получение Spark connector

Вы можете получить JAR-файл Spark connector следующими способами:

  • Напрямую скачать скомпилированный JAR-файл Spark Connector.
  • Добавить Spark connector как зависимость в ваш Maven проект и затем скачать JAR-файл.
  • Скомпилировать исходный код Spark Connector в JAR-файл самостоятельно.

Формат именования JAR-файла Spark connector: starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar.

Например, если в вашей среде установлены Spark 3.2 и Scala 2.12, и вы хотите использовать Spark connector 1.1.0, вы можете использовать starrocks-spark-connector-3.2_2.12-1.1.0.jar.

ВНИМАНИЕ

В общем случае, последняя версия Spark connector поддерживает совместимость только с тремя самыми последними версиями Spark.

Скачивание скомпилированного Jar файла

Напрямую скачайте соответствующую версию JAR-файла Spark connector из Maven Central Repository.

Maven зависимость

  1. В файле pom.xml вашего Maven проекта добавьте Spark connector как зависимость согласно следующему формату. Замените spark_version, scala_version и connector_version на соответствующие версии.

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
    <version>${connector_version}</version>
    </dependency>
  2. Например, если версия Spark в вашей среде 3.2, версия Scala 2.12, и вы выбираете Spark connector 1.1.0, вам нужно добавить следующую зависимость:

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
    <version>1.1.0</version>
    </dependency>

Компиляция самостоятельно

  1. Скачайте пакет Spark connector.

  2. Выполните следующую команду для компиляции исходного кода Spark connector в JAR-файл. Обратите внимание, что spark_version заменяется на соответствующую версию Spark.

    sh build.sh <spark_version>

    Например, если версия Spark в вашей среде 3.2, вам нужно выполнить следующую команду:

    sh build.sh 3.2
  3. Перейдите в директорию target/, чтобы найти JAR-файл Spark connector, такой как starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar, созданный при компиляции.

ПРИМЕЧАНИЕ

Имя Spark connector, который не выпущен официально, содержит суффикс SNAPSHOT.

Параметры

starrocks.fe.http.url

Обязательный: ДА
Значение по умолчанию: Нет
Описание: HTTP URL FE в вашем кластере Selena. Вы можете указать несколько URL, которые должны быть разделены запятой (,). Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>. Начиная с версии 1.5.0, вы также можете добавить префикс http:// к URL, например http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>.

starrocks.fe.jdbc.url

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Адрес, который используется для подключения к MySQL серверу FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>.

starrocks.table.identifier

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя таблицы Selena. Формат: <database_name>.<table_name>.

starrocks.user

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя пользователя вашей учетной записи кластера Selena. Пользователь должен иметь привилегии SELECT и INSERT на таблицу Selena.

starrocks.password

Обязательный: ДА
Значение по умолчанию: Нет
Описание: Пароль вашей учетной записи кластера Selena.

starrocks.write.label.prefix

Обязательный: НЕТ
Значение по умолчанию: spark-
Описание: Префикс метки, используемый Stream Load.

starrocks.write.enable.transaction-stream-load

Обязательный: НЕТ
Значение по умолчанию: TRUE
Описание: Использовать ли интерфейс транзакций Stream Load для загрузки данных. Требуется Selena v2.5 или позже. Эта функция может загружать больше данных в транзакции с меньшим использованием памяти и улучшить производительность.
ВНИМАНИЕ: Начиная с 1.1.1, этот параметр действует только когда значение starrocks.write.max.retries не положительное, поскольку интерфейс транзакций Stream Load не поддерживает повторные попытки.

starrocks.write.buffer.size

Обязательный: НЕТ
Значение по умолчанию: 104857600
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки.

starrocks.write.buffer.rows

Обязательный: НЕТ
Значение по умолчанию: Integer.MAX_VALUE
Описание: Поддерживается с версии 1.1.1. Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз.

starrocks.write.flush.interval.ms

Обязательный: НЕТ
Значение по умолчанию: 300000
Описание: Интервал, с которым данные отправляются в Selena. Этот параметр используется для контроля задержки загрузки.

starrocks.write.max.retries

Обязательный: НЕТ
Значение по умолчанию: 3
Описание: Поддерживается с версии 1.1.1. Количество попыток, которые коннектор делает для выполнения Stream Load для одной и той же партии данных, если загрузка не удалась.
ВНИМАНИЕ: Поскольку интерфейс транзакций Stream Load не поддерживает повторные попытки. Если этот параметр положительный, коннектор всегда использует интерфейс Stream Load и игнорирует значение starrocks.write.enable.transaction-stream-load.

starrocks.write.retry.interval.ms

Обязательный: НЕТ
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.1.1. Интервал для повторной попытки Stream Load для одной и той же партии данных, если загрузка не удалась.

starrocks.columns

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбец таблицы Selena, в который вы хотите загрузить данные. Вы можете указать несколько столбцов, которые должны быть разделены запятыми (,), например, "col0,col1,col2".

starrocks.column.types

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Поддерживается с версии 1.1.1. Настройка типов данных столбцов для Spark вместо использования значений по умолчанию, выведенных из таблицы Selena и сопоставления по умолчанию. Значение параметра - это схема в формате DDL, такая же как вывод Spark StructType#toDDL, например col0 INT, col1 STRING, col2 BIGINT. Обратите внимание, что вам нужно указать только столбцы, которые требуют настройки. Один случай использования - загрузка данных в столбцы типа BITMAP или HLL.

starrocks.write.properties.*

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Параметры, которые используются для управления поведением Stream Load. Например, параметр starrocks.write.properties.format указывает формат загружаемых данных, такой как CSV или JSON. Для списка поддерживаемых параметров и их описаний см. STREAM LOAD.

starrocks.write.properties.format

Обязательный: НЕТ
Значение по умолчанию: CSV
Описание: Формат файла, на основе которого Spark connector преобразует каждую партию данных перед отправкой данных в Selena. Допустимые значения: CSV и JSON.

starrocks.write.properties.row_delimiter

Обязательный: НЕТ
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.

starrocks.write.properties.column_separator

Обязательный: НЕТ
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.

starrocks.write.properties.partial_update

Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.

starrocks.write.properties.partial_update_mode

Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.

  • Значение row (по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими партиями.
  • Значение column означает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.

starrocks.write.num.partitions

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Количество разделов, в которые Spark может записывать данные параллельно. Когда объем данных небольшой, вы можете уменьшить количество разделов, чтобы снизить параллелизм и частоту загрузки. Значение по умолчанию для этого параметра определяется Spark. Однако этот метод может вызвать затраты на Spark Shuffle.

starrocks.write.partition.columns

Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбцы разделения в Spark. Параметр действует только когда указан starrocks.write.num.partitions. Если этот параметр не указан, все записываемые столбцы используются для разделения.

starrocks.timezone

Обязательный: НЕТ
Значение по умолчанию: Часовой пояс JVM по умолчанию
Описание: Поддерживается с 1.1.1. Часовой пояс, используемый для преобразования Spark TimestampType в Selena DATETIME. По умолчанию это часовой пояс JVM, возвращаемый ZoneId#systemDefault(). Формат может быть именем часового пояса, таким как Asia/Shanghai, или смещением зоны, таким как +08:00.

Сопоставление типов данных между Spark и Selena

  • Сопоставление типов данных по умолчанию следующее:

    Тип данных SparkТип данных Selena
    BooleanTypeBOOLEAN
    ByteTypeTINYINT
    ShortTypeSMALLINT
    IntegerTypeINT
    LongTypeBIGINT
    StringTypeLARGEINT
    FloatTypeFLOAT
    DoubleTypeDOUBLE
    DecimalTypeDECIMAL
    StringTypeCHAR
    StringTypeVARCHAR
    StringTypeSTRING
    StringTypeJSON
    DateTypeDATE
    TimestampTypeDATETIME
    ArrayTypeARRAY
    ПРИМЕЧАНИЕ:
    Поддерживается с версии 1.1.1. Для подробных шагов см. Загрузка данных в столбцы типа ARRAY.
  • Вы также можете настроить сопоставление типов данных.

    Например, таблица Selena содержит столбцы BITMAP и HLL, но Spark не поддерживает эти два типа данных. Вам нужно настроить соответствующие типы данных в Spark. Для подробных шагов см. загрузку данных в столбцы BITMAP и HLL. BITMAP и HLL поддерживаются с версии 1.1.1.

Обновление Spark connector

Обновление с версии 1.1.0 до 1.1.1

  • Начиная с 1.1.1, Spark connector не предоставляет mysql-connector-java, который является официальным JDBC драйвером для MySQL, из-за ограничений лицензии GPL, используемой mysql-connector-java. Однако Spark connector все еще нуждается в драйвере MySQL JDBC для подключения к Selena для метаданных таблицы, поэтому вам нужно добавить драйвер в classpath Spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central.
  • Начиная с 1.1.1, коннектор использует интерфейс Stream Load по умолчанию, а не интерфейс транзакций Stream Load в версии 1.1.0. Если вы все еще хотите использовать интерфейс транзакций Stream Load, вы можете установить опцию starrocks.write.max.retries в 0. Пожалуйста, смотрите описание starrocks.write.enable.transaction-stream-load и starrocks.write.max.retries для подробностей.

Примеры

Следующие примеры показывают, как использовать Spark connector для загрузки данных в таблицу Selena с помощью Spark DataFrames или Spark SQL. Spark DataFrames поддерживает как пакетный, так и структурированный потоковый режимы.

Для большего количества примеров см. Примеры Spark Connector.

Подготовка

Создание таблицы Selena

Создайте базу данных test и создайте таблицу Primary Key score_board.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Конфигурация сети

Убедитесь, что машина, где расположен Spark, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), и к узлам BE через be_http_port (по умолчанию: 8040).

Настройка среды Spark

Обратите внимание, что следующие примеры выполняются в Spark 3.2.4 и используют spark-shell, pyspark и spark-sql. Перед запуском примеров убедитесь, что JAR-файл Spark connector размещен в директории $SPARK_HOME/jars.

Загрузка данных с помощью Spark DataFrames

Следующие два примера объясняют, как загружать данные с помощью пакетного или структурированного потокового режима Spark DataFrames.

Пакетный режим

Создание данных в памяти и загрузка данных в таблицу Selena.

  1. Вы можете написать приложение spark, используя Scala или Python.

Для Scala выполните следующий фрагмент кода в spark-shell:

// 1. Создайте DataFrame из последовательности.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")

// 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
// Вам нужно изменить опции согласно вашей собственной среде.
df.write.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.mode("append")
.save()

Для Python выполните следующий фрагмент кода в pyspark:

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Selena Example") \
.getOrCreate()

# 1. Создайте DataFrame из последовательности.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])

# 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
# Вам нужно изменить опции согласно вашей собственной среде.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
  1. Запросите данные в таблице Selena.

    MySQL [test]> SELECT * FROM `score_board`;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Структурированная потоковая передача

Создание потокового чтения данных из CSV файла и загрузка данных в таблицу Selena.

  1. В директории csv-data создайте CSV файл test.csv со следующими данными:

    3,starrocks,100
    4,spark,100
  2. Вы можете написать приложение Spark, используя Scala или Python.

Для Scala выполните следующий фрагмент кода в spark-shell:

import org.apache.spark.sql.types.StructType

// 1. Создайте DataFrame из CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Замените на ваш путь к директории "csv-data".
.load("/path/to/csv-data")
)

// 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
// Вам нужно изменить опции согласно вашей собственной среде.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// замените на вашу директорию checkpoint
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)

Для Python выполните следующий фрагмент кода в pyspark:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

spark = SparkSession \
.builder \
.appName("Selena SS Example") \
.getOrCreate()

# 1. Создайте DataFrame из CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Замените на ваш путь к директории "csv-data".
.load("/path/to/csv-data")
)



# 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
# Вам нужно изменить опции согласно вашей собственной среде.
query = (
df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
# замените на вашу директорию checkpoint
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
  1. Запросите данные в таблице Selena.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 4 | spark | 100 |
    | 3 | starrocks | 100 |
    +------+-----------+-------+
    2 rows in set (0.67 sec)

Загрузка данных с помощью Spark SQL

Следующий пример объясняет, как загружать данные с помощью Spark SQL, используя оператор INSERT INTO в Spark SQL CLI.

  1. Выполните следующий SQL оператор в spark-sql:

    -- 1. Создайте таблицу, настроив источник данных как `starrocks` и следующие опции. 
    -- Вам нужно изменить опции согласно вашей собственной среде.
    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.score_board",
    "starrocks.user"="root",
    "starrocks.password"=""
    );

    -- 2. Вставьте две строки в таблицу.
    INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100);
  2. Запросите данные в таблице Selena.

    MySQL [test]> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 6 | spark | 100 |
    | 5 | starrocks | 100 |
    +------+-----------+-------+
    2 rows in set (0.00 sec)

Лучшие практики

Загрузка данных в таблицу Primary Key

Этот раздел покажет, как загружать данные в таблицу Primary Key Selena для достижения частичных обновлений и условных обновлений. Вы можете посмотреть Изменение данных через загрузку для подробного введения в эти функции. Эти примеры используют Spark SQL.

Подготовка

Создайте базу данных test и создайте таблицу Primary Key score_board в Selena.

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

Частичные обновления

Этот пример покажет, как обновлять только данные в столбце name через загрузку:

  1. Вставьте начальные данные в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Spark score_board в клиенте Spark SQL.

    • Установите опцию starrocks.write.properties.partial_update в true, что говорит коннектору делать частичное обновление.
    • Установите опцию starrocks.columns в "id,name", чтобы сказать коннектору, какие столбцы записывать.
    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.score_board",
    "starrocks.user"="root",
    "starrocks.password"="",
    "starrocks.write.properties.partial_update"="true",
    "starrocks.columns"="id,name"
    );
  3. Вставьте данные в таблицу в клиенте Spark SQL и обновите только столбец name.

    INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update');
  4. Запросите таблицу Selena в клиенте MySQL.

    Вы можете видеть, что изменяются только значения для name, а значения для score не изменяются.

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | starrocks-update | 100 |
    | 2 | spark-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

Условные обновления

Этот пример покажет, как делать условные обновления согласно значениям столбца score. Обновление для id вступает в силу только когда новое значение для score больше или равно старому значению.

  1. Вставьте начальные данные в таблицу Selena в клиенте MySQL.

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | spark | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Создайте таблицу Spark score_board следующими способами.

    • Установите опцию starrocks.write.properties.merge_condition в score, что говорит коннектору использовать столбец score как условие.
    • Убедитесь, что Spark connector использует интерфейс Stream Load для загрузки данных, а не интерфейс транзакций Stream Load, поскольку последний не поддерживает эту функцию.
    CREATE TABLE `score_board`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.score_board",
    "starrocks.user"="root",
    "starrocks.password"="",
    "starrocks.write.properties.merge_condition"="score"
    );
  3. Вставьте данные в таблицу в клиенте Spark SQL и обновите строку, чей id равен 1, с меньшим значением score, и строку, чей id равен 2, с большим значением score.

    INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101);
  4. Запросите таблицу Selena в клиенте MySQL.

    Вы можете видеть, что изменяется только строка, чей id равен 2, а строка, чей id равен 1, не изменяется.

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | starrocks | 100 |
    | 2 | spark-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

Загрузка данных в столбцы типа BITMAP

BITMAP часто используется для ускорения count distinct, например подсчета UV, см. Использование Bitmap для точного Count Distinct. Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа BITMAP. BITMAP поддерживается с версии 1.1.1.

  1. Создайте таблицу Aggregate Selena.

    В базе данных test создайте таблицу Aggregate page_uv, где столбец visit_users определен как тип BITMAP и настроен с агрегатной функцией BITMAP_UNION.

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Spark.

    Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип BITMAP. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например как BIGINT, настроив опцию "starrocks.column.types"="visit_users BIGINT". При использовании Stream Load для приема данных коннектор использует функцию to_bitmap для преобразования данных типа BIGINT в тип BITMAP.

    Выполните следующий DDL в spark-sql:

    CREATE TABLE `page_uv`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.page_uv",
    "starrocks.user"="root",
    "starrocks.password"="",
    "starrocks.column.types"="visit_users BIGINT"
    );
  3. Загрузите данные в таблицу Selena.

    Выполните следующий DML в spark-sql:

    INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Вычислите UV страниц из таблицы Selena.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 2 | 1 |
    | 1 | 3 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

ВНИМАНИЕ:

Коннектор использует функцию to_bitmap для преобразования данных типов TINYINT, SMALLINT, INTEGER и BIGINT в Spark в тип BITMAP в Selena, и использует функцию bitmap_hash для других типов данных Spark.

Загрузка данных в столбцы типа HLL

HLL может использоваться для приблизительного count distinct, см. Использование HLL для приблизительного count distinct.

Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа HLL. HLL поддерживается с версии 1.1.1.

  1. Создайте таблицу Aggregate Selena.

    В базе данных test создайте таблицу Aggregate hll_uv, где столбец visit_users определен как тип HLL и настроен с агрегатной функцией HLL_UNION.

    CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Создайте таблицу Spark.

    Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип HLL. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например как BIGINT, настроив опцию "starrocks.column.types"="visit_users BIGINT". При использовании Stream Load для приема данных коннектор использует функцию hll_hash для преобразования данных типа BIGINT в тип HLL.

    Выполните следующий DDL в spark-sql:

    CREATE TABLE `hll_uv`
    USING starrocks
    OPTIONS(
    "starrocks.fe.http.url"="127.0.0.1:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
    "starrocks.table.identifier"="test.hll_uv",
    "starrocks.user"="root",
    "starrocks.password"="",
    "starrocks.column.types"="visit_users BIGINT"
    );
  3. Загрузите данные в таблицу Selena.

    Выполните следующий DML в spark-sql:

    INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Вычислите UV страниц из таблицы Selena.

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 4 | 1 |
    | 3 | 2 |
    +---------+-----------------------------+
    2 rows in set (0.01 sec)

Загрузка данных в столбцы типа ARRAY

Следующий пример объясняет, как загружать данные в столбцы типа ARRAY.

  1. Создайте таблицу Selena.

    В базе данных test создайте таблицу Primary Key array_tbl, которая включает один столбец INT и два столбца ARRAY.

    CREATE TABLE `array_tbl` (
    `id` INT NOT NULL,
    `a0` ARRAY<STRING>,
    `a1` ARRAY<ARRAY<INT>>
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`)
    ;
  2. Запишите данные в Selena.

    Поскольку некоторые версии Selena не предоставляют метаданные столбца ARRAY, коннектор не может вывести соответствующий тип данных Spark для этого столбца. Однако вы можете явно указать соответствующий тип данных Spark столбца в опции starrocks.column.types. В этом примере вы можете настроить опцию как a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>.

    Выполните следующие коды в spark-shell:

     val data = Seq(
    | (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
    | (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
    | )
    val df = data.toDF("id", "a0", "a1")
    df.write
    .format("starrocks")
    .option("starrocks.fe.http.url", "127.0.0.1:8030")
    .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
    .option("starrocks.table.identifier", "test.array_tbl")
    .option("starrocks.user", "root")
    .option("starrocks.password", "")
    .option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
    .mode("append")
    .save()
  3. Запросите данные в таблице Selena.

    MySQL [test]> SELECT * FROM `array_tbl`;
    +------+-----------------------+--------------------+
    | id | a0 | a1 |
    +------+-----------------------+--------------------+
    | 1 | ["hello","starrocks"] | [[1,2],[3,4]] |
    | 2 | ["hello","spark"] | [[5,6,7],[8,9,10]] |
    +------+-----------------------+--------------------+
    2 rows in set (0.01 sec)