Массовая загрузка данных с помощью Spark Load
Этот метод загрузки использует внешние ресурсы Apache Spark™ для предварительной обработки импортируемых данных, что улучшает производительность импорта и экономит вычислительные ресурсы. Он в основном используется для начальной миграции и импорта больших объёмов данных в Selena (объём данных до уровня ТБ).
Spark load — это асинхронный метод импорта, который требует от пользователей создания задач импорта типа Spark через протокол MySQL и просмотра результатов импорта с помощью SHOW LOAD.
ВНИМАНИЕ
- Только пользователи с привилегией INSERT на таблицу Selena могут загружать данные в эту таблицу. Вы можете следовать инструкциям в GRANT, чтобы предоставить необходи мую привилегию.
- Spark Load нельзя использовать для загрузки данных в таблицу с Primary Key.
Объяснение терминологии
- Spark ETL: Отвечает главным образом за ETL данных в процессе импорта, включая построение глобального словаря (тип BITMAP), партиционирование, сортировку, агрегацию и т.д.
- Broker: Broker — это независимый процесс без состояния. Он инкапсулирует интерфейс файловой системы и предоставляет Selena возможность читать файлы из удалённых систем хранения.
- Глобальный словарь: Сохраняет структуру данных, которая сопоставляет данные от исходного значения к закодированному значению. Исходное значение может быть любого типа данных, в то время как закодированное значение является целым числом. Глобальный словарь в основном используется в сценариях, где предварительно вычисляется точный count distinct.
Основы
Пользователь отправляет задачу импорта типа Spark через MySQL-клиент; FE записывает метаданные и возвращает результат отправки.
Выполнение задачи Spark load делится на следующие основные фазы.
- Пользователь отправляет задачу Spark load в FE.
- FE планирует отправку задачи ETL в cluster Apache Spark™ для выполнения.
- Cluster Apache Spark™ выполняет задачу ETL, которая включает построение глобального словаря (тип BITMAP), партиционирование, сортировку, агрегацию и т.д.
- После завершения задачи ETL FE получает путь к данным каждого предварительно обработанного среза и планирует соответствующие BE для выполнения задачи Push.
- BE читает данные через процесс Broker из HDFS и конвертирует их в формат хранения Selena.
Если вы решите не использовать процесс Broker, BE читает данные из HDFS напрямую.
- FE планирует эффективную версию и завершает задачу импорта.
Следующая диаграмма иллюстрирует основной поток Spark load.

Глобальный словарь
Применимые сценарии
В настоящее время столбец BITMAP в Selena реализован с использованием Roaringbitmap, который принимает только целые числа в качестве типа входных данных. Поэтому, если вы хотите реализовать предварительное вычисление для столбца BITMAP в процессе импорта, вам нужно преобразовать тип входных данных в целое число.
В существующем процессе импорта Selena структура данных глобального словаря реализована на основе таблицы Hive, которая сохраняет сопоставление от исходного значения к закодированному значению.
Процесс построения
- Чтение данных из вышесто ящего источника данных и генерация временной таблицы Hive с именем
hive-table. - Извлечение значений дедуплицируемых полей из
hive-tableдля генерации новой таблицы Hive с именемdistinct-value-table. - Создание новой таблицы глобального словаря с именем
dict-tableс одним столбцом для исходных значений и одним столбцом для закодированных значений. - Left join между
distinct-value-tableиdict-table, а затем использование оконной функции для кодирования этого набора. Наконец, как исходное значение, так и закодированное значение дедуплицированного столбца записываются обратно вdict-table. - Join между
dict-tableиhive-tableдля завершения работы по замене исходного значения вhive-tableна целочисленное закодированное значение. hive-tableбудет прочитана при следующей предварительной обработке данных, а затем импортирована в Selena после вычисления.
Предварительная обработка данных
Базовый процесс предварительной обработки данных следующий:
- Чтение данных из вышестоящего источника данных (файл HDFS или таблица Hive).
- Выполнение сопоставления полей и вычисления для прочитанных данных, затем генерация
bucket-idна основе информации о partition. - Генерация RollupTree на основе метаданных Rollup таблицы Selena.
- Итерация по RollupTree и выполнение иерархических операций агрегации. Rollup следующей иерархии может быть вычислен из Rollup предыдущей иерархии.
- Каждый раз после завершения вычисления агрегации данные разбиваются по bucket в соответствии с
bucket-idи затем записываются в HDFS. - Последующий процесс Broker будет извлекать файлы из HDFS и импортировать их в узел BE Selena.
Базовые операции
Конфигурация cluster ETL
Apache Spark™ используется в качестве внешнего вычислительного ресурса в Selena для работы ETL. К Selena могут быть добавлены и другие внешние ресурсы, такие как Spark/GPU для запросов, HDFS/S3 для внешнего хранилища, MapReduce для ETL и т.д. Поэтому мы вводим Resource Management для управления этими внешними ресурсами, используемыми Selena.
Перед отправкой задачи импорта Apache Spark™ настройте cluster Apache Spark™ для выполнения задач ETL. Синтаксис операции следующий:
-- создание ресурса Apache Spark™
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);
-- удаление ресурса Apache Spark™
DROP RESOURCE resource_name;
-- просмотр ресурсов
SHOW RESOURCES
SHOW PROC "/resources";
-- привилегии
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
- Создание ресурса
Пример:
-- режим yarn cluster
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/selena",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- режим yarn HA cluster
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/selena",
"broker" = "broker1"
);
resource-name — это имя ресурса Apache Spark™, настроенного в Selena.
PROPERTIES включает параметры, относящиеся к ресурсу Apache Spark™, следующим образом:
Примечание
Для получения подробного описания PROPERTIES ресурса Apache Spark™ см. CREATE RESOURCE
-
Параметры, связанные со Spark:
type: Тип ресурса, обязательный, в настоящее время поддерживается толькоspark.spark.master: Обязательный, в настоящее время поддерживается толькоyarn.spark.submit.deployMode: Режим развёртывания программы Apache Spark™, обязательный, в настоящее время поддерживаются какcluster, так иclient.spark.hadoop.fs.defaultFS: Обязательный, если master — yarn.- Параметры, связанные с yarn resource manager, обязательные.
- один ResourceManager на одном узле
spark.hadoop.yarn.resourcemanager.address: Адрес единственного resource manager. - ResourceManager HA
Вы можете выбрать указание hostname или адреса ResourceManager.
spark.hadoop.yarn.resourcemanager.ha.enabled: Включить HA resource manager, установите вtrue.spark.hadoop.yarn.resourcemanager.ha.rm-ids: список логических идентификаторов resource manager.spark.hadoop.yarn.resourcemanager.hostname.rm-id: Для каждого rm-id укажите hostname, соответствующий resource manager.spark.hadoop.yarn.resourcemanager.address.rm-id: Для каждого rm-id укажитеhost:portдля клиента для отправки задач.
- один ResourceManager на одном узле
-
*working_dir: Каталог, используемый ETL. Обязательный, если Apache Spark™ используется в качестве ресурса ETL. Например:hdfs://host:port/tmp/selena. -
Параметры, связанные с Broker:
broker: Имя Broker. Обязательный, если Apache Spark™ используется в качестве ресурса ETL. Вам нужно заранее выполнить настройку с помощью командыALTER SYSTEM ADD BROKER.broker.property_key: Информация (например, информация для аутентификации), которая должна быть указана, когда процесс Broker читает промежуточный файл, сгенерированный ETL.
Предостережение:
Выше приведено описание параметров для загрузки через процесс Broker. Если вы намерены загружать данные без процесса Broker, следует обратить внимание на следующее.
- Вам не нужно указывать
broker. - Если вам нужно настроить аутентификацию пользователя и HA для узлов NameNode, вам нужно настроить параметры в файле hdfs-site.xml в cluster HDFS, см. описания параметров в broker_properties. и вам нужно переместить файл hdfs-site.xml в $FE_HOME/conf для каждого FE и $BE_HOME/conf для каждого BE.
Примечание
Если файл HDFS доступен только определённому пользователю, вам всё ещё нужно указать имя пользователя HDFS в
broker.nameи пароль пользователя вbroker.password.
- Просмотр ресурсов
Обычные учётные записи могут просматривать только ресурсы, к которым у них есть доступ USAGE-PRIV. Учётные записи root и admin могут просматривать все ресурсы.
- Разрешения ресурсов
Разрешения ресурсов управляются через GRANT REVOKE, который в настоящее время поддерживает только разрешения USAGE-PRIV. Вы можете предоставить разрешения USAGE-PRIV пользователю или роли.
-- Предоставить доступ к ресурсам spark0 пользователю user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- Предоставить доступ к ресурсам spark0 роли role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- Предоставить доступ ко всем ресурсам пользователю user0
GRANT USAGE_PRIV ON RESOURCE* TO "user0"@"%";
-- Предоставить доступ ко всем ресурсам роли role0
GRANT USAGE_PRIV ON RESOURCE* TO ROLE "role0";
-- Отозвать привилегии использования ресурсов spark0 у пользователя user0
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";