Перейти к основному содержимому

Ускорение запросов к озеру данных с помощью материализованных представлений

В этой теме описывается, как оптимизировать производительность запросов в озере данных с помощью асинхронных материализованных представлений Selena.

Selena предлагает готовые возможности запросов к озеру данных, которые очень эффективны для исследовательских запросов и анализа данных в озере. В большинстве сценариев Data Cache может обеспечить кэширование файлов на уровне блоков, избегая снижения производительности, вызванного колебаниями удаленного хранилища и большим количеством операций ввода-вывода.

Однако при создании сложных и эффективных отчетов с использованием данных из озера или дальнейшем ускорении этих запросов вы все еще можете столкнуться с проблемами производительности. С помощью асинхронных материализованных представлений вы можете достичь более высокой параллельности и лучшей производительности запросов для отчетов и приложений данных в озере.

Обзор

Selena поддерживает создание асинхронных материализованных представлений на основе внешних каталогов, таких как Hive catalog, Iceberg catalog, Hudi catalog, JDBC catalog и Paimon catalog. Материализованные представления на основе внешних каталогов особенно полезны в следующих сценариях:

  • Прозрачное ускорение отчетов озера данных

    Для обеспечения производительности запросов отчетов озера данных инженеры данных обычно должны тесно сотрудничать с аналитиками данных, чтобы изучить логику построения слоя ускорения для отчетов. Если слой ускорения требует дальнейших обновлений, они должны соответственно обновить логику построения, расписания обработки и операторы запросов.

    Благодаря возможности перезаписи запросов материализованных представлений ускорение отчетов может быть сделано прозрачным и незаметным для пользователей. Когда выявляются медленные запросы, инженеры данных могут проанализировать паттерн медленных запросов и создать материализованные представления по требованию. Запросы на стороне приложения затем интеллектуально перезаписываются и прозрачно ускоряются материализованным представлением, что позволяет быстро улучшить производительность запросов без изменения логики бизнес-приложения или оператора запроса.

  • Инкрементальное вычисление данных реального времени, связанных с историческими данными

    Предположим, ваше бизнес-приложение требует связывания данных реального времени в нативных таблицах Selena и исторических данных в озере данных для инкрементальных вычислений. В этой ситуации материализованные представления могут предоставить простое решение. Например, если таблица фактов реального времени является нативной таблицей в Selena, а таблица измерений хранится в озере данных, вы можете легко выполнять инкрементальные вычисления, создавая материализованные представления, которые связывают нативную таблицу с таблицей во внешних источниках данных.

  • Быстрое построение слоев метрик

    Вычисление и обработка метрик могут столкнуться с проблемами при работе с данными высокой размерности. Вы можете использовать материализованные представления, которые позволяют выполнять предварительную агрегацию и свертку данных, для создания относительно легкого слоя метрик. Более того, материализованные представления могут обновляться автоматически, что еще больше снижает сложность вычислений метрик.

Материализованные представления, Data Cache и нативные таблицы в Selena - все это эффективные методы для достижения значительного повышения производительности запросов. В следующей таблице сравниваются их основные различия:

 Data CacheМатериализованное представлениеНативная таблица
Загрузка и обновление данныхЗапросы автоматически запускают кэширование данных.Задачи обновления запускаются автоматически.Поддерживает различные методы импорта, но требует ручного обслуживания задач импорта
Гранулярность кэширования данных
  • Поддерживает кэширование данных на уровне блоков
  • Следует механизму вытеснения кэша LRU
  • Результаты вычислений не кэшируются
Хранит предварительно вычисленные результаты запросовХранит данные на основе схемы таблицы
Производительность запросовData Cache ≤ Материализованное представление = Нативная таблица
Оператор запроса
  • Не нужно изменять операторы запросов к озеру данных
  • Как только запросы попадают в кэш, происходят вычисления.
  • Не нужно изменять операторы запросов к озеру данных
  • Использует перезапись запросов для повторного использования предварительно вычисленных результатов
Требует изменения операторов запросов для запроса к нативной таблице

По сравнению с прямыми запросами к данным озера или загрузкой данных в нативные таблицы, материализованные представления предлагают несколько уникальных преимуществ:

  • Ускорение локального хранилища: Материализованные представления могут использовать преимущества ускорения Selena с локальным хранилищем, такие как индексы, секционирование, группировка и Colocate Group, что приводит к лучшей производительности запросов по сравнению с прямыми запросами к данным из озера данных.
  • Нулевое обслуживание задач загрузки: Материализованные представления обновляют данные прозрачно через автоматические задачи обновления. Нет необходимости поддерживать задачи загрузки для выполнения запланированных обновлений данных. Кроме того, материализованные представления на основе Hive, Iceberg и Paimon catalog могут обнаруживать изменения данных и выполнять инкрементальные обновления на уровне разделов.
  • Интеллектуальная перезапись запросов: Запросы могут быть прозрачно перезаписаны для использования материализованных представлений. Вы можете мгновенно получить выгоду от ускорения без необходимости изменять операторы запросов, которые использует ваше приложение.

Поэтому мы рекомендуем использовать материализованные представления в следующих сценариях:

  • Даже когда Data Cache включен, производительность запросов не соответствует вашим требованиям по задержке запросов и параллельности.
  • Запросы включают многократно используемые компоненты, такие как фиксированные функции агрегации или паттерны соединений.
  • Данные организованы в разделы, в то время как запросы включают агрегацию на относительно высоком уровне (например, агрегация по дням).

В следующих сценариях мы рекомендуем приоритизировать ускорение через Data Cache:

  • Запросы не имеют много многократно используемых компонентов и могут сканировать любые данные из озера данных.
  • Удаленное хранилище имеет значительные колебания или нестабильность, что потенциально может повлиять на доступ.

Создание материализованных представлений на основе внешних каталогов

Создание материализованного представления на таблицах во внешних каталогах похоже на создание материализованного представления на нативных таблицах Selena. Вам нужно только установить подходящую стратегию обновления в соответствии с используемым источником данных и вручную включить перезапись запросов для материализованных представлений на основе внешних каталогов.

Выбор подходящей стратегии обновления

В настоящее время Selena не может обнаруживать изменения данных на уровне разделов в Hudi catalogs. Поэтому выполняется полное обновление после запуска задачи.

Для Hive Catalog, Iceberg Catalog (начиная с v3.1.4), JDBC catalog (начиная с v3.1.4, только для таблиц MySQL с диапазонным секционированием) и Paimon Catalog (начиная с v3.2.1), Selena поддерживает обнаружение изменений данных на уровне разделов. В результате Selena может:

  • Обновлять только разделы с изменениями данных, чтобы избежать полного обновления, снижая потребление ресурсов, вызванное обновлением.

  • Обеспечивать согласованность данных в некоторой степени во время перезаписи запросов. Если есть изменения данных в базовой таблице в озере данных, запрос не будет перезаписан для использования материализованного представления.

подсказка

Вы все еще можете выбрать терпимость к определенному уровню несогласованности данных, установив свойство mv_rewrite_staleness_second при создании материализованного представления. Для получения дополнительной информации см. CREATE MATERIALIZED VIEW.

Обратите внимание, что если вам нужно обновлять по разделам, ключи секционирования материализованного представления должны быть включены в ключи базовой таблицы.

Начиная с v3.2.3, Selena поддерживает создание секционированных материализованных представлений на таблицах Iceberg с Partition Transforms, и материализованные представления секционируются по столбцу после преобразования. В настоящее время поддерживаются только таблицы Iceberg с преобразованиями identity, year, month, day или hour.

Следующий пример показывает определение таблицы Iceberg с преобразованием раздела day и создает материализованное представление с выровненными разделами на ней:

-- Определение таблицы Iceberg.
CREATE TABLE spark_catalog.test.iceberg_sample_datetime_day (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP)
USING iceberg
PARTITIONED BY (days(ts))

-- Создание материализованного представления на таблице Iceberg.
CREATE MATERIALIZED VIEW `test_iceberg_datetime_day_mv` (`id`, `data`, `category`, `ts`)
PARTITION BY (`ts`)
DISTRIBUTED BY HASH(`id`)
REFRESH MANUAL
AS
SELECT
`iceberg_sample_datetime_day`.`id`,
`iceberg_sample_datetime_day`.`data`,
`iceberg_sample_datetime_day`.`category`,
`iceberg_sample_datetime_day`.`ts`
FROM `iceberg`.`test`.`iceberg_sample_datetime_day`;

Для Hive catalogs вы можете включить функцию обновления кэша метаданных Hive, чтобы позволить Selena обнаруживать изменения данных на уровне разделов. Когда эта функция включена, Selena периодически обращается к Hive Metastore Service (HMS) или AWS Glue для проверки информации метаданных недавно запрошенных горячих данных.

Чтобы включить функцию обновления кэша метаданных Hive, вы можете установить следующий элемент динамической конфигурации FE, используя ADMIN SET FRONTEND CONFIG:

Элементы конфигурации

enable_background_refresh_connector_metadata

По умолчанию: true в v3.0 false в v2.5
Описание: Включать ли периодическое обновление кэша метаданных Hive. После включения Selena опрашивает metastore (Hive Metastore или AWS Glue) вашего кластера Hive и обновляет кэшированные метаданные часто используемых Hive catalogs для восприятия изменений данных. True указывает на включение обновления кэша метаданных Hive, а false указывает на отключение.

background_refresh_metadata_interval_millis

По умолчанию: 600000 (10 минут)
Описание: Интервал между двумя последовательными обновлениями кэша метаданных Hive. Единица: миллисекунда.

background_refresh_metadata_time_secs_since_last_access_secs

По умолчанию: 86400 (24 часа)
Описание: Время истечения задачи обновления кэша метаданных Hive. Для Hive catalog, к которому был осуществлен доступ, если к нему не обращались более указанного времени, Selena прекращает обновление его кэшированных метаданных. Для Hive catalog, к которому не было доступа, Selena не будет обновлять его кэшированные метаданные. Единица: секунда.

Начиная с v3.1.4, Selena поддерживает обнаружение изменений данных для Iceberg Catalog на уровне разделов. В настоящее время поддерживаются только таблицы Iceberg V1.

Включение перезаписи запросов для материализованных представлений на основе внешних каталогов

По умолчанию Selena не поддерживает перезапись запросов для материализованных представлений, построенных на Hudi и JDBC catalogs, поскольку перезапись запросов в этом сценарии не может обеспечить строгую согласованность результатов. Вы можете включить эту функцию, установив свойство force_external_table_query_rewrite в true при создании материализованного представления. Для материализованных представлений, построенных на таблицах в Hive catalogs, перезапись запросов включена по умолчанию.

Пример:

CREATE MATERIALIZED VIEW ex_mv_par_tbl
PARTITION BY emp_date
DISTRIBUTED BY hash(empid)
PROPERTIES (
"force_external_table_query_rewrite" = "true"
)
AS
select empid, deptno, emp_date
from `hive_catalog`.`emp_db`.`emps_par_tbl`
where empid < 5;

В сценариях, включающих перезапись запросов, если вы используете очень сложный оператор запроса для построения материализованного представления, мы рекомендуем разделить оператор запроса и построить несколько простых материализованных представлений вложенным образом. Вложенные материализованные представления более универсальны и могут вместить более широкий спектр паттернов запросов.

Лучшие практики

В реальных бизнес-сценариях вы можете выявить запросы с высокой задержкой выполнения и потреблением ресурсов, анализируя журналы аудита или журналы больших запросов. Вы можете дополнительно использовать query profiles для точного определения конкретных этапов, где запрос медленный. Следующие разделы предоставляют инструкции и примеры о том, как повысить производительность запросов к озеру данных с помощью материализованных представлений.

Случай первый: Ускорение вычислений соединений в озере данных

Вы можете использовать материализованные представления для ускорения запросов соединений в озере данных.

Предположим, следующие запросы к вашему Hive catalog особенно медленные:

--Q1
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_year = 1993
AND lo_discount BETWEEN 1 AND 3
AND lo_quantity < 25;

--Q2
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_yearmonth = 'Jan1994'
AND lo_discount BETWEEN 4 AND 6
AND lo_quantity BETWEEN 26 AND 35;

--Q3
SELECT SUM(lo_revenue), d_year, p_brand
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates, hive.ssb_1g_csv.part, hive.ssb_1g_csv.supplier
WHERE
lo_orderdate = d_datekey
AND lo_partkey = p_partkey
AND lo_suppkey = s_suppkey
AND p_brand BETWEEN 'MFGR#2221' AND 'MFGR#2228'
AND s_region = 'ASIA'
GROUP BY d_year, p_brand
ORDER BY d_year, p_brand;

Анализируя их query profiles, вы можете заметить, что время выполнения запроса в основном тратится на hash join между таблицей lineorder и другими таблицами измерений по столбцу lo_orderdate.

Здесь Q1 и Q2 выполняют агрегацию после соединения lineorder и dates, в то время как Q3 выполняет агрегацию после соединения lineorder, dates, part и supplier.

Поэтому вы можете использовать возможность View Delta Join rewrite Selena для построения материализованного представления, которое соединяет lineorder, dates, part и supplier.

CREATE MATERIALIZED VIEW lineorder_flat_mv
DISTRIBUTED BY HASH(LO_ORDERDATE, LO_ORDERKEY) BUCKETS 48
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
-- Указание уникальных ограничений.
"unique_constraints" = "
hive.ssb_1g_csv.supplier.s_suppkey;
hive.ssb_1g_csv.part.p_partkey;
hive.ssb_1g_csv.dates.d_datekey",
-- Указание внешних ключей.
"foreign_key_constraints" = "
hive.ssb_1g_csv.lineorder(lo_partkey) REFERENCES hive.ssb_1g_csv.part(p_partkey);
hive.ssb_1g_csv.lineorder(lo_suppkey) REFERENCES hive.ssb_1g_csv.supplier(s_suppkey);
hive.ssb_1g_csv.lineorder(lo_orderdate) REFERENCES hive.ssb_1g_csv.dates(d_datekey)",
-- Включение перезаписи запросов для материализованного представления на основе внешнего каталога.
"force_external_table_query_rewrite" = "TRUE"
)
AS SELECT
l.LO_ORDERDATE AS LO_ORDERDATE,
l.LO_ORDERKEY AS LO_ORDERKEY,
l.LO_PARTKEY AS LO_PARTKEY,
l.LO_SUPPKEY AS LO_SUPPKEY,
l.LO_QUANTITY AS LO_QUANTITY,
l.LO_EXTENDEDPRICE AS LO_EXTENDEDPRICE,
l.LO_DISCOUNT AS LO_DISCOUNT,
l.LO_REVENUE AS LO_REVENUE,
s.S_REGION AS S_REGION,
p.P_BRAND AS P_BRAND,
d.D_YEAR AS D_YEAR,
d.D_YEARMONTH AS D_YEARMONTH
FROM hive.ssb_1g_csv.lineorder AS l
INNER JOIN hive.ssb_1g_csv.supplier AS s ON s.S_SUPPKEY = l.LO_SUPPKEY
INNER JOIN hive.ssb_1g_csv.part AS p ON p.P_PARTKEY = l.LO_PARTKEY
INNER JOIN hive.ssb_1g_csv.dates AS d ON l.LO_ORDERDATE = d.D_DATEKEY;

Случай второй: Ускорение агрегаций и агрегаций на соединениях в озере данных

Материализованные представления могут использоваться для ускорения запросов агрегации, независимо от того, выполняются ли они на одной таблице или включают несколько таблиц.

  • Запрос агрегации одной таблицы

    Для типичных запросов к одной таблице их query profile покажет, что узел AGGREGATE потребляет значительное количество времени. Вы можете использовать общие операторы агрегации для построения материализованных представлений.

    Предположим, следующий запрос медленный:

    --Q4
    SELECT
    lo_orderdate, count(distinct lo_orderkey)
    FROM hive.ssb_1g_csv.lineorder
    GROUP BY lo_orderdate
    ORDER BY lo_orderdate limit 100;

    Q4 вычисляет ежедневное количество уникальных заказов. Поскольку вычисление count distinct может быть вычислительно дорогим, вы можете создать следующие два типа материализованных представлений для ускорения:

    CREATE MATERIALIZED VIEW mv_2_1 
    DISTRIBUTED BY HASH(lo_orderdate)
    PARTITION BY LO_ORDERDATE
    REFRESH ASYNC EVERY(INTERVAL 1 DAY)
    AS
    SELECT
    lo_orderdate, count(distinct lo_orderkey)
    FROM hive.ssb_1g_csv.lineorder
    GROUP BY lo_orderdate;

    CREATE MATERIALIZED VIEW mv_2_2
    DISTRIBUTED BY HASH(lo_orderdate)
    PARTITION BY LO_ORDERDATE
    REFRESH ASYNC EVERY(INTERVAL 1 DAY)
    AS
    SELECT
    -- lo_orderkey должен быть типа BIGINT, чтобы его можно было использовать для перезаписи запросов.
    lo_orderdate, bitmap_union(to_bitmap(lo_orderkey))
    FROM hive.ssb_1g_csv.lineorder
    GROUP BY lo_orderdate;

    Обратите внимание, что в этом контексте не создавайте материализованные представления с предложениями LIMIT и ORDER BY, чтобы избежать неудач перезаписи. Для получения дополнительной информации об ограничениях перезаписи запросов см. Query rewrite with materialized views - Limitations.

  • Запрос агрегации нескольких таблиц

    В сценариях, включающих агрегации результатов соединений, вы можете создать вложенные материализованные представления на существующих материализованных представлениях, которые соединяют таблицы для дальнейшей агрегации результатов соединений. Например, основываясь на примере в Случае первом, вы можете создать следующее материализованное представление для ускорения Q1 и Q2, поскольку их паттерны агрегации похожи:

    CREATE MATERIALIZED VIEW mv_2_3
    DISTRIBUTED BY HASH(lo_orderdate)
    PARTITION BY LO_ORDERDATE
    REFRESH ASYNC EVERY(INTERVAL 1 DAY)
    AS
    SELECT
    lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
    FROM lineorder_flat_mv
    GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;

    Конечно, возможно выполнять как вычисления соединений, так и агрегации в рамках одного материализованного представления. Хотя эти типы материализованных представлений могут иметь меньше возможностей для перезаписи запросов (из-за их специфических вычислений), они обычно занимают меньше места для хранения после агрегации. Ваш выбор может основываться на вашем конкретном случае использования.

    CREATE MATERIALIZED VIEW mv_2_4
    DISTRIBUTED BY HASH(lo_orderdate)
    PARTITION BY LO_ORDERDATE
    REFRESH ASYNC EVERY(INTERVAL 1 DAY)
    PROPERTIES (
    "force_external_table_query_rewrite" = "TRUE"
    )
    AS
    SELECT lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
    FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
    WHERE lo_orderdate = d_datekey
    GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;

Случай третий: Ускорение соединений на агрегациях в озере данных

В некоторых сценариях может потребоваться сначала выполнить вычисления агрегации на одной таблице, а затем выполнить запросы соединений с другими таблицами. Чтобы полностью использовать возможности перезаписи запросов Selena, мы рекомендуем построить вложенные материализованные представления. Например:

--Q5
SELECT * FROM (
SELECT
l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region, sum(l.lo_revenue)
FROM
hive.ssb_1g_csv.lineorder l
INNER JOIN (
SELECT distinct c_custkey, c_region
from
hive.ssb_1g_csv.customer
WHERE
c_region IN ('ASIA', 'AMERICA')
) c ON l.lo_custkey = c.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region
) c1
WHERE
lo_orderdate = '19970503';

Q5 сначала выполняет запрос агрегации к таблице customer, а затем выполняет соединение и агрегацию с таблицей lineorder. Похожие запросы могут включать различные фильтры по c_region и lo_orderdate. Чтобы использовать возможности перезаписи запросов, вы можете создать два материализованных представления: одно для агрегации, а другое для соединения.

--mv_3_1
CREATE MATERIALIZED VIEW mv_3_1
DISTRIBUTED BY HASH(c_custkey)
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT distinct c_custkey, c_region from hive.ssb_1g_csv.customer;



--mv_3_2
CREATE MATERIALIZED VIEW mv_3_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT l.lo_orderdate, l.lo_orderkey, mv.c_custkey, mv.c_region, sum(l.lo_revenue)
FROM hive.ssb_1g_csv.lineorder l
INNER JOIN mv_3_1 mv
ON l.lo_custkey = mv.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, mv.c_custkey, mv.c_region;

Случай четвертый: Разделение горячих и холодных данных для данных реального времени и исторических данных в озере данных

Рассмотрим следующий сценарий: данные, обновленные в течение последних трех дней, напрямую записываются в Selena, в то время как менее свежие данные проверяются и пакетно записываются в Hive. Однако все еще есть запросы, которые могут включать данные за последние семь дней. В этом случае вы можете создать простую модель с материализованными представлениями для автоматического истечения данных.

CREATE MATERIALIZED VIEW mv_4_1 
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT lo_orderkey, lo_orderdate, lo_revenue
FROM hive.ssb_1g_csv.lineorder
WHERE lo_orderdate<=current_date()
AND lo_orderdate>=date_add(current_date(), INTERVAL -4 DAY);

Вы можете дополнительно построить представления или материализованные представления на его основе в соответствии с логикой приложений верхнего уровня.