Перейти к основному содержимому
Версия: 2.0.x

Взаимодействие с Selena через Arrow Flight SQL

Начиная с версии v1.5.2, Selena поддерживает подключения через протокол Apache Arrow Flight SQL.

Обзор

С протоколом Arrow Flight SQL вы можете выполнять обычные операторы DDL, DML, DQL и использовать код Python или Java для чтения крупномасштабных данных через драйвер Arrow Flight SQL ADBC или JDBC.

Это решение устанавливает полностью колоночный конвейер передачи данных от колоночного движка выполнения Selena к клиенту, устраняя частые преобразования строк в столбцы и накладные расходы на сериализацию, обычно наблюдаемые в традиционных интерфейсах JDBC и ODBC. Это позволяет Selena передавать данные с нулевым копированием, низкой задержкой и высокой пропускной способностью.

Сценарии использования

Интеграция Arrow Flight SQL делает Selena особенно подходящей для:

  • Рабочих процессов data science, где инструменты, такие как Pandas и Apache Arrow, ожидают колоночные данные.
  • Аналитики озер данных, требующей высокой пропускной способности и низкой задержки доступа к массивным наборам данных.
  • Машинного обучения, где критически важны быстрая итерация и скорость обработки.
  • Платформ аналитики в реальном времени, которые должны доставлять данные с минимальными задержками.

С Arrow Flight SQL вы можете извлечь выгоду из:

  • Сквозной передачи колоночных данных, устраняющей дорогостоящие преобразования между колоночными и строковыми форматами.
  • Перемещения данных с нулевым копированием, снижающего накладные расходы на CPU и память.
  • Низкой задержки и чрезвычайно высокой пропускной способности, ускоряющей анализ и отзывчивость.

Технический подход

Традиционно Selena организует результаты запросов в колоночной структуре Block внутри системы. Однако при использовании JDBC, ODBC или MySQL протоколов данные должны быть:

  1. Сериализованы в байты на основе строк на сервере.
  2. Переданы по сети.
  3. Десериализованы обратно в целевую структуру (часто требуя повторного преобразования в колоночные форматы).

Этот трехэтапный процесс создает:

  • Высокие накладные расходы на сериализацию/десериализацию.
  • Сложные преобразования данных.
  • Задержку, которая растет с объемом данных.

Интеграция с Arrow Flight SQL решает эти проблемы путем:

  • Сохранения колоночного формата от начала до конца, от движка выполнения Selena непосредственно к клиенту.
  • Использования колоночного представления Apache Arrow в памяти, которое оптимизировано для аналитических рабочих нагрузок.
  • Использования протокола Arrow Flight для высокоскоростной передачи, обеспечивающей эффективную потоковую передачу без промежуточных преобразований.

Arrow Flight

Эта конструкция обеспечивает настоящую передачу с нулевым копированием, которая одновременно быстрее и более эффективна по ресурсам, чем традиционные методы.

Кроме того, Selena предлагает универсальный JDBC драйвер для Arrow Flight SQL, поэтому приложения могут использовать этот высокопроизводительный путь передачи, не жертвуя совместимостью с JDBC или взаимодействием с другими системами, поддерживающими Arrow Flight.

Для развертываний, где узлы BE недоступны напрямую для клиентов (например, в частных сетях или кластерах Kubernetes), Selena предоставляет функцию прокси Arrow Flight. При включении FE может выступать в качестве прокси для маршрутизации данных Arrow от узлов BE к клиентам, сохраняя преимущества колоночной передачи при учете ограничений сетевой топологии. Этот режим прокси вносит небольшие накладные расходы на производительность, но обеспечивает доступ к Arrow Flight SQL в средах, где прямое подключение к BE недоступно.

Сравнение производительности

Комплексные тесты демонстрируют значительные улучшения скорости извлечения данных. Для различных типов данных (целые числа, числа с плавающей точкой, строки, логические значения и смешанные столбцы) Arrow Flight SQL постоянно превосходил традиционные интерфейсы PyMySQL и Pandas read_sql. Ключевые результаты включают:

  • Для чтения 10 миллионов строк целых чисел время выполнения снизилось с ~35 секунд до 0,4 секунды (~85× быстрее).
  • Для таблицы со смешанными столбцами улучшение производительности достигло 160× ускорения.
  • Даже в менее сложных запросах (например, одиночные строковые столбцы) прирост производительности превысил 12×.

В среднем Arrow Flight SQL достиг:

  • В 20-160 раз более быстрого времени передачи, в зависимости от сложности запроса и типа данных.
  • Явного снижения использования CPU и памяти благодаря устранению избыточных этапов сериализации.

Эти прироста производительности напрямую транслируются в более быстрые дашборды, более отзывчивые рабочие процессы data science и возможность анализировать гораздо более крупные наборы данных в реальном времени.

Использование

Следуйте этим шагам для подключения и взаимодействия с Selena, используя Python ADBC Driver через протокол Arrow Flight SQL. См. Приложение для полного примера кода.

примечание

Python 3.9 или новее является предварительным требованием.

Шаг 1. Установка библиотек

Используйте pip для установки adbc_driver_manager и adbc_driver_flightsql из PyPI:

pip install adbc_driver_manager
pip install adbc_driver_flightsql

Импортируйте следующие модули или библиотеки в ваш код:

  • Обязательные библиотеки:
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
  • Опциональные модули для лучшей удобства использования и отладки:
import pandas as pd       # Опционально: для лучшего отображения результатов с использованием DataFrame
import traceback # Опционально: для детального отслеживания ошибок во время выполнения SQL
import time # Опционально: для измерения времени выполнения SQL

Шаг 2. Подключение к Selena

примечание
  • Если вы хотите запустить сервис FE с помощью командной строки, вы можете использовать один из следующих способов:

    • Указать переменную окружения JAVA_TOOL_OPTIONS.

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • Указать параметр конфигурации FE JAVA_OPTS в fe.conf. Таким образом, вы можете добавить другие значения JAVA_OPTS.

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • Если вы хотите запустить сервис в IntelliJ IDEA, вы должны добавить следующую опцию в Build and run в Run/Debug Configurations:

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Настройка Selena

Перед подключением к Selena через Arrow Flight SQL необходимо сначала настроить узлы FE и BE, чтобы обеспечить включение сервиса Arrow Flight SQL и прослушивание на указанных портах.

В файле конфигурации FE fe.conf и файле конфигурации BE be.conf установите arrow_flight_port на доступный порт. После изменения файлов конфигурации перезапустите сервисы FE и BE, чтобы изменения вступили в силу.

примечание

Вы должны установить разные arrow_flight_port для FE и BE.

Пример:

// fe.conf
arrow_flight_port = 9408
// be.conf
arrow_flight_port = 9419

Настройка Arrow Flight Proxy (Опционально)

Если ваши узлы BE недоступны напрямую из клиентских приложений (например, при развертывании в частных сетях или средах Kubernetes), вы можете включить функцию прокси Arrow Flight на FE для маршрутизации данных от узлов BE через FE.

Функция прокси управляется двумя глобальными переменными:

  • arrow_flight_proxy_enabled: Управляет включением режима прокси. По умолчанию true. При включении возникают небольшие накладные расходы на производительность.
  • arrow_flight_proxy: Указывает имя хоста прокси. Если пусто (по умолчанию), текущий узел FE выступает в качестве прокси. Вы можете установить это значение на конкретное имя хоста, если используете другую конечную точку прокси.

Чтобы настроить эти переменные глобально для всех сессий:

-- Включить или отключить режим прокси (включен по умолчанию)
SET GLOBAL arrow_flight_proxy_enabled = true;

-- Установить конкретное имя хоста прокси (опционально, по умолчанию текущий FE)
SET GLOBAL arrow_flight_proxy = 'your-proxy-hostname:Port';
примечание
  • Функция прокси включена по умолчанию, что может привести к снижению пропускной способности на 8-10% по сравнению с прямыми подключениями к BE. Если ваши клиенты имеют прямой сетевой доступ к узлам BE, вы можете отключить прокси для достижения оптимальной производительности: SET GLOBAL arrow_flight_proxy_enabled = false;
  • Когда arrow_flight_proxy пуст, тикеты будут автоматически маршрутизироваться через узел FE, к которому клиент изначально подключился.

Установка подключения

На стороне клиента создайте клиент Arrow Flight SQL, используя следующую информацию:

  • Адрес хоста Selena FE
  • Порт, который Arrow Flight использует для прослушивания на Selena FE
  • Имя пользователя и пароль пользователя Selena с необходимыми привилегиями

Пример:

FE_HOST = "127.0.0.1"
FE_PORT = 9408

conn = flight_sql.connect(
uri=f"grpc://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
)
cursor = conn.cursor()

После установления соединения вы можете взаимодействовать с Selena, выполняя SQL операторы через возвращенный Cursor.

Шаг 3. (Опционально) Предварительное определение утилитных функций

Эти функции используются для форматирования вывода, стандартизации формата и упрощения отладки. Вы можете опционально определить их в своем коде для тестирования.

# =============================================================================
# Утилитные функции для лучшего форматирования вывода и выполнения SQL
# =============================================================================

# Вывести заголовок раздела
def print_header(title: str):
"""
Вывести заголовок раздела для лучшей читаемости.
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)

# Вывести выполняемый SQL оператор
def print_sql(sql: str):
"""
Вывести SQL оператор перед выполнением.
"""
print(f"\n🟡 SQL:\n{sql.strip()}")

# Вывести результат DataFrame
def print_result(df: pd.DataFrame):
"""
Вывести результат DataFrame в читаемом формате.
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))

# Вывести трассировку ошибки
def print_error(e: Exception):
"""
Вывести трассировку ошибки, если выполнение SQL не удалось.
"""
print("\n🔴 Error occurred:")
traceback.print_exc()

# Выполнить SQL оператор и вывести результат
def execute(sql: str):
"""
Выполнить SQL оператор и вывести результат и время выполнения.
"""
print_sql(sql)
try:
start = time.time() # Опционально: время начала для измерения времени выполнения
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow Table
df = result.to_pandas() # Опционально: преобразовать в DataFrame для лучшего отображения
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)

Шаг 4. Взаимодействие с Selena

Этот раздел проведет вас через некоторые базовые операции, такие как создание таблицы, загрузка данных, проверка метаданных таблицы, установка переменных и выполнение запросов.

примечание

Примеры вывода, перечисленные ниже, реализованы на основе опциональных модулей и утилитных функций, описанных на предыдущих шагах.

  1. Создайте базу данных и таблицу, куда будут загружены данные, и проверьте схему таблицы.

    # Шаг 1: Удалить и создать базу данных
    print_header("Step 1: Drop and Create Database")
    execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
    execute("SHOW DATABASES;")
    execute("CREATE DATABASE sr_arrow_flight_sql;")
    execute("SHOW DATABASES;")
    execute("USE sr_arrow_flight_sql;")

    # Шаг 2: Создать таблицу
    print_header("Step 2: Create Table")
    execute("""
    CREATE TABLE sr_arrow_flight_sql_test
    (
    k0 INT,
    k1 DOUBLE,
    k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
    k3 DECIMAL(27,9) DEFAULT "0",
    k4 BIGINT NULL DEFAULT '10',
    k5 DATE
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");
    """)
    execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

    Пример вывода:

    ================================================================================
    🟢 Step 1: Drop and Create Database
    ================================================================================

    🟡 SQL:
    DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;
    /Users/selena/test/venv/lib/python3.9/site-packages/adbc_driver_manager/dbapi.py:307: Warning: Cannot disable autocommit; conn will not be DB-API 2.0 compliant
    warnings.warn(

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.025 seconds

    🟡 SQL:
    SHOW DATABASES;

    🟢 Result:

    Database
    _statistics_
    hits
    information_schema
    sys

    ⏱️ Execution time: 0.014 seconds

    🟡 SQL:
    CREATE DATABASE sr_arrow_flight_sql;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.012 seconds

    🟡 SQL:
    SHOW DATABASES;

    🟢 Result:

    Database
    _statistics_
    hits
    information_schema
    sr_arrow_flight_sql
    sys

    ⏱️ Execution time: 0.005 seconds

    🟡 SQL:
    USE sr_arrow_flight_sql;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.006 seconds

    ================================================================================
    🟢 Step 2: Create Table
    ================================================================================

    🟡 SQL:
    CREATE TABLE sr_arrow_flight_sql_test
    (
    k0 INT,
    k1 DOUBLE,
    k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
    k3 DECIMAL(27,9) DEFAULT "0",
    k4 BIGINT NULL DEFAULT '10',
    k5 DATE
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.021 seconds

    🟡 SQL:
    SHOW CREATE TABLE sr_arrow_flight_sql_test;

    🟢 Result:

    Table Create Table
    sr_arrow_flight_sql_test CREATE TABLE `sr_arrow_flight_sql_test` (\n `k0` int(11) NULL COMMENT "",\n `k1` double NULL COMMENT "",\n `k2` varchar(32) NULL DEFAULT "" COMMENT "",\n `k3` decimal(27, 9) NULL DEFAULT "0" COMMENT "",\n `k4` bigint(20) NULL DEFAULT "10" COMMENT "",\n `k5` date NULL COMMENT ""\n) ENGINE=OLAP \nDUPLICATE KEY(`k0`)\nDISTRIBUTED BY HASH(`k5`) BUCKETS 5 \nPROPERTIES (\n"compression" = "LZ4",\n"fast_schema_evolution" = "true",\n"replicated_storage" = "true",\n"replication_num" = "1"\n);

    ⏱️ Execution time: 0.005 seconds
  2. Вставьте данные, выполните несколько запросов и установите переменные.

    # Шаг 3: Вставить данные
    print_header("Step 3: Insert Data")
    execute("""
    INSERT INTO sr_arrow_flight_sql_test VALUES
    (0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
    (1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
    (2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
    (3, 4, "ID", 4, 4, '2025-04-22'),
    (4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
    """)

    # Шаг 4: Запросить данные
    print_header("Step 4: Query Data")
    execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

    # Шаг 5: Переменные сессии
    print_header("Step 5: Session Variables")
    execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
    execute("SET query_mem_limit = 2147483648;")
    execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
    execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")
    execute("SET arrow_flight_proxy_enabled = true;")
    execute("SET arrow_flight_proxy = 'fe-proxy.example.com';")
    execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")

    # Шаг 6: Агрегационный запрос
    print_header("Step 6: Aggregation Query")
    execute("""
    SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
    FROM sr_arrow_flight_sql_test
    GROUP BY k5
    ORDER BY k5;
    """)

    Пример вывода:

    ================================================================================
    🟢 Step 3: Insert Data
    ================================================================================

    🟡 SQL:
    INSERT INTO sr_arrow_flight_sql_test VALUES
    (0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
    (1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
    (2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
    (3, 4, "ID", 4, 4, '2025-04-22'),
    (4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.149 seconds

    ================================================================================
    🟢 Step 4: Query Data
    ================================================================================

    🟡 SQL:
    SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;

    🟢 Result:

    0 0.10000 ID 0.000100000 1111111111 2025-04-21
    1 0.20000 ID_1 1.000000010 0 2025-04-21
    2 3.40000 ID_1 3.100000000 123456 2025-04-22
    3 4.00000 ID 4.000000000 4 2025-04-22
    4 122345.54321 ID 122345.543210000 5 2025-04-22

    ⏱️ Execution time: 0.019 seconds

    ================================================================================
    🟢 Step 5: Session Variables
    ================================================================================

    🟡 SQL:
    SHOW VARIABLES LIKE '%query_mem_limit%';

    🟢 Result:

    Variable_name Value
    query_mem_limit 0

    ⏱️ Execution time: 0.005 seconds

    🟡 SQL:
    SET query_mem_limit = 2147483648;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.007 seconds

    🟡 SQL:
    SHOW VARIABLES LIKE '%query_mem_limit%';

    🟢 Result:

    Variable_name Value
    query_mem_limit 2147483648

    ⏱️ Execution time: 0.005 seconds

    🟡 SQL:
    SHOW VARIABLES LIKE '%arrow_flight_proxy%';

    🟢 Result:

    Variable_name Value
    arrow_flight_proxy
    arrow_flight_proxy_enabled true

    ⏱️ Execution time: 0.006 seconds

    🟡 SQL:
    SET arrow_flight_proxy_enabled = true;

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.008 seconds

    🟡 SQL:
    SET arrow_flight_proxy = 'fe-proxy.example.com';

    🟢 Result:

    StatusResult
    0

    ⏱️ Execution time: 0.007 seconds

    🟡 SQL:
    SHOW VARIABLES LIKE '%arrow_flight_proxy%';

    🟢 Result:

    Variable_name Value
    arrow_flight_proxy fe-proxy.example.com
    arrow_flight_proxy_enabled true

    ⏱️ Execution time: 0.006 seconds

    ================================================================================
    🟢 Step 6: Aggregation Query
    ================================================================================

    🟡 SQL:
    SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
    FROM sr_arrow_flight_sql_test
    GROUP BY k5
    ORDER BY k5;

    🟢 Result:

    2025-04-21 0.30000 2 0.500050005000
    2025-04-22 122352.94321 3 40784.214403333333

    ⏱️ Execution time: 0.014 second

Шаг 5. Закрытие соединения

Включите следующий раздел в ваш код для закрытия соединения.

# Шаг 7: Закрыть
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")

Пример вывода:

================================================================================
🟢 Step 7: Close Connection
================================================================================
✅ Test completed successfully.

Process finished with exit code 0

Примеры использования крупномасштабной передачи данных

Python

После подключения к Selena (с поддержкой Arrow Flight SQL) через ADBC Driver в Python вы можете использовать различные API ADBC для загрузки набора данных Clickbench из Selena в Python.

Пример кода:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
from datetime import datetime

# ----------------------------------------
# Настройки подключения Selena Flight SQL
# ----------------------------------------
# Замените URI и учетные данные по мере необходимости
my_uri = "grpc://127.0.0.1:9408" # Порт по умолчанию Flight SQL для Selena
my_db_kwargs = {
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}

# ----------------------------------------
# SQL запрос (ClickBench: таблица hits)
# ----------------------------------------
# Замените на актуальную таблицу и набор данных по мере необходимости
sql = "SELECT * FROM clickbench.hits LIMIT 1000000;" # Прочитать 1 миллион строк

# ----------------------------------------
# Метод 1: fetchallarrow + to_pandas
# ----------------------------------------
def test_fetchallarrow():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
cursor.execute(sql)
arrow_table = cursor.fetchallarrow()
df = arrow_table.to_pandas()
duration = datetime.now() - start

print("\n[Method 1] fetchallarrow + to_pandas")
print(f"Time taken: {duration}, Arrow table size: {arrow_table.nbytes / 1024 / 1024:.2f} MB, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Метод 2: fetch_df (рекомендуется)
# ----------------------------------------
def test_fetch_df():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
cursor.execute(sql)
df = cursor.fetch_df()
duration = datetime.now() - start

print("\n[Method 2] fetch_df (recommended)")
print(f"Time taken: {duration}, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Метод 3: adbc_execute_partitions (для параллельного чтения)
# ----------------------------------------
def test_execute_partitions():
conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
cursor = conn.cursor()
start = datetime.now()
partitions, schema = cursor.adbc_execute_partitions(sql)

# Прочитать первый partition (для демонстрации)
cursor.adbc_read_partition(partitions[0])
arrow_table = cursor.fetchallarrow()
df = arrow_table.to_pandas()
duration = datetime.now() - start

print("\n[Method 3] adbc_execute_partitions (parallel read)")
print(f"Time taken: {duration}, Partitions: {len(partitions)}, Rows: {len(df)}")
print(df.info(memory_usage='deep'))

# ----------------------------------------
# Запустить все тесты
# ----------------------------------------
if __name__ == "__main__":
test_fetchallarrow()
test_fetch_df()
test_execute_partitions()

Результаты показывают, что загрузка 1 миллиона строк набора данных Clickbench (105 столбцов, 780 МБ) из Selena заняла всего 3 секунды.

[Method 1] fetchallarrow + to_pandas
Time taken: 0:00:03.219575, Arrow table size: 717.42 MB, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

[Method 2] fetch_df (recommended)
Time taken: 0:00:02.358840, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

[Method 3] adbc_execute_partitions (parallel read)
Time taken: 0:00:02.231144, Partitions: 1, Rows: 1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB

Arrow Flight SQL JDBC Driver

Протокол Arrow Flight SQL предоставляет JDBC драйвер с открытым исходным кодом, совместимый со стандартным интерфейсом JDBC. Вы можете легко интегрировать его в различные инструменты BI (такие как Tableau, Power BI, DBeaver и т.д.) для доступа к базе данных Selena, точно так же, как с традиционным JDBC драйвером. Значительным преимуществом этого драйвера является его поддержка высокоскоростной передачи данных на основе Apache Arrow, что значительно повышает эффективность запросов и передачи данных. Использование почти идентично традиционному MySQL JDBC драйверу. Вам нужно только заменить jdbc:mysql на jdbc:arrow-flight-sql в URL подключения для бесшовного переключения. Результаты запроса по-прежнему возвращаются в стандартном формате ResultSet, обеспечивая совместимость с существующей логикой обработки JDBC.

примечание

Обратите внимание, что если вы используете Java 9 или новее, вы должны добавить --add-opens=java.base/java.nio=ALL-UNNAMED в ваш Java код, чтобы открыть внутреннюю структуру JDK. В противном случае вы можете столкнуться с определенными ошибками.

  • Если вы хотите запустить сервис FE с помощью командной строки, вы можете использовать один из следующих способов:

    • Указать переменную окружения JAVA_TOOL_OPTIONS.

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • Указать параметр конфигурации FE JAVA_OPTS в fe.conf. Таким образом, вы можете добавить другие значения JAVA_OPTS.

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • Если вы хотите отлаживать в IntelliJ IDEA, вы должны добавить следующую опцию в Build and run в Run/Debug Configurations:

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Arrow Flight Example

Нажмите здесь, чтобы просмотреть зависимости POM
<properties>
<adbc.version>0.15.0</adbc.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-jdbc</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
</dependencies>

Пример кода:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ArrowFlightSqlIntegrationTest {

private static final String JDBC_URL = "jdbc:arrow-flight-sql://127.0.0.1:9408"
+ "?useEncryption=false"
+ "&useServerPrepStmts=false"
+ "&useSSL=false"
+ "&useArrowFlightSql=true";

private static final String USER = "root";
private static final String PASSWORD = "";

private static int testCaseNum = 1;

public static void main(String[] args) {
try {
// Загрузить Arrow Flight SQL JDBC driver
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");

try (Connection conn = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
Statement stmt = conn.createStatement()) {

testUpdate(stmt, "DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;");
testQuery(stmt, "SHOW PROCESSLIST;");
testUpdate(stmt, "CREATE DATABASE sr_arrow_flight_sql;");
testQuery(stmt, "SHOW DATABASES;");
testUpdate(stmt, "USE sr_arrow_flight_sql;");
testUpdate(stmt, "CREATE TABLE sr_table_test (id INT, name STRING) ENGINE=OLAP PRIMARY KEY (id) " +
"DISTRIBUTED BY HASH(id) BUCKETS 1 " +
"PROPERTIES ('replication_num' = '1');");
testUpdate(stmt, "INSERT INTO sr_table_test VALUES (1, 'Alice'), (2, 'Bob');");
testQuery(stmt, "SELECT * FROM sr_arrow_flight_sql.sr_table_test;");
}
} catch (Exception e) {
e.printStackTrace();
}

}

/**
* Выполняет запрос и выводит результат в консоль.
*/
private static void testQuery(Statement stmt, String sql) throws Exception {
System.out.println("Test Case: " + testCaseNum);
System.out.println("▶ Executing query: " + sql);
ResultSet rs = stmt.executeQuery(sql);
try {
System.out.println("Result:");
int columnCount = rs.getMetaData().getColumnCount();
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
System.out.print(rs.getString(i) + "\t");
}
System.out.println();
}
} finally {
rs.close();
}
testCaseNum++;
System.out.println();
}

/**
* Выполняет обновление (DDL или DML) и выводит результат в консоль.
*/
private static void testUpdate(Statement stmt, String sql) throws Exception {
System.out.println("Test Case: " + testCaseNum);
System.out.println("▶ Executing update: " + sql);
stmt.executeUpdate(sql);
System.out.println("Result: ✅ Success");
testCaseNum++;
System.out.println();
}
}

Результаты выполнения:

Test Case: 1
▶ Executing update: DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;
Result: ✅ Success

Test Case: 2
▶ Executing query: SHOW PROCESSLIST;
Result:
192.168.124.48_9010_1751449846872 16777217 root Query 2025-07-02 18:46:49 0 OK SHOW PROCESSLIST; false default_warehouse

Test Case: 3
▶ Executing update: CREATE DATABASE sr_arrow_flight_sql;
Result: ✅ Success

Test Case: 4
▶ Executing query: SHOW DATABASES;
Result:
_statistics_
information_schema
sr_arrow_flight_sql
sys

Test Case: 5
▶ Executing update: USE sr_arrow_flight_sql;
Result: ✅ Success

Test Case: 6
▶ Executing update: CREATE TABLE sr_table_test (id INT, name STRING) ENGINE=OLAP PRIMARY KEY (id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ('replication_num' = '1');
Result: ✅ Success

Test Case: 7
▶ Executing update: INSERT INTO sr_table_test VALUES (1, 'Alice'), (2, 'Bob');
Result: ✅ Success

Test Case: 8
▶ Executing query: SELECT * FROM sr_arrow_flight_sql.sr_table_test;
Result:
1 Alice
2 Bob

Java ADBC Driver

Протокол Arrow Flight SQL предоставляет JDBC драйвер с открытым исходным кодом, совместимый со стандартным интерфейсом JDBC. Вы можете легко интегрировать его в различные инструменты BI (такие как Tableau, Power BI, DBeaver и т.д.) для доступа к базе данных Selena, точно так же, как с традиционным JDBC драйвером. Значительным преимуществом этого драйвера является его поддержка высокоскоростной передачи данных на основе Apache Arrow, что значительно повышает эффективность запросов и передачи данных. Использование почти идентично традиционному MySQL JDBC драйверу.

примечание
  • Если вы хотите запустить сервис FE с помощью командной строки, вы можете использовать один из следующих способов:

    • Указать переменную окружения JAVA_TOOL_OPTIONS.

      export JAVA_TOOL_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
    • Указать параметр конфигурации FE JAVA_OPTS в fe.conf. Таким образом, вы можете добавить другие значения JAVA_OPTS.

      JAVA_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED ..."
  • Если вы хотите отлаживать в IntelliJ IDEA, вы должны добавить следующую опцию в Build and run в Run/Debug Configurations:

    --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
Зависимости POM
<properties>
<adbc.version>0.15.0</adbc.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-jdbc</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-core</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-manager</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
</dependencies>

Аналогично Python, вы также можете напрямую создать клиент ADBC в Java для чтения данных из Selena.

В этом процессе вам сначала нужно получить FlightInfo, а затем подключиться к каждому Endpoint для получения данных.

Пример кода:

public static void main(String[] args) throws Exception {
try (BufferAllocator allocator = new RootAllocator()) {
FlightSqlDriver driver = new FlightSqlDriver(allocator);

Map<String, Object> parameters = new HashMap<>();
String host = "localhost";
int port = 9408;
String uri = Location.forGrpcInsecure(host, port).getUri().toString();

AdbcDriver.PARAM_URI.set(parameters, uri);
AdbcDriver.PARAM_USERNAME.set(parameters, "root");
AdbcDriver.PARAM_PASSWORD.set(parameters, "");

try (AdbcDatabase database = driver.open(parameters);
AdbcConnection connection = database.connect();
AdbcStatement statement = connection.createStatement()) {

statement.setSqlQuery("SHOW DATABASES;");

try (AdbcStatement.QueryResult result = statement.executeQuery();
ArrowReader reader = result.getReader()) {

int batchCount = 0;
while (reader.loadNextBatch()) {
batchCount++;
VectorSchemaRoot root = reader.getVectorSchemaRoot();
System.out.println("Batch " + batchCount + ":");
System.out.println(root.contentToTSVString());
}

System.out.println("Total batches: " + batchCount);
}
}
}
}

Рекомендации

  • Среди трех методов подключения Java Arrow Flight SQL, упомянутых выше:

    • Если последующий анализ данных опирается на форматы данных на основе строк, рекомендуется использовать jdbc:arrow-flight-sql, который возвращает данные в формате JDBC ResultSet.
    • Если анализ может напрямую обрабатывать формат Arrow или другие колоночные форматы данных, вы можете использовать Flight AdbcDriver или Flight JdbcDriver. Эти варианты возвращают данные в формате Arrow напрямую, избегая преобразования строк в столбцы и используя функции Arrow для ускорения разбора данных.
  • Независимо от того, разбираете ли вы JDBC ResultSet или данные в формате Arrow, время разбора часто превышает время, затраченное на чтение самих данных. Если вы обнаружите, что Arrow Flight SQL не обеспечивает ожидаемое улучшение производительности по сравнению с jdbc:mysql://, рассмотрите возможность исследования того, занимает ли разбор данных слишком много времени.

  • Для всех методов подключения чтение данных с JDK 17 обычно быстрее, чем с JDK 1.8.

  • При чтении крупномасштабных наборов данных Arrow Flight SQL обычно потребляет меньше памяти по сравнению с jdbc:mysql://. Поэтому, если вы испытываете ограничения памяти, также стоит попробовать Arrow Flight SQL.

  • В дополнение к трем методам подключения выше, вы также можете использовать нативный FlightClient для подключения к Arrow Flight Server, обеспечивая более гибкое параллельное чтение из нескольких endpoints. Java Flight AdbcDriver построен поверх FlightClient и предоставляет более простой интерфейс по сравнению с прямым использованием FlightClient.

Spark

В настоящее время официальный проект Arrow Flight не планирует поддерживать Spark или Flink. В будущем поддержка будет постепенно добавляться, чтобы позволить selena-spark-connector получать доступ к Selena через Arrow Flight SQL, с ожидаемым улучшением производительности чтения в несколько раз.

При доступе к Selena с помощью Spark, в дополнение к традиционным методам JDBC или Java клиента, вы также можете использовать компонент Spark-Flight-Connector с открытым исходным кодом для прямого чтения из и записи в Selena Flight SQL Server в качестве Spark DataSource. Этот подход, основанный на протоколе Apache Arrow Flight, предлагает следующие значительные преимущества:

  • Высокопроизводительная передача данных Spark-Flight-Connector использует Apache Arrow в качестве формата передачи данных, обеспечивая обмен данными с нулевым копированием и высокой эффективностью. Преобразование между внутренним форматом данных Block Selena и Arrow очень эффективно, достигая улучшения производительности до 10 раз по сравнению с традиционными методами CSV или JDBC, и значительно снижая накладные расходы на передачу данных.
  • Нативная поддержка сложных типов данных Формат данных Arrow нативно поддерживает сложные типы (такие как Map, Array, Struct и т.д.), обеспечивая лучшую адаптацию к сложным моделям данных Selena по сравнению с традиционными методами JDBC, и улучшая выразительность и совместимость данных.
  • Поддержка чтения, записи и потоковой записи Компонент поддерживает Spark в качестве клиента Flight SQL для эффективных операций чтения и записи, включая операторы DML insert, merge, update и delete, и даже поддерживает потоковую запись, что делает его подходящим для сценариев обработки данных в реальном времени.
  • Поддержка проталкивания предикатов и обрезки столбцов При чтении данных Spark-Flight-Connector поддерживает проталкивание предикатов и обрезку столбцов, позволяя фильтровать данные и выбирать столбцы на стороне Selena, значительно уменьшая объем передаваемых данных и улучшая производительность запросов.
  • Поддержка проталкивания агрегации и параллельного чтения Агрегационные операции (такие как sum, count, max, min и т.д.) могут быть протолкнуты в Selena для выполнения, уменьшая вычислительную нагрузку на Spark. Также поддерживается параллельное чтение на основе разделения, улучшая эффективность чтения в сценариях с большими данными.
  • Лучше для сценариев больших данных По сравнению с традиционными методами JDBC, протокол Flight SQL лучше подходит для крупномасштабных сценариев доступа с высокой параллельностью, позволяя Selena полностью использовать свои высокопроизводительные аналитические возможности.

Приложение

Следующий полный пример кода из учебника по использованию.

# =============================================================================
# Тестовый скрипт Selena Arrow Flight SQL
# =============================================================================
# pip install adbc_driver_manager adbc_driver_flightsql pandas
# =============================================================================

# =============================================================================
# Необходимые основные модули для подключения к Selena через Arrow Flight SQL
# =============================================================================
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# =============================================================================
# Опциональные модули для лучшей удобства использования и отладки
# =============================================================================
import pandas as pd # Опционально: для лучшего отображения результатов с использованием DataFrame
import traceback # Опционально: для детального отслеживания ошибок во время выполнения SQL
import time # Опционально: для измерения времени выполнения SQL

# =============================================================================
# Конфигурация Selena Flight SQL
# =============================================================================
FE_HOST = "127.0.0.1"
FE_PORT = 9408

# =============================================================================
# Подключение к Selena
# =============================================================================
conn = flight_sql.connect(
uri=f"grpc://{FE_HOST}:{FE_PORT}",
db_kwargs={
adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
)

cursor = conn.cursor()

# =============================================================================
# Утилитные функции для лучшего форматирования вывода и выполнения SQL
# =============================================================================

def print_header(title: str):
"""
Вывести заголовок раздела для лучшей читаемости.
"""
print("\n" + "=" * 80)
print(f"🟢 {title}")
print("=" * 80)


def print_sql(sql: str):
"""
Вывести SQL оператор перед выполнением.
"""
print(f"\n🟡 SQL:\n{sql.strip()}")


def print_result(df: pd.DataFrame):
"""
Вывести результат DataFrame в читаемом формате.
"""
if df.empty:
print("\n🟢 Result: (no rows returned)\n")
else:
print("\n🟢 Result:\n")
print(df.to_string(index=False))


def print_error(e: Exception):
"""
Вывести трассировку ошибки, если выполнение SQL не удалось.
"""
print("\n🔴 Error occurred:")
traceback.print_exc()


def execute(sql: str):
"""
Выполнить SQL оператор и вывести результат и время выполнения.
"""
print_sql(sql)
try:
start = time.time() # Время начала для измерения времени выполнения
cursor.execute(sql)
result = cursor.fetchallarrow() # Arrow Table
df = result.to_pandas() # Преобразовать в DataFrame для лучшего отображения
print_result(df)
print(f"\n⏱️ Execution time: {time.time() - start:.3f} seconds")
except Exception as e:
print_error(e)

# =============================================================================
# Шаг 1: Удалить и создать базу данных
# =============================================================================
print_header("Step 1: Drop and Create Database")
execute("DROP DATABASE IF EXISTS sr_arrow_flight_sql FORCE;")
execute("SHOW DATABASES;")
execute("CREATE DATABASE sr_arrow_flight_sql;")
execute("SHOW DATABASES;")
execute("USE sr_arrow_flight_sql;")

# =============================================================================
# Шаг 2: Создать таблицу
# =============================================================================
print_header("Step 2: Create Table")
execute("""
CREATE TABLE sr_arrow_flight_sql_test
(
k0 INT,
k1 DOUBLE,
k2 VARCHAR(32) NULL DEFAULT "" COMMENT "",
k3 DECIMAL(27,9) DEFAULT "0",
k4 BIGINT NULL DEFAULT '10',
k5 DATE
)
DISTRIBUTED BY HASH(k5) BUCKETS 5
PROPERTIES("replication_num" = "1");
""")

execute("SHOW CREATE TABLE sr_arrow_flight_sql_test;")

# =============================================================================
# Шаг 3: Вставить данные
# =============================================================================
print_header("Step 3: Insert Data")
execute("""
INSERT INTO sr_arrow_flight_sql_test VALUES
(0, 0.1, "ID", 0.0001, 1111111111, '2025-04-21'),
(1, 0.20, "ID_1", 1.00000001, 0, '2025-04-21'),
(2, 3.4, "ID_1", 3.1, 123456, '2025-04-22'),
(3, 4, "ID", 4, 4, '2025-04-22'),
(4, 122345.54321, "ID", 122345.54321, 5, '2025-04-22');
""")

# =============================================================================
# Шаг 4: Запросить данные
# =============================================================================
print_header("Step 4: Query Data")
execute("SELECT * FROM sr_arrow_flight_sql_test ORDER BY k0;")

# =============================================================================
# Шаг 5: Переменные сессии
# =============================================================================
print_header("Step 5: Session Variables")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SET query_mem_limit = 2147483648;")
execute("SHOW VARIABLES LIKE '%query_mem_limit%';")
execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")
execute("SET arrow_flight_proxy_enabled = true;")
execute("SET arrow_flight_proxy = 'fe-proxy.example.com';")
execute("SHOW VARIABLES LIKE '%arrow_flight_proxy%';")

# =============================================================================
# Шаг 6: Агрегационный запрос
# =============================================================================
print_header("Step 6: Aggregation Query")
execute("""
SELECT k5, SUM(k1) AS total_k1, COUNT(1) AS row_count, AVG(k3) AS avg_k3
FROM sr_arrow_flight_sql_test
GROUP BY k5
ORDER BY k5;
""")

# =============================================================================
# Шаг 7: Закрыть соединение
# =============================================================================
print_header("Step 7: Close Connection")
cursor.close()
conn.close()
print("✅ Test completed successfully.")