Перейти к основному содержимому

SPARK LOAD

SPARK LOAD предварительно обрабатывает импортируемые данные через внешние ресурсы Spark, улучшает производительность импорта больших объемов данных Selena и экономит вычислительные ресурсы кластера Selena. Он в основном используется в сценариях первоначальной миграции и импорта больших объемов данных в Selena.

Spark load — это асинхронный метод импорта. Пользователи должны создавать задачи импорта типа Spark через протокол MySQL и просматривать результаты импорта через SHOW LOAD.

ВНИМАНИЕ

  • Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT на эти таблицы Selena. Если у вас нет привилегии INSERT, следуйте инструкциям в GRANT, чтобы предоставить привилегию INSERT пользователю, которого вы используете для подключения к кластеру Selena.
  • При использовании Spark Load для загрузки данных в таблицу Selena, столбец bucketing таблицы Selena не может быть типа DATE, DATETIME или DECIMAL.

Синтаксис

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH RESOURCE resource_name
[resource_properties]
[opt_properties]

1.load_label

Метка текущей импортируемой партии. Уникальна в пределах базы данных.

Синтаксис:

[database_name.]your_label

2.data_desc

Используется для описания партии импортируемых данных.

Синтаксис:

DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1 = func(k2))]
[WHERE predicate]

DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

Примечание

file_path:

Путь к файлу может быть указан к одному файлу, или можно использовать подстановочный знак * для указания всех файлов в каталоге. Подстановочные знаки должны соответствовать файлам, а не каталогам.

hive_external_tbl:

имя внешней таблицы hive.
Требуется, чтобы столбцы в импортируемой таблице selena существовали во внешней таблице hive.
Каждая задача загрузки поддерживает загрузку только из одной внешней таблицы Hive.
Не может использоваться одновременно с режимом file_path.

PARTITION:

Если этот параметр указан, будет импортирована только указанная партиция, а данные вне импортируемой партиции будут отфильтрованы.
Если не указан, по умолчанию будут импортированы все партиции таблицы.

NEGATIVE:

Если этот параметр указан, это эквивалентно загрузке партии "отрицательных" данных. Используется для компенсации той же партии ранее импортированных данных.
Этот параметр применим только когда существует столбец значений и тип агрегации столбца значений только SUM.

column_separator:

Указывает разделитель столбцов в импортируемом файле. По умолчанию \t
Если это невидимый символ, нужно добавить префикс \\x и использовать шестнадцатеричное представление разделителя.
Например, разделитель файла hive \x01 указывается как "\\x01"

file_type:

Используется для указания типа импортируемого файла. В настоящее время поддерживаются типы файлов csv, orc и parquet.

column_list:

Используется для указания соответствия между столбцами в импортируемом файле и столбцами в таблице.
Когда нужно пропустить столбец в импортируемом файле, укажите столбец как имя столбца, которое не существует в таблице.

Синтаксис:
(col_name1, col_name2, ...)

SET:

Если указать этот параметр, можно преобразовать столбец исходного файла согласно функции, а затем импортировать преобразованные результаты в таблицу. Синтаксис: column_name = expression.
Поддерживаются только встроенные функции Spark SQL. Пожалуйста, обратитесь к https://spark.apache.org/docs/2.4.6/api/sql/index.html.
Приведем несколько примеров для понимания.
Пример 1: в таблице есть три столбца "c1, c2, c3", и первые два столбца в исходном файле соответствуют (c1, c2), а сумма последних двух столбцов соответствует C3; тогда нужно указать columns (c1, c2, tmp_c3, tmp_c4) set (c3 = tmp_c3 + tmp_c4);
Пример 2: в таблице есть три столбца "year, month и day", а в исходном файле есть только один столбец времени в формате "2018-06-01 01:02:03".
Тогда можно указать columns (tmp_time) set (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time)) для завершения импорта.

WHERE:

Фильтрует преобразованные данные, и только данные, соответствующие условию where, могут быть импортированы. В операторе WHERE можно ссылаться только на имена столбцов в таблице

3.resource_name

Имя используемого ресурса spark можно просмотреть через команду SHOW RESOURCES.

4.resource_properties

Когда у вас есть временная потребность, например изменение конфигураций Spark и HDFS, вы можете установить параметры здесь, которые действуют только в этой конкретной задаче загрузки spark и не влияют на существующие конфигурации в кластере Selena.

5.opt_properties

Используется для указания некоторых специальных параметров.

Синтаксис:

[PROPERTIES ("key"="value", ...)]

Вы можете указать следующие параметры: timeout: указывает тайм-аут операции импорта. Тайм-аут по умолчанию составляет 4 часа. В секундах. max_filter_ratio:максимально допустимая доля данных, которая может быть отфильтрована (по причинам, таким как нестандартные данные). По умолчанию нулевая толерантность. strict mode: строго ли ограничивать данные. По умолчанию false. timezone: указывает часовой пояс некоторых функций, зависящих от часового пояса, таких как strftime / alignment_timestamp/from_unixtime и т.д. Пожалуйста, обратитесь к документу [time zone] для подробностей. Если не указан, используется часовой пояс "Asia/Shanghai".

6.Пример формата импортируемых данных

int (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234 float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356 date (DATE/DATETIME) :2017-10-03, 2017-06-13 12:34:03. (Примечание: для других форматов дат можно использовать функцию strftime или time_format для преобразования в команде Import) string class (CHAR/VARCHAR): "I am a student", "a"

NULL значение: \N

Примеры

  1. Импорт партии данных из HDFS с указанием времени ожидания и коэффициента фильтрации. Используйте ресурсы my_spark для spark.

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    )
    WITH RESOURCE 'my_spark'
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    Где hdfs_host — это хост namenode, hdfs_port — это порт fs.defaultfs (по умолчанию 9000)

  2. Импорт партии "отрицательных" данных из HDFS, указание разделителя как запятой, использование подстановочного знака * для указания всех файлов в каталоге и указание временных параметров ресурсов spark.

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/*")
    NEGATIVE
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    )
    WITH RESOURCE 'my_spark'
    (
    "spark.executor.memory" = "3g",
    "broker.username" = "hdfs_user",
    "broker.password" = "hdfs_passwd"
    );
  3. Импорт партии данных из HDFS с указанием партиции и выполнением некоторых преобразований столбцов импортируемого файла, как показано ниже:

    Структура таблицы:
    k1 varchar(20)
    k2 int

    Предположим, что файл данных содержит только одну строку данных:

    Adele,1,1

    Каждый столбец в файле данных соответствует каждому столбцу, указанному в операторе импорта:
    k1,tmp_k2,tmp_k3

    Преобразование выглядит следующим образом:

    1. k1: без преобразования
    2. k2: является суммой tmp_k2 и tmp_k3

    LOAD LABEL example_db.label6
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + tmp_k3
    )
    )
    WITH RESOURCE 'my_spark';
  4. Извлечение поля партиции в пути к файлу

    При необходимости партиционированные поля в пути к файлу будут разрешены в соответствии с типами полей, определенными в таблице, аналогично функции Partition Discovery в Spark

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/*/*")
    INTO TABLE `my_table`
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
    SET (uniq_id = md5sum(k1, city))
    )
    WITH RESOURCE 'my_spark';

    Каталог hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing включает следующие файлы:

    [hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]

    Извлекаются поля city и utc_date в пути к файлу

  5. Фильтрация импортируемых данных. Могут быть импортированы только столбцы со значением k1 больше 10.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    WHERE k1 > 10
    )
    WITH RESOURCE 'my_spark';
  6. Импорт из внешней таблицы hive и преобразование столбца uuid в исходной таблице в тип bitmap через глобальный словарь.

    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=bitmap_dict(uuid)
    )
    )
    WITH RESOURCE 'my_spark';