Загрузка данных с использованием Spark connector (рекомендуется)
Selena предоставляет собственный коннектор под названием Selena Connector for Apache Spark™ (сокращенно Spark connector), который помогает загружать данные в таблицу Selena с помощью Spark. Основной принцип заключается в накоплении данных и последующей загрузке их все сразу в Selena через STREAM LOAD. Spark connector реализован на основе Spark DataSource V2. DataSource может быть создан с использованием Spark DataFrames или Spark SQL. Поддерживаются как пакетный, так и структурированный потоковый режимы.
ВНИМАНИЕ
Только пользователи с привилегиями SELECT и INSERT на таблицу Selena могут загружать данные в эту таблицу. Вы можете следовать инструкциям, предоставленным в GRANT, чтобы предоставить эти привилегии пользоват елю.
Требования к версиям
| Spark connector | Spark | Selena | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 и позже | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, или 3.4 | 2.5 и позже | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, или 3.4 | 2.5 и позже | 8 | 2.12 |
ВНИМАНИЕ
- Пожалуйста, смотрите Обновление Spark connector для изменений поведения между различными версиями Spark connector.
- Spark connector не предоставляет драйвер MySQL JDBC начиная с версии 1.5.0, и вам необходимо импортировать драйвер в classpath spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central.
Получение Spark connector
Вы можете получить JAR-файл Spark connector следующими способами:
- Напрямую скачать скомпилированный JAR-файл Spark Connector.
- Добавить Spark connector как зависимость в ваш Maven проект и затем скачать JAR-файл.
- Скомпилировать исходный код Spark Connector в JAR-файл самостоятельно.
Формат именования JAR-файла Spark connector: starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar.
Например, если в вашей среде установлены Spark 3.2 и Scala 2.12, и вы хотите использовать Spark connector 1.1.0, вы можете использовать starrocks-spark-connector-3.2_2.12-1.1.0.jar.
ВНИМАНИЕ
В общем случае, последняя версия Spark connector поддерживает совместимость только с тремя самыми последними версиями Spark.
Скачивание скомпилированного Jar файла
Напрямую скачайте соответствующую версию JAR-файла Spark connector из Maven Central Repository.
Maven зависимость
-
В файле
pom.xmlвашего Maven проекта добавьте Spark connector как зависимость согласно следующему формату. Заменитеspark_version,scala_versionиconnector_versionна соответствующие версии.<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
Например, если версия Spark в вашей среде 3.2, версия Scala 2.12, и вы выбираете Spark connector 1.1.0, вам нужно добавить следующую зависимость:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
Компиляция самостоятельно
-
Скачайте пакет Spark connector.
-
Выполните следующую команду для компиляции исходного кода Spark connector в JAR-файл. Обратите внимание, что
spark_versionзаменяется на соответствующую версию Spark.sh build.sh <spark_version>Например, если версия Spark в вашей среде 3.2, вам нужно выполнить следующую команду:
sh build.sh 3.2 -
Перейдите в директорию
target/, чтобы найти JAR-файл Spark connector, такой какstarrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar, созданный при компиляции.
ПРИМЕЧАНИЕ
Имя Spark connector, который не выпущен официально, содержит суффикс
SNAPSHOT.
Параметры
starrocks.fe.http.url
Обязательный: ДА
Значение по умолчанию: Нет
Описание: HTTP URL FE в вашем кластере Selena. Вы можете указать несколько URL, которые должны быть разделены запятой (,). Формат: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>. Начиная с версии 1.5.0, вы также можете добавить префикс http:// к URL, например http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>.
starrocks.fe.jdbc.url
Обязательный: ДА
Значение по умолчанию: Нет
Описание: Адрес, который используется для подключения к MySQL серверу FE. Формат: jdbc:mysql://<fe_host>:<fe_query_port>.
starrocks.table.identifier
Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя таблицы Selena. Формат: <database_name>.<table_name>.
starrocks.user
Обязательный: ДА
Значение по умолчанию: Нет
Описание: Имя пользователя вашей учетной записи кластера Selena. Пользователь должен иметь привилегии SELECT и INSERT на таблицу Selena.
starrocks.password
Обязательный: ДА
Значение по умолчанию: Нет
Описание: Пароль вашей учетной записи кластера Selena.
starrocks.write.label.prefix
Обязательный: НЕТ
Значение по умолчанию: spark-
Описание: Префикс метки, используемый Stream Load.
starrocks.write.enable.transaction-stream-load
Обязательный: НЕТ
Значение по умолчанию: TRUE
Описание: Использовать ли интерфейс транзакций Stream Load для загрузки данных. Требуется Selena v2.5 или позже. Эта функ ция может загружать больше данных в транзакции с меньшим использованием памяти и улучшить производительность.
ВНИМАНИЕ: Начиная с 1.1.1, этот параметр действует только когда значение starrocks.write.max.retries не положительное, поскольку интерфейс транзакций Stream Load не поддерживает повторные попытки.
starrocks.write.buffer.size
Обязательный: НЕТ
Значение по умолчанию: 104857600
Описание: Максимальный размер данных, которые могут быть накоплены в памяти перед отправкой в Selena за один раз. Установка этого параметра на большее значение может улучшить производительность загрузки, но может увеличить задержку загрузки.
starrocks.write.buffer.rows
Обязательный: НЕТ
Значение по умолчанию: Integer.MAX_VALUE
Описание: Поддерживается с версии 1.1.1. Максимальное количество строк, которые могут быть накоплены в памяти перед отправкой в Selena за один раз.
starrocks.write.flush.interval.ms
Обязательный: НЕТ
Значение по умолчанию: 300000
Описание: Интервал, с которым данные отправляются в Selena. Этот параметр используется для контроля задержки загрузки.
starrocks.write.max.retries
Обязательный: НЕТ
Значение по умолчанию: 3
Описание: Поддерживается с версии 1.1.1. Количество попыток, которые коннектор делает для выполнения Stream Load для одной и той же партии данных, если загрузка не удалась.
ВНИМАНИЕ: Поскольку интерфейс транзакций Stream Load не поддерживает повторные попытки. Если этот параметр положительный, коннектор всегда использует интерфейс Stream Load и игнорирует значение starrocks.write.enable.transaction-stream-load.
starrocks.write.retry.interval.ms
Обязательный: НЕТ
Значение по умолчанию: 10000
Описание: Поддерживается с версии 1.1.1. Интервал для повторной попытки Stream Load для одной и той же партии данных, если загрузка не удалась.
starrocks.columns
Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбец таблицы Selena, в который вы хотите загрузить данные. Вы можете указать несколько столбцов, которые должны быть разделены запятыми (,), например, "col0,col1,col2".
starrocks.column.types
Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Поддерживается с версии 1.1.1. Настройка типов данных столбцов для Spark вместо использования значений по умолчанию, выведенных из таблицы Selena и сопоставления по умолчанию. Значение параметра - это схема в формате DDL, такая же как вывод Spark StructType#toDDL, например col0 INT, col1 STRING, col2 BIGINT. Обратите внимание, что вам нужно указать только столбцы, которые требуют настройки. Один случай использования - загрузка данных в столбцы типа BITMAP или HLL.
starrocks.write.properties.*
Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Параметры, которые используются для управления поведением Stream Load. Например, параметр starrocks.write.properties.format указывает формат загружаемых данных, такой как CSV или JSON. Для списка поддерживаемых параметров и их описаний см. STREAM LOAD.
starrocks.write.properties.format
Обязательный: НЕТ
Значение по умолчанию: CSV
Описание: Формат файла, на основе которого Spark connector преобразует каждую партию данных перед отправкой данных в Selena. Допустимые значения: CSV и JSON.
starrocks.write.properties.row_delimiter
Обязательный: НЕТ
Значение по умолчанию: \n
Описание: Разделитель строк для данных в формате CSV.
starrocks.write.properties.column_separator
Обязательный: НЕТ
Значение по умолчанию: \t
Описание: Разделитель столбцов для данных в формате CSV.
starrocks.write.properties.partial_update
Обязательный: НЕТ
Значение по умолчанию: FALSE
Описание: Использовать ли частичные обновления. Допустимые значения: TRUE и FALSE. Значение по умолчанию: FALSE, указывающее на отключение этой функции.
starrocks.write.properties.partial_update_mode
Обязательный: НЕТ
Значение по умолчанию: row
Описание: Указывает режим для частичных обновлений. Допустимые значения: row и column.
- Значение
row(по умолчанию) означает частичные обновления в режиме строк, что более подходит для обновлений в реальном времени с множеством столбцов и небольшими партиями. - Значение
columnозначает частичные обновления в режиме столбцов, что более подходит для пакетных обновлений с небольшим количеством столбцов и множеством строк. В таких сценариях включение режима столбцов обеспечивает более быструю скорость обновления. Например, в таблице со 100 столбцами, если обновляются только 10 столбцов (10% от общего количества) для всех строк, скорость обновления в режиме столбцов в 10 раз быстрее.
starrocks.write.num.partitions
Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Количество разделов, в которые Spark может записывать данные параллельно. Когда объем данных небольшой, вы можете уменьшить количество разделов, чтобы снизить параллелизм и частоту загрузки. Значение по умолчанию для этого параметра определяется Spark. Однако этот метод может вызвать затраты на Spark Shuffle.
starrocks.write.partition.columns
Обязательный: НЕТ
Значение по умолчанию: Нет
Описание: Столбцы разделения в Spark. Параметр действует только когда указан starrocks.write.num.partitions. Если этот параметр не указан, все записываемые столбцы используются для разделения.
starrocks.timezone
Обязательный: НЕТ
Значение по умолчанию: Часовой пояс JVM по умолчанию
Описание: Поддерживается с 1.1.1. Часовой пояс, используемый для преобразования Spark TimestampType в Selena DATETIME. По умолчанию это часовой пояс JVM, возвращаемый ZoneId#systemDefault(). Формат может быть именем часового пояса, таким как Asia/Shanghai, или смещением зоны, таким как +08:00.
Сопоставление типов данных между Spark и Selena
-
Сопоставление типов данных по умолчанию следующее:
Тип данных Spark Тип данных Selena BooleanType BOOLEAN ByteType TINYINT ShortType SMALLINT IntegerType INT LongType BIGINT StringType LARGEINT FloatType FLOAT DoubleType DOUBLE DecimalType DECIMAL StringType CHAR StringType VARCHAR StringType STRING StringType JSON DateType DATE TimestampType DATETIME ArrayType ARRAY
ПРИМЕЧАНИЕ:
Поддерживается с версии 1.1.1. Для подробных шагов см. Загрузка данных в столбцы типа ARRAY. -
Вы также можете настроить сопоставление типов данных.
Например, таблица Selena содержит столбцы BITMAP и HLL, но Spark не поддерживает эти два типа данных. Вам нужно настроить соответствующие типы данных в Spark. Для подробных шагов см. загрузку данных в столбцы BITMAP и HLL. BITMAP и HLL поддерживаются с версии 1.1.1.
Обновление Spark connector
Обновление с версии 1.1.0 до 1.1.1
- Начиная с 1.1.1, Spark connector не предоставляет
mysql-connector-java, который является официальным JDBC драйвером для MySQL, из-за ограничений лицензии GPL, используемойmysql-connector-java. Однако Spark connector все еще нуждается в драйвере MySQL JDBC для подключения к Selena для метаданных таблицы, поэтому вам нужно добавить драйвер в classpath Spark вручную. Вы можете найти драйвер на сайте MySQL или Maven Central. - Начиная с 1.1.1, коннектор использует интерфейс Stream Load по умолчанию, а не интерфейс транзакций Stream Load в версии 1.1.0. Если вы все еще хотите использовать интерфейс транзакций Stream Load, вы
можете установить опцию
starrocks.write.max.retriesв0. Пожалуйста, смотрите описаниеstarrocks.write.enable.transaction-stream-loadиstarrocks.write.max.retriesдля подробностей.
Примеры
Следующие примеры показывают, как использовать Spark connector для загрузки данных в таблицу Selena с помощью Spark DataFrames или Spark SQL. Spark DataFrames поддерживает как пакетный, так и структурированный потоковый режимы.
Для большего количества примеров см. Примеры Spark Connector.
Подготовка
Создание таблицы Selena
Создайте базу данных test и создайте таблицу Primary Key score_board.
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
Конфигурация сети
Убедитесь, что машина, где расположен Spark, может получить доступ к узлам FE кластера Selena через http_port (по умолчанию: 8030) и query_port (по умолчанию: 9030), и к узлам BE через be_http_port (по умолчанию: 8040).
Настройка среды Spark
Обратите внимание, что следующие примеры выполняются в Spark 3.2.4 и используют spark-shell, pyspark и spark-sql. Перед запуском примеров убедитесь, что JAR-файл Spark connector размещен в директории $SPARK_HOME/jars.
Загрузка данных с помощью Spark DataFrames
Следующие два примера объясняют, как загружать данные с помощью пакетного или структурированного потокового режима Spark DataFrames.
Пакетный режим
Создание данных в памяти и загрузка данных в таблицу Selena.
- Вы можете написать приложение spark, используя Scala или Python.
Для Scala выполните следующий фрагмент кода в spark-shell:
// 1. Создайте DataFrame из последовательности.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")
// 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
// Вам нужно изменить опции согласно вашей собственной среде.
df.write.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.mode("append")
.save()
Для Python выполните следующий фрагмент кода в pyspark:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Selena Example") \
.getOrCreate()
# 1. Создайте DataFrame из последовательности.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])
# 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
# Вам нужно изменить опции согласно вашей собственной среде.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
-
Запросите данные в таблице Selena.
MySQL [test]> SELECT * FROM `score_board`;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
Структурированная потоковая передача
Создание потокового чтения данных из CSV файла и загрузка данных в таблицу Selena.
-
В директории
csv-dataсоздайте CSV файлtest.csvсо следующими данными:3,starrocks,100
4,spark,100 -
Вы можете написать приложение Spark, используя Scala или Python.
Для Scala выполните следующий фрагмент кода в spark-shell:
import org.apache.spark.sql.types.StructType
// 1. Создайте DataFrame из CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Замените на ваш путь к директории "csv-data".
.load("/path/to/csv-data")
)
// 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
// Вам нужно изменить опции согласно вашей собственной среде.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// замените на вашу директорию checkpoint
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
Для Python выполните следующий фрагмент кода в pyspark:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
spark = SparkSession \
.builder \
.appName("Selena SS Example") \
.getOrCreate()
# 1. Создайте DataFrame из CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Замените на ваш путь к директории "csv-data".
.load("/path/to/csv-data")
)
# 2. Запишите в Selena, настроив формат как "starrocks" и следующие опции.
# Вам нужно изменить опции согласно вашей собственной среде.
query = (
df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
# замените на вашу директорию checkpoint
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
-
Запросите данные в таблице Selena.
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 4 | spark | 100 |
| 3 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.67 sec)
Загрузка данных с помощью Spark SQL
Следующий пример объясняет, как загружать данные с помощью Spark SQL, используя оператор INSERT INTO в Spark SQL CLI.
-
Выполните следующий SQL оператор в
spark-sql:-- 1. Создайте таблицу, настроив источник данных как `starrocks` и следующие опции.
-- Вам нужно изменить опции согласно вашей собственной среде.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"=""
);
-- 2. Вставьте две строки в таблицу.
INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100); -
Запросите данные в таблице Selena.
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 6 | spark | 100 |
| 5 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
Лучшие практики
Загрузка данных в таблицу Primary Key
Этот раздел покажет, как загружать данные в таблицу Primary Key Selena для достижения частичных обновлени й и условных обновлений. Вы можете посмотреть Изменение данных через загрузку для подробного введения в эти функции. Эти примеры используют Spark SQL.
Подготовка
Создайте базу данных test и создайте таблицу Primary Key score_board в Selena.
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
Частичные обновления
Этот пример покажет, как обновлять только данные в столбце name через загрузку:
-
Вставьте начальные данные в таблицу Selena в клиенте MySQL.
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Создайте таблицу Spark
score_boardв клиенте Spark SQL.- Установите опцию
starrocks.write.properties.partial_updateвtrue, что говорит коннектору делать частичное обновление. - Установите опцию
starrocks.columnsв"id,name", чтобы сказать коннектору, какие столбцы записывать.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.partial_update"="true",
"starrocks.columns"="id,name"
); - Установите опцию
-
Вставьте данные в таблицу в клиенте Spark SQL и обновите только столбец
name.INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update'); -
Запросите таблицу Selena в клиенте MySQL.
Вы можете видеть, что изменяются только значения для
name, а значения дляscoreне изменяются.mysql> select * from score_board;
+------+------------------+-------+
| id | name | score |
+------+------------------+-------+
| 1 | starrocks-update | 100 |
| 2 | spark-update | 100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
Условные обновления
Этот пример покажет, как делать условные обновления согласно значениям столбца score. Обновление для id
вступает в силу только когда новое значение для score больше или равно старому значению.
-
Вставьте начальные данные в таблицу Selena в клиенте MySQL.
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Создайте таблицу Spark
score_boardследующими способами.- Установите опцию
starrocks.write.properties.merge_conditionвscore, что говорит коннектору использовать столбецscoreкак условие. - Убедитесь, что Spark connector использует интерфейс Stream Load для загрузки данных, а не интерфейс транзакций Stream Load, поскольку последний не поддерживает эту функцию.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.merge_condition"="score"
); - Установите опцию
-
Вставьте данные в таблицу в клиенте Spark SQL и обновите строку, чей
idравен 1, с меньшим значением score, и строку, чейidравен 2, с большим значением score.INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101); -
Запросите таблицу Selena в клиенте MySQL.
Вы можете видеть, что изменяется только строка, чей
idравен 2, а строка, чейidравен 1, не изменяется.mysql> select * from score_board;
+------+--------------+-------+
| id | name | score |
+------+--------------+-------+
| 1 | starrocks | 100 |
| 2 | spark-update | 101 |
+------+--------------+-------+
2 rows in set (0.03 sec)
Загрузка данных в столбцы типа BITMAP
BITMAP часто используется для ускорения count distinct, например подсчета UV, см. Использование Bitmap для точного Count Distinct.
Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа BITMAP. BITMAP поддерживается с верс ии 1.1.1.
-
Создайте таблицу Aggregate Selena.
В базе данных
testсоздайте таблицу Aggregatepage_uv, где столбецvisit_usersопределен как типBITMAPи настроен с агрегатной функциейBITMAP_UNION.CREATE TABLE `test`.`page_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
Создайте таблицу Spark.
Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип
BITMAP. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например какBIGINT, настроив опцию"starrocks.column.types"="visit_users BIGINT". При использовании Stream Load для приема данных коннектор использует функциюto_bitmapдля преобразования данных типаBIGINTв типBITMAP.Выполните следующий DDL в
spark-sql:CREATE TABLE `page_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.page_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
Загрузите данные в таблицу Selena.
Выполните следующий DML в
spark-sql:INSERT INTO `page_uv` VALUES
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
(1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
(2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23); -
Вычислите UV страниц из таблицы Selena.
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 2 | 1 |
| 1 | 3 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
ВНИМАНИЕ:
Коннектор использует функцию
to_bitmapдля преобразования данных типовTINYINT,SMALLINT,INTEGERиBIGINTв Spark в типBITMAPв Selena, и использует функциюbitmap_hashдля других типов данных Spark.
Загрузка данных в столбцы типа HLL
HLL может использоваться для приблизительного count distinct, см. Использование HLL для приблизительного count distinct.
Здесь мы возьмем подсчет UV в качестве примера, чтобы показать, как загружать данные в столбцы типа HLL. HLL поддерживается с версии 1.1.1.
-
Создайте таблицу Aggregate Selena.
В базе данных
testсоздайте таблицу Aggregatehll_uv, где столбецvisit_usersопределен как типHLLи настроен с агрегатной функциейHLL_UNION.CREATE TABLE `hll_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
Создайте таблицу Spark.
Схема таблицы Spark выводится из таблицы Selena, и Spark не поддерживает тип
HLL. Поэтому вам нужно настроить соответствующий тип данных столбца в Spark, например какBIGINT, настроив опцию"starrocks.column.types"="visit_users BIGINT". При использовании Stream Load для приема данных коннектор использует функциюhll_hashдля преобразования данных типаBIGINTв типHLL.Выполните следующий DDL в
spark-sql:CREATE TABLE `hll_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.hll_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
Загрузите данные в таблицу Selena.
Выполните следующий DML в
spark-sql:INSERT INTO `hll_uv` VALUES
(3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
(4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
(3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674); -
Вычислите UV страниц из таблицы Selena.
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 4 | 1 |
| 3 | 2 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
Загрузка данных в столбцы типа ARRAY
Следующий пример объясняет, как загружать данные в столбцы типа ARRAY.
-
Создайте таблицу Selena.
В базе данных
testсоздайте таблицу Primary Keyarray_tbl, которая включает один столбецINTи два столбцаARRAY.CREATE TABLE `array_tbl` (
`id` INT NOT NULL,
`a0` ARRAY<STRING>,
`a1` ARRAY<ARRAY<INT>>
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
; -
Запишите данные в Selena.
Поскольку некоторые версии Selena не предоставляют метаданные столбца
ARRAY, коннектор не может вывести соответствующий тип данных Spark для этого столбца. Однако вы можете явно указать соответствующий тип данных Spark столбца в опцииstarrocks.column.types. В этом примере вы можете настроить опцию какa0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>.Выполните следующие коды в
spark-shell:val data = Seq(
| (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
| (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
| )
val df = data.toDF("id", "a0", "a1")
df.write
.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.array_tbl")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
.mode("append")
.save() -
Запросите данные в таблице Selena.
MySQL [test]> SELECT * FROM `array_tbl`;
+------+-----------------------+--------------------+
| id | a0 | a1 |
+------+-----------------------+--------------------+
| 1 | ["hello","starrocks"] | [[1,2],[3,4]] |
| 2 | ["hello","spark"] | [[5,6,7],[8,9,10]] |
+------+-----------------------+--------------------+
2 rows in set (0.01 sec)