Перейти к основному содержимому

Java UDF

Начиная с версии 1.5.0, вы можете компилировать пользовательские функции (UDF) для удовлетворения ваших специфических бизнес-потребностей, используя язык программирования Java.

Начиная с версии 1.5.0, Selena поддерживает глобальные UDF, и вам нужно только включить ключевое слово GLOBAL в соответствующие SQL-операторы (CREATE/SHOW/DROP).

Эта тема описывает, как разрабатывать и использовать различные UDF.

В настоящее время Selena поддерживает скалярные UDF, пользовательские агрегатные функции (UDAF), пользовательские оконные функции (UDWF) и пользовательские табличные функции (UDTF).

Предварительные требования

  • У вас установлен Apache Maven, чтобы вы могли создавать и компилировать Java-проекты.

  • У вас установлен JDK 1.8 на ваших серверах.

  • Функция Java UDF включена. Вы можете установить параметр конфигурации FE enable_udf в true в файле конфигурации FE fe/conf/fe.conf, чтобы включить эту функцию, а затем перезапустить узлы FE, чтобы настройки вступили в силу. Для получения дополнительной информации см. Конфигурация параметров.

Разработка и использование UDF

Вам необходимо создать Maven-проект и скомпилировать нужную UDF, используя язык программирования Java.

Шаг 1: Создание Maven-проекта

Создайте Maven-проект, базовая структура каталогов которого выглядит следующим образом:

project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target

Шаг 2: Добавление зависимостей

Добавьте следующие зависимости в файл pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

Шаг 3: Компиляция UDF

Используйте язык программирования Java для компиляции UDF.

Компиляция скалярной UDF

Скалярная UDF работает с одной строкой данных и возвращает одно значение. Когда вы используете скалярную UDF в запросе, каждая строка соответствует одному значению в результирующем наборе. Типичные скалярные функции включают UPPER, LOWER, ROUND и ABS.

Предположим, что значения поля в ваших JSON-данных являются JSON-строками, а не JSON-объектами. Когда вы используете SQL-оператор для извлечения JSON-строк, вам нужно запустить GET_JSON_STRING дважды, например, GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0").

Чтобы упростить SQL-оператор, вы можете скомпилировать скалярную UDF, которая может напрямую извлекать JSON-строки, например, MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0").

package com.starrocks.udf.sample;

import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// Библиотека JSONPath может быть полностью развернута, даже если значения поля являются JSON-строками.
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}

Пользовательский класс должен реализовать метод, описанный в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны быть такими же, как объявленные в операторе CREATE FUNCTION, который будет выполнен в Шаге 6, и соответствовать сопоставлению, предоставленному в разделе "Сопоставление между типами данных SQL и Java" этой темы.

МетодОписание
TYPE1 evaluate(TYPE2, ...)Выполняет UDF. Метод evaluate() требует уровня доступа public.

Компиляция UDAF

UDAF работает с несколькими строками данных и возвращает одно значение. Типичные агрегатные функции включают SUM, COUNT, MAX и MIN, которые агрегируют несколько строк данных, указанных в каждом предложении GROUP BY, и возвращают одно значение.

Предположим, что вы хотите скомпилировать UDAF с именем MY_SUM_INT. В отличие от встроенной агрегатной функции SUM, которая возвращает значения типа BIGINT, функция MY_SUM_INT поддерживает только параметры запроса и возвращаемые параметры типа данных INT.

package com.starrocks.udf.sample;

public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}

public State create() {
return new State();
}

public void destroy(State state) {
}

public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}
}

Пользовательский класс должен реализовать методы, описанные в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методах должны быть такими же, как объявленные в операторе CREATE FUNCTION, который будет выполнен в Шаге 6, и соответствовать сопоставлению, предоставленному в разделе "Сопоставление между типами данных SQL и Java" этой темы.

МетодОписание
State create()Создает состояние.
void destroy(State)Уничтожает состояние.
void update(State, ...)Обновляет состояние. В дополнение к первому параметру State, вы также можете указать один или несколько параметров запроса в объявлении UDF.
void serialize(State, ByteBuffer)Сериализует состояние в буфер байтов.
void merge(State, ByteBuffer)Десериализует состояние из буфера байтов и объединяет буфер байтов с состоянием в качестве первого параметра.
TYPE finalize(State)Получает окончательный результат UDF из состояния.

Во время компиляции вы также должны использовать класс буфера java.nio.ByteBuffer и локальную переменную serializeLength, которые описаны в следующей таблице.

Класс и локальная переменнаяОписание
java.nio.ByteBuffer()Класс буфера, который хранит промежуточные результаты. Промежуточные результаты могут быть сериализованы или десериализованы при передаче между узлами для выполнения. Поэтому вы также должны использовать переменную serializeLength для указания длины, разрешенной для десериализации промежуточных результатов.
serializeLength()Длина, разрешенная для десериализации промежуточных результатов. Единица измерения: байты. Установите эту локальную переменную в значение типа INT. Например, State { int counter = 0; public int serializeLength() { return 4; }} указывает, что промежуточные результаты имеют тип данных INT, а длина для десериализации составляет 4 байта. Вы можете настроить эти параметры в соответствии с вашими бизнес-требованиями. Например, если вы хотите указать тип данных промежуточных результатов как LONG, а длину для десериализации как 8 байт, передайте State { long counter = 0; public int serializeLength() { return 8; }}.

Обратите внимание на следующие моменты для десериализации промежуточных результатов, хранящихся в классе java.nio.ByteBuffer:

  • Метод remaining(), зависящий от класса ByteBuffer, не может быть вызван для десериализации состояния.
  • Метод clear() не может быть вызван для класса ByteBuffer.
  • Значение serializeLength должно быть таким же, как длина записанных данных. В противном случае во время сериализации и десериализации генерируются неправильные результаты.

Компиляция UDWF

В отличие от обычных агрегатных функций, UDWF работает с набором из нескольких строк, которые в совокупности называются окном, и возвращает значение для каждой строки. Типичная оконная функция включает предложение OVER, которое разделяет строки на несколько наборов. Она выполняет вычисление по каждому набору строк и возвращает значение для каждой строки.

Предположим, что вы хотите скомпилировать UDWF с именем MY_WINDOW_SUM_INT. В отличие от встроенной агрегатной функции SUM, которая возвращает значения типа BIGINT, функция MY_WINDOW_SUM_INT поддерживает только параметры запроса и возвращаемые параметры типа данных INT.

package com.starrocks.udf.sample;

public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}

public State create() {
return new State();
}

public void destroy(State state) {

}

public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}

public void reset(State state) {
state.counter = 0;
}

public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}

Пользовательский класс должен реализовать метод, требуемый UDAF (поскольку UDWF является специальной агрегатной функцией), и метод windowUpdate(), описанный в следующей таблице.

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны быть такими же, как объявленные в операторе CREATE FUNCTION, который будет выполнен в Шаге 6, и соответствовать сопоставлению, предоставленному в разделе "Сопоставление между типами данных SQL и Java" этой темы.

МетодОписание
void windowUpdate(State state, int, int, int , int, ...)Обновляет данные окна. Для получения дополнительной информации о UDWF см. Оконные функции. Каждый раз, когда вы вводите строку в качестве входных данных, этот метод получает информацию об окне и соответственно обновляет промежуточные результаты.
  • peer_group_start: начальная позиция текущего раздела. PARTITION BY используется в предложении OVER для указания столбца раздела. Строки с одинаковыми значениями в столбце раздела считаются находящимися в одном разделе.
  • peer_group_end: конечная позиция текущего раздела.
  • frame_start: начальная позиция текущего оконного кадра. Предложение оконного кадра указывает диапазон вычислений, который охватывает текущую строку и строки, находящиеся в пределах указанного расстояния от текущей строки. Например, ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING указывает диапазон вычислений, который охватывает текущую строку, предыдущую строку перед текущей строкой и следующую строку после текущей строки.
  • frame_end: конечная позиция текущего оконного кадра.
  • inputs: данные, которые вводятся в качестве входных данных в окно. Данные представляют собой пакет массива, который поддерживает только определенные типы данных. В этом примере значения INT вводятся в качестве входных данных, а пакет массива — Integer[].

Компиляция UDTF

UDTF читает одну строку данных и возвращает несколько значений, которые можно рассматривать как таблицу. Табличные функции обычно используются для преобразования строк в столбцы.

ПРИМЕЧАНИЕ

Selena позволяет UDTF возвращать таблицу, состоящую из нескольких строк и одного столбца.

Предположим, что вы хотите скомпилировать UDTF с именем MY_UDF_SPLIT. Функция MY_UDF_SPLIT позволяет использовать пробелы в качестве разделителей и поддерживает параметры запроса и возвращаемые параметры типа данных STRING.

package com.starrocks.udf.sample;

public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}

Метод, определенный пользовательским классом, должен соответствовать следующим требованиям:

ПРИМЕЧАНИЕ

Типы данных параметров запроса и возвращаемых параметров в методе должны быть такими же, как объявленные в операторе CREATE FUNCTION, который будет выполнен в Шаге 6, и соответствовать сопоставлению, предоставленному в разделе "Сопоставление между типами данных SQL и Java" этой темы.

МетодОписание
TYPE[] process()Выполняет UDTF и возвращает массив.

Шаг 4: Упаковка Java-проекта

Выполните следующую команду для упаковки Java-проекта:

mvn package

В папке target генерируются следующие JAR-файлы: udf-1.0-SNAPSHOT.jar и udf-1.0-SNAPSHOT-jar-with-dependencies.jar.

Шаг 5: Загрузка Java-проекта

Загрузите JAR-файл udf-1.0-SNAPSHOT-jar-with-dependencies.jar на HTTP-сервер, который работает и доступен для всех FE и BE в вашем кластере Selena. Затем выполните следующую команду для развертывания файла:

mvn deploy 

Вы можете настроить простой HTTP-сервер, используя Python, и загрузить JAR-файл на этот HTTP-сервер.

ПРИМЕЧАНИЕ

В Шаге 6 FE проверят JAR-файл, содержащий код для UDF, и вычислят контрольную сумму, а BE загрузят и выполнят JAR-файл.

Шаг 6: Создание UDF в Selena

Selena позволяет создавать UDF в двух типах пространств имен: пространства имен баз данных и глобальные пространства имен.

  • Если у вас нет требований к видимости или изоляции для UDF, вы можете создать ее как глобальную UDF. Затем вы можете ссылаться на глобальную UDF, используя имя функции без включения имен catalog и базы данных в качестве префиксов к имени функции.
  • Если у вас есть требования к видимости или изоляции для UDF, или если вам нужно создать одну и ту же UDF в разных базах данных, вы можете создать ее в каждой отдельной базе данных. Таким образом, если ваша сессия подключена к целевой базе данных, вы можете ссылаться на UDF, используя имя функции. Если ваша сессия подключена к другому catalog или базе данных, отличной от целевой базы данных, вам нужно ссылаться на UDF, включив имена catalog и базы данных в качестве префиксов к имени функции, например, catalog.database.function.

УВЕДОМЛЕНИЕ

Прежде чем создавать и использовать глобальную UDF, вы должны обратиться к системному администратору для предоставления вам необходимых разрешений. Для получения дополнительной информации см. GRANT.

После загрузки JAR-пакета вы можете создавать UDF в Selena. Для глобальной UDF вы должны включить ключевое слово GLOBAL в оператор создания.

Синтаксис

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name
(arg_type [, ...])
RETURNS return_type
PROPERTIES ("key" = "value" [, ...])

Параметры

ПараметрОбязательныйОписание
GLOBALНетСоздавать ли глобальную UDF, поддерживается с версии 3.0.
AGGREGATEНетСоздавать ли UDAF или UDWF.
TABLEНетСоздавать ли UDTF. Если не указаны ни AGGREGATE, ни TABLE, создается скалярная функция.
function_nameДаИмя функции, которую вы хотите создать. Вы можете включить имя базы данных в этот параметр, например, db1.my_func. Если function_name включает имя базы данных, UDF создается в этой базе данных. В противном случае UDF создается в текущей базе данных. Имя новой функции и ее параметры не могут быть такими же, как существующее имя в целевой базе данных. В противном случае функция не может быть создана. Создание успешно, если имя функции одинаковое, но параметры разные.
arg_typeДаТип аргумента функции. Добавленный аргумент может быть представлен как , .... Для поддерживаемых типов данных см. Сопоставление между типами данных SQL и Java.
return_typeДаТип возвращаемого значения функции. Для поддерживаемых типов данных см. Java UDF.
PROPERTIESДаСвойства функции, которые различаются в зависимости от типа создаваемой UDF.

Создание скалярной UDF

Выполните следующую команду для создания скалярной UDF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
ПараметрОписание
symbolИмя класса для Maven-проекта, к которому принадлежит UDF. Значение этого параметра имеет формат <package_name>.<class_name>.
typeТип UDF. Установите значение StarrocksJar, которое указывает, что UDF является функцией на основе Java.
fileHTTP URL, с которого можно загрузить JAR-файл, содержащий код для UDF. Значение этого параметра имеет формат http://<http_server_ip>:<http_server_port>/<jar_package_name>.
isolation(Необязательно) Для совместного использования экземпляров функций между выполнениями UDF и поддержки статических переменных установите это значение в "shared".

Создание UDAF

Выполните следующую команду для создания UDAF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Описания параметров в PROPERTIES такие же, как в Создание скалярной UDF.

Создание UDWF

Выполните следующую команду для создания UDWF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic: Является ли UDF оконной функцией. Установите значение true. Описания других свойств такие же, как в Создание скалярной UDF.

Создание UDTF

Выполните следующую команду для создания UDTF, которую вы скомпилировали в предыдущем примере:

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Описания параметров в PROPERTIES такие же, как в Создание скалярной UDF.

Шаг 7: Использование UDF

После создания UDF вы можете тестировать и использовать ее в соответствии с вашими бизнес-потребностями.

Использование скалярной UDF

Выполните следующую команду для использования скалярной UDF, которую вы создали в предыдущем примере:

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

Использование UDAF

Выполните следующую команду для использования UDAF, которую вы создали в предыдущем примере:

SELECT MY_SUM_INT(col1);

Использование UDWF

Выполните следующую команду для использования UDWF, которую вы создали в предыдущем примере:

SELECT MY_WINDOW_SUM_INT(intcol) 
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

Использование UDTF

Выполните следующую команду для использования UDTF, которую вы создали в предыдущем примере:

-- Предположим, что у вас есть таблица с именем t1, и информация о ее столбцах a, b и c1 следующая:
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."



-- Выполните функцию MY_UDF_SPLIT().
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."

ПРИМЕЧАНИЕ

  • Первый MY_UDF_SPLIT в предыдущем фрагменте кода является псевдонимом столбца, который возвращается вторым MY_UDF_SPLIT, который является функцией.
  • Вы не можете использовать AS t2(f1) для указания псевдонимов таблицы и ее столбцов, которые должны быть возвращены.

Просмотр UDF

Выполните следующую команду для запроса UDF:

SHOW [GLOBAL] FUNCTIONS;

Для получения дополнительной информации см. SHOW FUNCTIONS.

Удаление UDF

Выполните следующую команду для удаления UDF:

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

Для получения дополнительной информации см. DROP FUNCTION.

Сопоставление между типами данных SQL и Java

SQL TYPEJava TYPE
BOOLEANjava.lang.Boolean
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
STRING/VARCHARjava.lang.String

Настройки параметров

Настройте следующую переменную окружения в файле be/conf/be.conf каждой виртуальной машины Java (JVM) в вашем кластере Selena для контроля использования памяти. Если вы используете JDK 8, настройте JAVA_OPTS. Если вы используете JDK 9 или более поздней версии, настройте JAVA_OPTS_FOR_JDK_9_AND_LATER.

JAVA_OPTS="-Xmx12G"

JAVA_OPTS_FOR_JDK_9_AND_LATER="-Xmx12G"

FAQ

Могу ли я использовать статические переменные при создании UDF? Влияют ли статические переменные разных UDF друг на друга?

Да, вы можете использовать статические переменные при компиляции UDF. Статические переменные разных UDF изолированы друг от друга и не влияют друг на друга, даже если UDF имеют классы с одинаковыми именами.