Перейти к основному содержимому
Версия: 2.0.x

Управление схемами и миграция с помощью SQLAlchemy и Alembic

Это руководство описывает, как управлять схемами Selena с использованием экосистемы Python — включая SQLAlchemy, Alembic и sqlacodegen — через диалект selena SQLAlchemy. Оно помогает понять, почему миграция схем полезна и как эффективно использовать её с Selena.

Обзор

Многие пользователи управляют таблицами, представлениями и материализованными представлениями Selena напрямую с помощью SQL DDL. Однако по мере роста проектов ручное сопровождение операторов ALTER TABLE становится подверженным ошибкам и сложным для отслеживания.

Диалект Selena SQLAlchemy (selena) предоставляет:

  • Полноценный слой моделей SQLAlchemy для таблиц, представлений и материализованных представлений Selena
  • Декларативные определения для схем таблиц и свойств таблиц (включая представления и материализованные представления)
  • Интеграцию с Alembic для автоматического обнаружения и генерации изменений схемы
  • Совместимость с инструментами типа sqlacodegen для обратной генерации моделей

Это позволяет пользователям Python поддерживать схемы Selena декларативным, версионируемым и автоматизированным способом.

Ключевые преимущества

Хотя миграция схем традиционно ассоциируется с OLTP базами данных, она также ценна в системах хранилищ данных, таких как Selena. Команды используют Alembic вместе с диалектом Selena благодаря преимуществам, перечисленным ниже.

Декларативное определение схемы

После того как вы определите схему в моделях Python ORM или в стиле SQLAlchemy core, вам больше не требуется вручную писать операторы ALTER TABLE.

Автоматическое сравнение и автогенерация

Alembic сравнивает текущую схему Selena с вашими моделями SQLAlchemy и автоматически генерирует скрипты миграции (CREATE/DROP/ALTER).

Проверяемые, версионируемые миграции

Каждое изменение схемы становится файлом миграции (Python), поэтому пользователи могут отслеживать изменения и откатывать их при необходимости.

Единообразный workflow для всех окружений

Изменения схемы можно применять к development, staging и production с использованием одного и того же процесса.

Установка и подключение

Предварительные требования**

  • Python-клиент Selena: 1.3.2 или новее
  • SQLAlchemy: 1.4 или новее (рекомендуется SQLAlchemy 2.0; требуется для использования sqlacodegen)
  • Alembic: 1.16 или новее

Установка Python-клиента Selena

Выполните следующую команду для установки Python-клиента Selena.

pip install selena

Подключение к Selena

Подключитесь к вашему cluster Selena с использованием следующего URL.

selena://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database>
  • user: Имя пользователя для подключения к cluster.
  • password: Пароль пользователя.
  • FE_host: IP-адрес FE.
  • query_port: query_port FE (по умолчанию: 9030).
  • catalog: Имя catalog, в котором находится ваша база данных.
  • database: Имя базы данных, к которой вы хотите подключиться.

После установки вы можете быстро проверить подключение с помощью следующего примера кода:

from sqlalchemy import create_engine, text

# сначала нужно создать `mydatabase`
engine = create_engine("selena://root@localhost:9030/mydatabase")

with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchall()
print("Connection successful!")

Определение моделей Selena (декларативный ORM)

Диалект Selena поддерживает:

  • Таблицы
  • Представления
  • Материализованные представления

Он также поддерживает специфичные для Selena атрибуты таблиц, такие как:

  • ENGINE (OLAP)
  • Модели ключей (DUPLICATE KEY, PRIMARY KEY, UNIQUE KEY, AGGREGATE KEY)
  • Варианты PARTITION BY (RANGE / LIST / Expression partitioning)
  • Варианты DISTRIBUTED BY (HASH / RANDOM)
  • ORDER BY
  • Свойства таблиц (например, replication_num, storage_medium)
important
  • Опции диалекта Selena передаются как именованные аргументы с префиксом selena_.
  • Префикс selena_ должен быть в нижнем регистре. Суффикс принимается в любом регистре (например, PRIMARY_KEY и primary_key).
  • Если вы указываете ключ таблицы (например, selena_primary_key="id"), задействованные колонки также должны быть помечены primary_key=True в Column(...), чтобы метаданные SQLAlchemy и автогенерация Alembic работали корректно.

Примеры ниже отражают реальный публичный API и имена параметров.

Пример таблицы

Опции таблиц Selena могут быть указаны как в стиле ORM (через __table_args__), так и в стиле Core (через Table(..., selena_...=...)).

Стиль ORM (декларативный)

from sqlalchemy import create_engine
from sqlalchemy.orm import Mapped, declarative_base, mapped_column
from selena import INTEGER, STRING

# используем тот же engine, что и в быстром тесте
engine = create_engine("selena://root@localhost:9030/mydatabase")

Base = declarative_base()

class MyTable(Base):
__tablename__ = 'my_orm_table'
id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)

__table_args__ = {
'comment': 'table comment',

'selena_primary_key': 'id',
'selena_distributed_by': 'HASH(id) BUCKETS 10',
'selena_properties': {'replication_num': '1'}
}

# Создание таблицы в базе данных
Base.metadata.create_all(engine)

Стиль Core

from sqlalchemy import Column, MetaData, Table, create_engine
from selena import INTEGER, VARCHAR

# используем тот же engine, что и в быстром тесте
engine = create_engine("selena://root@localhost:9030/mydatabase")

metadata = MetaData()

my_core_table = Table(
'my_core_table',
metadata,
Column('id', INTEGER, primary_key=True),
Column('name', VARCHAR(50)),

# Специфичные для Selena аргументы
selena_primary_key='id',
selena_distributed_by='HASH(id) BUCKETS 10',
selena_properties={"replication_num": "1"}
)

# Создание таблицы в базе данных
metadata.create_all(engine)
примечание

Для полного справочника по атрибутам таблиц и типам данных см. Справочник [4].

Пример представления

Ниже приведен рекомендуемый стиль определения представления, использующий columns как список словарей (name/comment). Этот пример основан на существующей таблице my_core_table.

from selena.schema import View

# Повторно используем metadata из примера Core-таблицы выше
metadata = my_core_table.metadata

user_view = View(
"user_view",
metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)
примечание

Для получения дополнительной информации об опциях View и ограничениях см. Справочник [5].

Пример материализованного представления

Материализованные представления определяются аналогично. Свойство selena_refresh — это строка синтаксиса, указывающая стратегию обновления.

from selena.schema import MaterializedView

# Повторно используем metadata из примера Core-таблицы выше
metadata = my_core_table.metadata

# Создание простого материализованного представления (асинхронное обновление)
user_stats_ = MaterializedView(
'user_stats_',
metadata,
definition='SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id',
selena_refresh='ASYNC'
)
примечание

Для получения дополнительной информации об опциях и ограничениях ALTER см. Справочник [6].

Интеграция с Alembic

Диалект Selena SQLAlchemy обеспечивает полную поддержку:

  • Создание / удаление таблицы
  • Создание / удаление представления
  • Создание / удаление материализованного представления
  • Обнаружение поддерживаемых изменений в специфичных для Selena атрибутах (например, свойства таблиц и распределение)

Это позволяет корректно работать автогенерации Alembic.

Инициализация Alembic

  1. Инициализируйте Alembic:

    alembic init migrations
  2. Настройте URL вашей базы данных в alembic.ini:

    # alembic.ini
    sqlalchemy.url = selena://<user>:<password>@<FE_host>:<query_port>/[<catalog>.]<database>
  3. Включите логирование диалекта Selena (опционально):

    Вы можете включить логгер selena в alembic.ini, чтобы отслеживать обнаруженные изменения таблицы через логи. Подробнее см. Справочник [2].

    Отредактируйте env.py (настройте оба пути — offline и online):

    from alembic import context
    from selena.alembic import render_column_type, include_object_for_view_
    from selena.alembic.selena import SelenaImpl # noqa: F401 (обеспечиваем регистрацию impl)

    from myapp.models import Base # настройте под ваш проект

    target_metadata = Base.metadata


    def run_migrations_offline() -> None:
    url = context.config.get_main_option("sqlalchemy.url")
    context.configure(
    url=url,
    target_metadata=target_metadata,
    render_item=render_column_type,
    include_object=include_object_for_view_
    )

    with context.begin_transaction():
    context.run_migrations()


    def run_migrations_online() -> None:
    # ... создание engine и подключение как в стандартном env.py из alembic ...
    with connectable.connect() as connection:
    context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_item=render_column_type,
    include_object=include_object_for_view_
    )

    with context.begin_transaction():
    context.run_migrations()

Автоматическая генерация миграций

alembic revision --autogenerate -m "initial schema"

Alembic сравнит модели SQLAlchemy с фактической схемой Selena и выведет корректный DDL.

Применение миграций

alembic upgrade head

Откат также поддерживается (где это возможно).

important

DDL в Selena не транзакционен для нескольких операторов. Если upgrade прерывается, вам может потребоваться проверить, что уже было применено, и выполнить ручное исправление (например, написать компенсирующую миграцию или выполнить ручной DDL) перед повторным запуском.

Поддерживаемые операции изменения схемы

Диалект поддерживает автогенерацию Alembic для:

  • Таблиц: создание / удаление, и сравнение специфичных для Selena атрибутов, объявленных через selena_* (в рамках поддержки ALTER в Selena)
  • Представлений: создание / удаление / изменение (в основном изменения, связанные с определением; некоторые атрибуты неизменяемы)
  • Материализованных представлений: создание / удаление / изменение (ограничено изменяемыми частями, такими как стратегии обновления и свойства)

Некоторые изменения DDL в Selena не обратимы или не изменяемы. Вы можете внести эти изменения только путем удаления и пересоздания таблицы/представления/материализованного представления. Если вы укажете такие изменения в диалекте, автогенерация выдаст предупреждение или ошибку.

Сквозной пример (рекомендуется для начинающих)

Этот раздел показывает выполнимый сквозной workflow, включая места для паузы и проверки сгенерированных файлов.

Шаг 1. Создайте директорию проекта и инициализируйте Alembic

mkdir my_sr_alembic_project
cd my_sr_alembic_project

alembic init alembic

Шаг 2. Настройте alembic.ini

Отредактируйте URL в alembic.ini:

sqlalchemy.url = selena://root@localhost:9030/mydatabase

Шаг 3. Определите ваши модели

Создайте пакет для ваших моделей:

mkdir -p myapp
touch myapp/__init__.py

Создайте myapp/models.py и поместите определения таблиц/представлений/материализованных представлений в пакет:

примечание

При использовании миграций Alembic не вызывайте metadata.create_all(engine) в модуле моделей.

from sqlalchemy import Column, Table
from sqlalchemy.orm import Mapped, declarative_base, mapped_column

from selena import INTEGER, STRING, VARCHAR
from selena.schema import MaterializedView, View

Base = declarative_base()


# --- ORM-таблица ---
class MyOrmTable(Base):
__tablename__ = "my_orm_table"

id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
name: Mapped[str] = mapped_column(STRING)

__table_args__ = {
"comment": "table comment",
"selena_primary_key": "id",
"selena_distributed_by": "HASH(id) BUCKETS 10",
"selena_properties": {"replication_num": "1"},
}


# --- Core-таблица в тех же metadata (важно для target_metadata в Alembic) ---
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
comment="core table comment",
selena_primary_key="id",
selena_distributed_by="HASH(id) BUCKETS 10",
selena_properties={"replication_num": "1"},
)


# --- Представление ---
user_view = View(
"user_view",
Base.metadata,
definition="SELECT id, name FROM my_core_table WHERE name IS NOT NULL",
columns=[
{"name": "id", "comment": "ID"},
{"name": "name", "comment": "Name"},
],
comment="Active users",
)


# --- Материализованное представление ---
user_stats_mv = MaterializedView(
"user_stats_mv",
Base.metadata,
definition="SELECT id, COUNT(*) AS cnt FROM my_core_table GROUP BY id",
selena_refresh="ASYNC",
)

Шаг 4. Настройте env.py для автогенерации

Отредактируйте alembic/env.py:

  1. Импортируйте myapp.models для установки target_metadata.
  2. Импортируйте render_column_type и include_object_for_view_mv для их установки как в run_migrations_offline(), так и в run_migrations_online(), чтобы корректно обрабатывать представления и материализованные представления, а также корректно отображать типы колонок Selena.
примечание

Вам нужно добавить или изменить эти строки в env.py, а не заменять весь сгенерированный файл env.py.

from alembic import context
from selena.alembic import render_column_type, include_object_for_view_mv
from selena.alembic.selena import SelenaImpl # noqa: F401

from myapp.models import Base

target_metadata = Base.metadata

# Опционально: установите репликацию таблицы версий для dev-clusters с одним BE
version_table_kwargs = {"selena_properties": {"replication_num": "1"}}

# В обеих функциях run_migrations_offline() и run_migrations_online() убедитесь:
def run_migrations_offline() -> None:
url = context.config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)


def run_migrations_online() -> None:
# ... создание engine и подключение как в стандартном env.py из alembic ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_item=render_column_type,
include_object=include_object_for_view_mv,
version_table_kwargs=version_table_kwargs,
)

Шаг 5. Автогенерация первой ревизии

alembic revision --autogenerate -m "create initial schema"

Пауза и проверка:

  1. Проверьте сгенерированный файл миграции в alembic/versions/.
  2. Убедитесь, что он содержит ожидаемые операции (например, create_table, create_view, create_materialized_view).
  3. Убедитесь, что он не содержит неожиданных удалений или изменений.

Шаг 6. Предпросмотр SQL и применение

Предпросмотр SQL:

alembic upgrade head --sql

Пауза и проверка:

  1. Убедитесь, что DDL в ожидаемом порядке.
  2. Определите любые потенциально тяжелые операции и рассмотрите разбиение миграций при необходимости.

Применение:

alembic upgrade head
important

DDL в Selena не транзакционен для нескольких операторов. Если upgrade прерывается, вам может потребоваться проверить, что уже было применено, и выполнить ручное исправление перед повторным запуском.

Шаг 7. Внесите изменение и повторите автогенерацию

Обновите myapp/models.py:

  • Измените существующую таблицу (my_core_table): добавьте колонку или обновите комментарий таблицы, и измените одно свойство таблицы.
  • Добавьте новую таблицу (my_new_table).
примечание

Добавление колонки может быть длительным изменением схемы. Selena допускает только одну выполняющуюся задачу изменения схемы на таблицу одновременно. На практике рекомендуется разделять изменения "добавление/удаление/изменение колонок" от других тяжелых изменений (например, дополнительные добавления/удаления колонок или массовые изменения свойств) и разбивать их на несколько ревизий Alembic при необходимости.

from sqlalchemy import Column, Table
from selena import INTEGER, VARCHAR

# Изменение существующей таблицы (добавление колонки)
# (Обновите существующее определение my_core_table на месте.)
my_core_table = Table(
"my_core_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
Column("age", INTEGER), # только добавленная колонка

selena_primary_key='id',
selena_distributed_by='HASH(id) BUCKETS 10',
selena_properties={"replication_num": "1"},
)

my_new_table = Table(
"my_new_table",
Base.metadata,
Column("id", INTEGER, primary_key=True),
Column("name", VARCHAR(50)),
selena_primary_key="id",
selena_distributed_by="HASH(id) BUCKETS 10",
selena_properties={"replication_num": "1"},
)
alembic revision --autogenerate -m "add a new table, change a old table"

Пауза и проверка:

Убедитесь, что новая миграция содержит:

  • create_table(...) для my_new_table, и
  • ожидаемые операции для изменений my_core_table (например, добавление колонки / установка комментария / установка свойств).

Предпросмотр SQL и применение:

alembic upgrade head --sql
alembic upgrade head

Использование sqlacodegen

sqlacodegen может выполнить обратную генерацию моделей SQLAlchemy напрямую из Selena:

sqlacodegen --options include_dialect_options,keep_dialect_types \
--generator tables \
selena://<user>:<password>@<FE_host>:<query_port>/[catalog.]<database> > models.py

Поддерживаемые объекты:

  • Таблицы
  • Представления
  • Материализованные представления
  • Секционирование, распределение, order-by и свойства

Это полезно при подключении существующей схемы Selena к Alembic.

Вы можете напрямую использовать приведенную выше команду для генерации Python-скрипта для таблиц/представлений/материализованных представлений, определенных в разделе Сквозной пример.

примечание
  • Рекомендуется добавлять --generator tables при генерации моделей в стиле Core (ORM-генераторы могут переупорядочивать колонки согласно атрибуту NOT NULL / NULL).
  • Ключевые колонки могут быть сгенерированы как NOT NULL. Если вы хотите сделать их nullable, настройте сгенерированную модель вручную.

Ограничения и лучшие практики

  • Некоторые операции DDL в Selena требуют удаления и пересоздания таблицы; автогенерация выдаст предупреждение или ошибку, а не молча создаст недоступный SQL.
  • Изменения моделей ключей (например, изменение DUPLICATE KEY на PRIMARY KEY) не поддерживаются через ALTER TABLE; используйте явный план (обычно удаление и пересоздание с заполнением данных).
  • Selena не предоставляет транзакционный DDL для нескольких операторов; проверяйте сгенерированные миграции и применяйте их операционно. Если миграция прерывается, вам может потребоваться обработать откат вручную.
  • Для распределения, если вы опускаете часть BUCKETS, Selena может автоматически назначить количество buckets; диалект спроектирован так, чтобы избегать лишних различий в этом случае.

Резюме

С диалектом Selena SQLAlchemy и интеграцией с Alembic вы можете:

  • ✔ Использовать декларативные модели для определения схем Selena
  • ✔ Автоматически обнаруживать и генерировать скрипты миграции схем
  • ✔ Использовать контроль версий для эволюции схемы
  • ✔ Управлять представлениями и материализованными представлениями декларативно
  • ✔ Выполнять обратную инженерию существующих схем с помощью sqlacodegen

Это приводит управление схемами Selena в современную экосистему Python data engineering и значительно упрощает согласованность схем между окружениями.

Справочники

[1]: selena-python-client README

[2]: Интеграция с Alembic

[3]: Детали SQLAlchemy

[4]: Поддержка таблиц

[5]: Поддержка представлений

[6]: Поддержка материализованных представлений