Перейти к основному содержимому
Версия: 2.0.x

Java UDF

Начиная с версии 2.2.0 вы можете компилировать пользовательские функции (UDF) для удовлетворения своих бизнес-потребностей, используя язык программирования Java.

Начиная с версии 3.0, Selena поддерживает глобальные UDF, и вам нужно только включить ключевое слово GLOBAL в соответствующие SQL-выражения (CREATE/SHOW/DROP).

В этом разделе описано, как разрабатывать и использовать различные UDF.

В настоящее время Selena поддерживает скалярные UDF, пользовательские агрегатные функции (UDAF), пользовательские оконные функции (UDWF) и пользовательские табличные функции (UDTF).

Предварительные требования

  • Вы установили Apache Maven, чтобы создавать и компилировать Java-проекты.

  • Вы установили JDK 17 на своих серверах.

  • Функция Java UDF включена. Вы можете установить параметр конфигурации FE enable_udf в значение true в файле конфигурации FE fe/conf/fe.conf, чтобы включить эту функцию, а затем перезапустить узлы FE, чтобы настройки вступили в силу. Для получения дополнительной информации см. Настройка параметров.

Разработка и использование UDF

Вам нужно создать проект Maven и скомпилировать нужную UDF, используя язык программирования Java.

Шаг 1: Создание проекта Maven

Создайте проект Maven со следующей базовой структурой каталогов:

project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target

Шаг 2: Добавление зависимостей

Добавьте следующие зависимости в файл pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

Шаг 3: Компиляция UDF

Используйте язык программирования Java для компиляции UDF.

Компиляция скалярной UDF

Скалярная UDF работает с одной строкой данных и возвращает одно значение. При использовании скалярной UDF в запросе каждая строка соответствует одному значению в результирующем наборе. Типичные скалярные функции включают UPPER, LOWER, ROUND и ABS.

Предположим, что значения поля в ваших JSON-данных являются JSON-строками, а не JSON-объектами. При использовании SQL-выражения для извлечения JSON-строк вам нужно выполнить GET_JSON_STRING дважды, например, GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0").

Чтобы упростить SQL-выражение, вы можете скомпилировать скалярную UDF, которая может напрямую извлекать JSON-строки, например, MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0").

package com.selena.udf.sample;

import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// Библиотека JSONPath может полностью развернуть, даже если значения поля являются JSON-строками.
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}

Пользовательский класс должен реализовать метод, описанный в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны совпадать с теми, которые объявлены в выражении CREATE FUNCTION, которое будет выполнено на Шаге 6, и соответствовать сопоставлению, приведённому в разделе "Сопоставление типов данных SQL и Java" этого документа.

МетодОписание
TYPE1 evaluate(TYPE2, ...)Запускает UDF. Метод evaluate() требует уровня доступа public.

Компиляция UDAF

UDAF работает с несколькими строками данных и возвращает одно значение. Типичные агрегатные функции включают SUM, COUNT, MAX и MIN, которые агрегируют несколько строк данных, указанных в каждом предложении GROUP BY, и возвращают одно значение.

Предположим, что вы хотите скомпилировать UDAF с именем MY_SUM_INT. В отличие от встроенной агрегатной функции SUM, которая возвращает значения типа BIGINT, функция MY_SUM_INT поддерживает только параметры запроса и возвращаемые параметры типа INT.

package com.selena.udf.sample;

public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}

public State create() {
return new State();
}

public void destroy(State state) {
}

public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}
}

Пользовательский класс должен реализовать методы, описанные в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методах должны совпадать с теми, которые объявлены в выражении CREATE FUNCTION, которое будет выполнено на Шаге 6, и соответствовать сопоставлению, приведённому в разделе "Сопоставление типов данных SQL и Java" этого документа.

МетодОписание
State create()Создаёт состояние.
void destroy(State)Уничтожает состояние.
void update(State, ...)Обновляет состояние. Помимо первого параметра State, вы также можете указать один или несколько параметров запроса в объявлении UDF.
void serialize(State, ByteBuffer)Сериализует состояние в байтовый буфер.
void merge(State, ByteBuffer)Десериализует состояние из байтового буфера и объединяет байтовый буфер с состоянием в качестве первого параметра.
TYPE finalize(State)Получает окончательный результат UDF из состояния.

Во время компиляции вы также должны использовать класс буфера java.nio.ByteBuffer и локальную переменную serializeLength, которые описаны в следующей таблице.

Класс и локальная переменнаяОписание
java.nio.ByteBuffer()Класс буфера, который хранит промежуточные результаты. Промежуточные результаты могут быть сериализованы или десериализованы при передаче между узлами для выполнения. Поэтому вы также должны использовать переменную serializeLength для указания длины, разрешённой для десериализации промежуточных результатов.
serializeLength()Длина, разрешённая для десериализации промежуточных результатов. Единица измерения: байты. Установите эту локальную переменную в значение типа INT. Например, State { int counter = 0; public int serializeLength() { return 4; }} указывает, что промежуточные результаты имеют тип INT, а длина для десериализации составляет 4 байта. Вы можете настроить эти параметры в соответствии с вашими бизнес-требованиями. Например, если вы хотите указать тип данных промежуточных результатов как LONG и длину для десериализации как 8 байт, передайте State { long counter = 0; public int serializeLength() { return 8; }}.

Обратите внимание на следующие моменты при десериализации промежуточных результатов, хранящихся в классе java.nio.ByteBuffer:

  • Метод remaining(), зависящий от класса ByteBuffer, не может быть вызван для десериализации состояния.
  • Метод clear() не может быть вызван для класса ByteBuffer.
  • Значение serializeLength должно совпадать с длиной записанных данных. В противном случае при сериализации и десериализации будут сгенерированы неверные результаты.

Компиляция UDWF

В отличие от обычных агрегатных функций, UDWF работает с набором из нескольких строк, которые вместе называются окном, и возвращает значение для каждой строки. Типичная оконная функция включает предложение OVER, которое разделяет строки на несколько наборов. Она выполняет вычисление для каждого набора строк и возвращает значение для каждой строки.

Предположим, что вы хотите скомпилировать UDWF с именем MY_WINDOW_SUM_INT. В отличие от встроенной агрегатной функции SUM, которая возвращает значения типа BIGINT, функция MY_WINDOW_SUM_INT поддерживает только параметры запроса и возвращаемые параметры типа INT.

package com.selena.udf.sample;

public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}

public State create() {
return new State();
}

public void destroy(State state) {

}

public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}

public void reset(State state) {
state.counter = 0;
}

public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}

Пользовательский класс должен реализовать методы, требуемые для UDAF (поскольку UDWF является специальной агрегатной функцией), и метод windowUpdate(), описанный в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны совпадать с теми, которые объявлены в выражении CREATE FUNCTION, которое будет выполнено на Шаге 6, и соответствовать сопоставлению, приведённому в разделе "Сопоставление типов данных SQL и Java" этого документа.

МетодОписание
void windowUpdate(State state, int, int, int , int, ...)Обновляет данные окна. Для получения дополнительной информации о UDWF см. Оконные функции. Каждый раз, когда вы вводите строку в качестве входных данных, этот метод получает информацию об окне и соответственно обновляет промежуточные результаты.
  • peer_group_start: начальная позиция текущего partition. PARTITION BY используется в предложении OVER для указания столбца partition. Строки с одинаковыми значениями в столбце partition считаются находящимися в одном partition.
  • peer_group_end: конечная позиция текущего partition.
  • frame_start: начальная позиция текущего фрейма окна. Предложение фрейма окна указывает диапазон вычислений, который охватывает текущую строку и строки, находящиеся на указанном расстоянии от текущей строки. Например, ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING указывает диапазон вычислений, охватывающий текущую строку, предыдущую строку перед текущей строкой и следующую строку после текущей строки.
  • frame_end: конечная позиция текущего фрейма окна.
  • inputs: данные, которые вводятся в окно. Данные представляют собой пакет массива, который поддерживает только определённые типы данных. В этом примере вводятся значения INT, и пакет массива — Integer[].

Компиляция UDTF

UDTF читает одну строку данных и возвращает несколько значений, которые можно рассматривать как таблицу. Табличные функции обычно используются для преобразования строк в столбцы.

ПРИМЕЧАНИЕ

Selena позволяет UDTF возвращать таблицу, состоящую из нескольких строк и одного столбца.

Предположим, что вы хотите скомпилировать UDTF с именем MY_UDF_SPLIT. Функция MY_UDF_SPLIT позволяет использовать пробелы в качестве разделителей и поддерживает параметры запроса и возвращаемые параметры типа STRING.

package com.selena.udf.sample;

public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}

Метод, определённый пользовательским классом, должен соответствовать следующим требованиям:

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны совпадать с теми, которые объявлены в выражении CREATE FUNCTION, которое будет выполнено на Шаге 6, и соответствовать сопоставлению, приведённому в разделе "Сопоставление типов данных SQL и Java" этого документа.

МетодОписание
TYPE[] process()Запускает UDTF и возвращает массив.

Шаг 4: Упаковка Java-проекта

Выполните следующую команду для упаковки Java-проекта:

mvn package

Следующие JAR-файлы будут сгенерированы в папке target: udf-1.0-SNAPSHOT.jar и udf-1.0-SNAPSHOT-jar-with-dependencies.jar.

Шаг 5: Загрузка Java-проекта

Загрузите JAR-файл udf-1.0-SNAPSHOT-jar-with-dependencies.jar на HTTP-сервер, который работает и доступен для всех FE и BE в вашем cluster Selena. Затем выполните следующую команду для развёртывания файла:

mvn deploy

Вы можете настроить простой HTTP-сервер с помощью Python и загрузить JAR-файл на этот HTTP-сервер.

ПРИМЕЧАНИЕ

На Шаге 6 FE проверят JAR-файл, содержащий код для UDF, и вычислят контрольную сумму, а BE загрузят и выполнят JAR-файл.

Шаг 6: Создание UDF в Selena

Selena позволяет создавать UDF в двух типах пространств имён: пространства имён баз данных и глобальные пространства имён.

  • Если у вас нет требований к видимости или изоляции для UDF, вы можете создать её как глобальную UDF. Тогда вы можете ссылаться на глобальную UDF, используя имя функции без включения имён каталога и базы данных в качестве префиксов к имени функции.
  • Если у вас есть требования к видимости или изоляции для UDF, или если вам нужно создать одну и ту же UDF в разных базах данных, вы можете создать её в каждой отдельной базе данных. Таким образом, если ваша сессия подключена к целевой базе данных, вы можете ссылаться на UDF, используя имя функции. Если ваша сессия подключена к другому каталогу или базе данных, отличной от целевой, вам нужно ссылаться на UDF, включая имена каталога и базы данных в качестве префиксов к имени функции, например, catalog.database.function.

ВНИМАНИЕ

Перед созданием и использованием глобальной UDF вы должны связаться с системным администратором, чтобы получить необходимые разрешения. Для получения дополнительной информации см. GRANT.

После загрузки JAR-пакета вы можете создавать UDF в Selena. Для глобальной UDF вы должны включить ключевое слово GLOBAL в выражение создания.

Синтаксис

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name
(arg_type [, ...])
RETURNS return_type
PROPERTIES ("key" = "value" [, ...])

Параметры

ПараметрОбязательныйОписание
GLOBALНетСоздавать ли глобальную UDF, поддерживается с версии 3.0.
AGGREGATEНетСоздавать ли UDAF или UDWF.
TABLEНетСоздавать ли UDTF. Если ни AGGREGATE, ни TABLE не указаны, создаётся скалярная функция.
function_nameДаИмя функции, которую вы хотите создать. Вы можете включить имя базы данных в этот параметр, например, db1.my_func. Если function_name включает имя базы данных, UDF создаётся в этой базе данных. В противном случае UDF создаётся в текущей базе данных. Имя новой функции и её параметры не должны совпадать с существующим именем в целевой базе данных. В противном случае функция не может быть создана. Создание успешно, если имя функции совпадает, но параметры отличаются.
arg_typeДаТип аргумента функции. Добавленный аргумент может быть представлен как , .... Для поддерживаемых типов данных см. Сопоставление типов данных SQL и Java.
return_typeДаТип возвращаемого значения функции. Для поддерживаемых типов данных см. Java UDF.
PROPERTIESДаСвойства функции, которые зависят от типа создаваемой UDF.

Создание скалярной UDF

Выполните следующую команду для создания скалярной UDF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.selena.udf.sample.UDFJsonGet",
"type" = "SelenaJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
ПараметрОписание
symbolИмя класса для проекта Maven, к которому принадлежит UDF. Значение этого параметра имеет формат <имя_пакета>.<имя_класса>.
typeТип UDF. Установите значение SelenaJar, которое указывает, что UDF является функцией на основе Java.
fileHTTP URL, с которого вы можете загрузить JAR-файл, содержащий код для UDF. Значение этого параметра имеет формат http://<http_server_ip>:<http_server_port>/<jar_package_name>.
isolation(Необязательно) Чтобы разделять экземпляры функций между выполнениями UDF и поддерживать статические переменные, установите значение "shared".

Создание UDAF

Выполните следующую команду для создания UDAF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
"symbol" = "com.selena.udf.sample.SumInt",
"type" = "SelenaJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Описания параметров в PROPERTIES такие же, как в разделе Создание скалярной UDF.

Создание UDWF

Выполните следующую команду для создания UDWF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.selena.udf.sample.WindowSumInt",
"type" = "SelenaJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic: Является ли UDF оконной функцией. Установите значение true. Описания других свойств такие же, как в разделе Создание скалярной UDF.

Создание UDTF

Выполните следующую команду для создания UDTF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.selena.udf.sample.UDFSplit",
"type" = "SelenaJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Описания параметров в PROPERTIES такие же, как в разделе Создание скалярной UDF.

Шаг 7: Использование UDF

После создания UDF вы можете тестировать и использовать её в соответствии с вашими бизнес-потребностями.

Использование скалярной UDF

Выполните следующую команду для использования скалярной UDF, которую вы создали в предыдущем примере:

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

Использование UDAF

Выполните следующую команду для использования UDAF, которую вы создали в предыдущем примере:

SELECT MY_SUM_INT(col1);

Использование UDWF

Выполните следующую команду для использования UDWF, которую вы создали в предыдущем примере:

SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

Использование UDTF

Выполните следующую команду для использования UDTF, которую вы создали в предыдущем примере:

-- Предположим, у вас есть таблица с именем t1, и информация о её столбцах a, b и c1 следующая:
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- Выполните функцию MY_UDF_SPLIT().
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."

ПРИМЕЧАНИЕ

  • Первый MY_UDF_SPLIT в приведённом выше фрагменте кода является псевдонимом столбца, возвращаемого вторым MY_UDF_SPLIT, который является функцией.
  • Вы не можете использовать AS t2(f1) для указания псевдонимов таблицы и её столбцов, которые будут возвращены.

Просмотр UDF

Выполните следующую команду для запроса UDF:

SHOW [GLOBAL] FUNCTIONS;

Для получения дополнительной информации см. SHOW FUNCTIONS.

Удаление UDF

Выполните следующую команду для удаления UDF:

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

Для получения дополнительной информации см. DROP FUNCTION.

Сопоставление типов данных SQL и Java

ПРИМЕЧАНИЕ

В настоящее время для скалярных UDF поддерживаются только невложенные типы параметров/возвращаемых значений ARRAY и MAP.

Тип SQLТип Java
BOOLEANjava.lang.Boolean
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
STRING/VARCHARjava.lang.String
ARRAYjava.util.List
Mapjava.util.Map

Настройки параметров

Настройте следующую переменную окружения в файле be/conf/be.conf каждой виртуальной машины Java (JVM) в вашем cluster Selena для управления использованием памяти.

JAVA_OPTS="-Xmx12G"

FAQ

Могу ли я использовать статические переменные при создании UDF? Влияют ли статические переменные разных UDF друг на друга?

Да, вы можете использовать статические переменные при компиляции UDF. Статические переменные разных UDF изолированы друг от друга и не влияют друг на друга, даже если UDF имеют классы с одинаковыми именами.