Перейти к основному содержимому
Версия: 2.0.x

SPARK LOAD

SPARK LOAD предварительно обрабатывает импортируемые данные с помощью внешних ресурсов spark, повышает производительность импорта большого объема данных Selena и экономит вычислительные ресурсы кластера Selena. Он в основном используется в сценарии начальной миграции и импорта больших объемов данных в Selena.

Spark load является асинхронным методом импорта. Пользователям необходимо создавать задачи импорта типа Spark через протокол MySQL и просматривать результаты импорта с помощью SHOW LOAD.

ПРИМЕЧАНИЕ

  • Вы можете загружать данные в таблицы Selena только как пользователь, имеющий привилегию INSERT для этих таблиц Selena. Если у вас нет привилегии INSERT, следуйте инструкциям, приведенным в GRANT, чтобы предоставить привилегию INSERT пользователю, который используется для подключения к вашему кластеру Selena.
  • При использовании Spark Load для загрузки данных в таблицу Selena, столбец bucketing таблицы Selena не может быть типа DATE, DATETIME или DECIMAL.

Синтаксис

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH RESOURCE resource_name
[resource_properties]
[opt_properties]

1.load_label

Метка текущей импортируемой партии. Уникальна в пределах базы данных.

Синтаксис:

[database_name.]your_label

2.data_desc

Используется для описания партии импортируемых данных.

Синтаксис:

DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1 = func(k2))]
[WHERE predicate]

DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

Примечание

file_path:

Путь к файлу может указывать на один файл, или можно использовать подстановочный знак * для указания всех файлов в каталоге. Подстановочные знаки должны соответствовать файлам, а не каталогам.

hive_external_tbl:

Имя внешней таблицы hive.
Требуется, чтобы столбцы в импортируемой таблице selena существовали во внешней таблице hive.
Каждая задача загрузки поддерживает загрузку только из одной внешней таблицы Hive.
Не может использоваться одновременно с режимом file_path.

PARTITION:

Если этот параметр указан, будет импортирован только указанный partition, и данные вне импортируемого partition будут отфильтрованы.
Если не указан, по умолчанию будут импортированы все partition таблицы.

NEGATIVE:

Если этот параметр указан, это эквивалентно загрузке партии "отрицательных" данных. Используется для компенсации той же партии ранее импортированных данных.
Этот параметр применим только тогда, когда существует столбец значений и тип агрегации столбца значений равен только SUM.

column_separator:

Указывает разделитель столбцов в импортируемом файле. По умолчанию \ t
Если это невидимый символ, вам нужно добавить к нему префикс \ \ x и использовать шестнадцатеричное представление для разделителя.
Например, разделитель файла hive \ x01 указывается как "\ \ x01"

file_type:

Используется для указания типа импортируемого файла. В настоящее время поддерживаемые типы файлов: csv, orc и parquet.

column_list:

Используется для указания соответствия между столбцами в импортируемом файле и столбцами в таблице.
Когда вам нужно пропустить столбец в импортируемом файле, укажите столбец как имя столбца, которое не существует в таблице.

Синтаксис:
(col_name1, col_name2, ...)

SET:

Если указать этот параметр, вы можете преобразовать столбец исходного файла в соответствии с функцией, а затем импортировать преобразованные результаты в таблицу. Синтаксис: column_name = expression.
Поддерживаются только встроенные функции Spark SQL. Пожалуйста, обратитесь к https://spark.apache.org/docs/2.4.6/api/sql/index.html.
Приведем несколько примеров для понимания.
Пример 1: в таблице есть три столбца "c1, c2, c3", и первые два столбца в исходном файле соответствуют (c1, c2), а сумма последних двух столбцов соответствует C3; тогда необходимо указать columns (c1, c2, tmp_c3, tmp_c4) set (c3 = tmp_c3 + tmp_c4);
Пример 2: в таблице есть три столбца "year, month и day", и в исходном файле есть только один столбец времени в формате "2018-06-01 01:02:03".
Тогда вы можете указать columns (tmp_time) set (year = year (tmp_time), month = month (tmp_time), day = day (tmp_time)) для завершения импорта.

WHERE:

Фильтрация преобразованных данных, и только данные, удовлетворяющие условию where, могут быть импортированы. В операторе WHERE могут ссылаться только имена столбцов в таблице

3.resource_name

Имя используемого ресурса spark можно просмотреть с помощью команды SHOW RESOURCES.

4.resource_properties

Когда у вас есть временная потребность, например, изменение конфигураций Spark и HDFS, вы можете установить параметры здесь, которые вступят в силу только в этом конкретном задании загрузки spark и не повлияют на существующие конфигурации в кластере Selena.

5.opt_properties

Используется для указания некоторых специальных параметров.

Синтаксис:

[PROPERTIES ("key"="value", ...)]

Вы можете указать следующие параметры: timeout: указывает время ожидания операции импорта. Время ожидания по умолчанию составляет 4 часа. В секундах. max_filter_ratio:максимально допустимая доля данных, которая может быть отфильтрована (по причинам, таким как нестандартные данные). По умолчанию нулевая терпимость. strict mode: строго ли ограничивать данные. По умолчанию false. timezone: указывает часовой пояс некоторых функций, зависящих от часового пояса, таких как strftime / alignment_timestamp/from_unixtime и т.д. Пожалуйста, обратитесь к документу [time zone] для получения подробной информации. Если не указано, используется часовой пояс "Asia / Shanghai".

6.Пример формата импортируемых данных

int (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234 float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356 date (DATE/DATETIME) :2017-10-03, 2017-06-13 12:34:03. (Примечание: для других форматов дат вы можете использовать функцию strftime или time_format для преобразования в команде Import) string class (CHAR/VARCHAR): "I am a student", "a"

NULL значение: \ N

Примеры

  1. Импортировать партию данных из HDFS, указав время ожидания и коэффициент фильтрации. Использовать имя my_spark resources для spark.

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/selena/data/input/file")
    INTO TABLE `my_table`
    )
    WITH RESOURCE 'my_spark'
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    Где hdfs_host — это хост namenode, hdfs_port — это порт fs.defaultfs (по умолчанию 9000)

  2. Импортировать партию "отрицательных" данных из HDFS, указать разделитель как запятую, использовать подстановочный знак * для указания всех файлов в каталоге и указать временные параметры ресурсов spark.

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/selena/data/input/*")
    NEGATIVE
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    )
    WITH RESOURCE 'my_spark'
    (
    "spark.executor.memory" = "3g",
    "broker.username" = "hdfs_user",
    "broker.password" = "hdfs_passwd"
    );
  3. Импортировать партию данных из HDFS, указать partition и выполнить некоторое преобразование столбцов импортируемого файла, как показано ниже:

    Структура таблицы:
    k1 varchar(20)
    k2 int

    Предположим, что файл данных имеет только одну строку данных:

    Adele,1,1

    Каждый столбец в файле данных соответствует каждому столбцу, указанному в операторе импорта:
    k1,tmp_k2,tmp_k3

    Преобразование выглядит следующим образом:

    1. k1: без преобразования
    2. k2: сумма tmp_k2 и tmp_k3

    LOAD LABEL example_db.label6
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/selena/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + tmp_k3
    )
    )
    WITH RESOURCE 'my_spark';
  4. Извлечь поле partition из пути к файлу

    При необходимости разделенные поля в пути к файлу будут разрешены в соответствии с типами полей, определенными в таблице, аналогично функции Partition Discovery в Spark

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/selena/data/input/dir/city=beijing/*/*")
    INTO TABLE `my_table`
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
    SET (uniq_id = md5sum(k1, city))
    )
    WITH RESOURCE 'my_spark';

    Каталог hdfs://hdfs_host:hdfs_port/user/selena/data/input/dir/city=beijing включает следующие файлы:

    [hdfs://hdfs_host:hdfs_port/user/selena/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/selena/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]

    Поля city и utc_date извлекаются из пути к файлу

  5. Отфильтровать импортируемые данные. Могут быть импортированы только столбцы со значением k1 больше 10.

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/selena/data/input/file")
    INTO TABLE `my_table`
    WHERE k1 > 10
    )
    WITH RESOURCE 'my_spark';
  6. Импортировать из внешней таблицы hive и преобразовать столбец uuid в исходной таблице в тип bitmap с помощью глобального словаря.

    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=bitmap_dict(uuid)
    )
    )
    WITH RESOURCE 'my_spark';