Ускорение запросов к озеру данных с помощью материализованных предста влений
Этот раздел описывает, как оптимизировать производительность запросов к вашему озеру данных с помощью асинхронных материализованных представлений Selena.
Selena предлагает готовые возможности запросов к озеру данных, которые весьма эффективны для разведочных запросов и анализа данных в озере. В большинстве сценариев Data Cache может обеспечить кэширование файлов на уровне блоков, избегая снижения производительности, вызванного колебаниями удаленного хранилища и большим количеством операций I/O.
Однако, когда дело доходит до построения сложных и эффективных отчетов с использованием данных из озера или дальнейшего ускорения этих запросов, вы все еще можете столкнуться с проблемами производительности. С помощью асинхронных материализованных представлений вы можете достичь более высокой одновременности и лучшей производительности запросов для отчетов и приложений данных в озере.
Обзор
Selena поддерживает построение асинхронных материализованных представлений на основе внешних каталогов, таких как Hive catalog, Iceberg catalog, Hudi catalog, JDBC catalog и Paimon catalog. Материализованные представления на основе внешних каталогов особенно полезны в следующих сценариях:
-
Прозрачное ускорение отчетов озера данных
Чтобы обеспечить производительность запросов отчетов озера данных, инженеры данных обычно должны тесно работать с аналитиками данных, чтобы изучить логику построения слоя ускорения для отчетов. Если слой ускорения требует дальнейших обновлений, они должны соответствующим образом обновить логику построения, расписания обработки и операторы запросов.
Благодаря возможности перезаписи запросов материализованных представлений, ускорение отчетов может быть сделано прозрачным и незаметным для пользователей. Когда обнаруживаются медленные запросы, инженеры данных могут проанализировать паттерн медленных запросов и создать материализованные представления по требованию. Запросы на стороне приложения затем интеллектуально переписываются и прозрачно ускоряются материализованным представлением, что позволяет быст ро улучшить производительность запросов без изменения логики бизнес-приложения или оператора запроса.
-
Инкрементальное вычисление real-time данных, связанных с историческими данными
Предположим, что ваше бизнес-приложение требует связывания real-time данных в нативных таблицах Selena и исторических данных в озере данных для инкрементальных вычислений. В этой ситуации материализованные представления могут предоставить простое решение. Например, если таблица фактов real-time является нативной таблицей в Selena, а таблица измерений хранится в озере данных, вы можете легко выполнять инкрементальные вычисления, создав материализованные представления, которые связывают нативную таблицу с таблицей во внешних источниках данных.
-
Быстрое построение слоев метрик
Вычисление и обработка метрик могут столкнуться с трудностями при работе с данными высокой размерности. Вы можете использовать материализованные представления, которые позволяют выполнять предварительную агрегацию данных и rollup, чтобы создать относительно легкий слой метрик. Более того, материализованные представления могут обновляться автоматически, что дополнительно снижает сложность вычисления метрик.
Материализованные представления, Data Cache и нативные таблицы в Selena — все это эффективные методы достижения значительного повышения производительности запросов. В следующей таблице сравниваются их основные различия:
| Data Cache | Материализованное представление | Нативная таблица | |
|---|---|---|---|
| Загрузка и обновление данных | Запросы автоматически запускают кэширование данных. | Задачи обновления запускаются автоматически. | Поддерживает различные методы импорта, но требует ручного обслуживания задач импорта |
| Гранулярность кэширования данных |
| Хранит предварительно вычисленные результаты запросов | Хранит данные на основе схемы таблицы |
| Производительность запросов | Data Cache ≤ Материализованное представление = Нативная таблица | ||
| Оператор запроса |
|
| Требует изменения операторов запросов для запроса к нативной таблице |
По сравнению с прямым запросом данных из озера или загрузкой данных в нативные таблицы, материализованные представления предлагают несколько уникальных преимуществ:
- Ускорение с локальным хранилищем: Материализованные представления могут использовать преимущества ускорения Selena с локальным хранилищем, такие как индексы, partitioning, bucketing и collocate groups, что приводит к лучшей производительности запросов по сравнению с прямым запросом данных из озера данных.
- Нулевое обслуживание задач загрузки: Материализованные представления обновляют данные прозрачно через автоматические задачи обновления. Нет необходимости обслуживать задачи загрузки для выполнения запланированных обновле ний данных. Кроме того, материализованные представления на основе Hive, Iceberg и Paimon catalog могут обнаруживать изменения данных и выполнять инкрементальные обновления на уровне partition.
- Интеллектуальная перезапись запросов: Запросы могут быть прозрачно переписаны для использования материализованных представлений. Вы можете получить выгоду от ускорения мгновенно, без необходимости изменять операторы запросов, используемые вашим приложением.
Поэтому мы рекомендуем использовать материализованные представления в следующих сценариях:
- Даже когда включен Data Cache, производительность запросов не соответствует вашим требованиям к задержке запроса и одновременности.
- Запросы включают многократно используемые компоненты, такие как фиксированные агрегатные функции или паттерны join.
- Данные организованы в partitions, в то время как запросы включают агрегацию на относительно высоком уровне (например, агрегация по дням).
В следующих сценариях мы рекомендуем приоритизировать ускорение через Data Cache:
- Запросы не имеют много многократно используемых компонентов и могут сканировать любые данные из озера данных.
- Удаленное хранилище имеет значительные колебания или нестабильность, что потенциально может повлиять на доступ.
Создание материализованных представлений на основе внешних каталогов
Создание материализованного представления для таблиц во внешних каталогах аналогично созданию материализованного представления для нативных таблиц Selena. Вам нужно только установить подходящую стратегию обновления в соответствии с используемым источником данных и вручную включить перезапись запросов для материализованных представлений на основе внешних каталогов.
Материализованные представления внешних таблиц не поддерживают автоматическое обновление запускаемое изменениями данных базовой таблицы. Они поддерживают только асинхронное обновление с фиксированным интервалом и ручное обновление.
Выбор подходящей стратегии обновления
В настоящее время Selena не может обнаруживать изменения данных на уровне partition в Hudi catalogs. Поэтому выполняется полное обновление после запуска задачи.
Для Hive Catalog, Iceberg Catalog (начиная с v1.5.2), JDBC catalog (начиная с v1.5.2, только для MySQL таблиц с range-partitioned) и Paimon Catalog (начиная с v1.5.2), Selena поддерживает обнаружение изменений данных на уровне partition. В результате Selena может:
-
Обновлять только partitions с изменениями данных, чтобы избежать полного обновления, снижая потребление ресурсов, вызванное обновлением.
-
Обеспечивать согласованность данных в некоторой степени во время перезаписи запросов. Если есть изменения данных в базовой таблице в озере данных, запрос не будет переписан для использования материализованного представления.
Вы все еще можете выбрать допустить определенный уровень несогласованности данных, установив свойство mv_rewrite_staleness_second при создании материализованного представления. Для получения дополнительной информации см. CREATE MATERIALIZED VIEW.
Обратите внимание, что если вам нужно обновлять по partition, ключи partitioning материализованного представления должны быть включены в ключи базовой таблицы.
Начиная с v1.5.2, Selena поддерживает создание partitioned материализованных представлений для Iceberg таблиц с Partition Transforms, и материализованные представления partitioned по столбцу после трансформации. В настоящее время поддерживаются только Iceberg таблицы с трансформациями identity, year, month, day или hour.
Следующий пример показывает определение Iceberg таблицы с трансформацией partition day и создает материализованное представление с выровненными partitions на ее основе:
-- Определение Iceberg таблицы.
CREATE TABLE spark_catalog.test.iceberg_sample_datetime_day (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP)
USING iceberg
PARTITIONED BY (days(ts))
-- Создание материализованного представления для Iceberg таблицы.
CREATE MATERIALIZED VIEW `test_iceberg_datetime_day_mv` (`id`, `data`, `category`, `ts`)
PARTITION BY (`ts`)
DISTRIBUTED BY HASH(`id`)
REFRESH MANUAL
AS
SELECT
`iceberg_sample_datetime_day`.`id`,
`iceberg_sample_datetime_day`.`data`,
`iceberg_sample_datetime_day`.`category`,
`iceberg_sample_datetime_day`.`ts`
FROM `iceberg`.`test`.`iceberg_sample_datetime_day`;
Для Hive catalogs вы можете включить функцию обновления кэша метаданных Hive, чтобы позволить Selena обнаруживать изменения данных на уровне partition. Когда эта функция включена, Selena периодически обращается к Hive Metastore Service (HMS) или AWS Glue для проверки информации метаданных о недавно запрошенных горячих данных.
Чтобы включить функцию обновления кэша метаданных Hive, вы можете установить следующий элемент динамической конфигурации FE, используя ADMIN SET FRONTEND CONFIG:
Элементы конфигурации
enable_background_refresh_connector_metadata
По умолчанию: true в v1.5.2, false в v1.5.2
Описание: Включать ли периодическое обновление кэша метаданных Hive. После включения Selena опрашивает metastore (Hive Metastore или AWS Glue) вашего Hive cluster и обновляет кэшированные метаданные часто запрашиваемых Hive catalogs для восприятия изменений данных. True указывает на включение обновления кэша метаданных Hive, а false указывает на отключение.
background_refresh_metadata_interval_millis
По умолчанию: 600000 (10 минут)
Описание: Интервал между двумя последовательными обновлениями кэша метаданных Hive. Единица измерения: миллисекунды.
background_refresh_metadata_time_secs_since_last_access_secs
По умолчанию: 86400 (24 часа)
Описание: Время истечения задачи обновления кэша метаданных Hive. Для Hive catalog, к которому был осуществлен доступ, если к нему не было доступа более указанного времени, Selena прекращает обновление его кэшированных метаданных. Для Hive catalog, к которому не было доступа, Selena не будет обновлять его кэшированные метаданные. Единица измерения: секунды.
Начиная с v1.5.2, Selena поддерживает обнаружение изменений данных для Iceberg Catalog на уровне partition. В настоящее время поддерживаются только Iceberg V1 таблицы.
Включение перезаписи запросов для материализованных представлений на основе внешних каталогов
По умолчанию Selena не поддерживает перезапись запросов для материализованных представлений, построенных на Hudi и JDBC catalogs, потому что перезапись запросов в этом сценарии не может обеспечить строгую согласованность результатов. Вы можете включить эту функцию, установив свойство force_external_table_query_rewrite в true при создании материализованного представления. Для материализованных представлений, построенных на таблицах в Hive catalogs, перезапись запросов включена по умолчанию.
Пример:
CREATE MATERIALIZED VIEW ex_mv_par_tbl
PARTITION BY emp_date
DISTRIBUTED BY hash(empid)
PROPERTIES (
"force_external_table_query_rewrite" = "true"
)
AS
select empid, deptno, emp_date
from `hive_catalog`.`emp_db`.`emps_par_tbl`
where empid < 5;
В сценариях, включающих перезапись запросов, если вы используете очень сложный оператор запроса для построения материализованного представления, мы рекомендуем разделить оператор запроса и построить несколько простых материализованных представлений вложенным образом. Вложенные материализованные представления более универсальны и могут охватывать более широкий спектр паттернов запросов.
Лучшие практики
В реальных бизнес-сценариях вы можете идентифицировать запросы с высокой задержкой выполнения и потреблением ресурсов, анализируя журналы аудита или журналы больших запросов. Вы можете дополнительно использовать query profiles для точного определения конкретных этапов, где запрос медленный. Следующие разделы предоставляют инструкции и примеры о том, как п овысить производительность запросов озера данных с помощью материализованных представлений.
Случай первый: Ускорение вычислений join в озере данных
Вы можете использовать материализованные представления для ускорения запросов join в озере данных.
Предположим, что следующие запросы к вашему Hive catalog особенно медленные:
--Q1
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_year = 1993
AND lo_discount BETWEEN 1 AND 3
AND lo_quantity < 25;
--Q2
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_yearmonth = 'Jan1994'
AND lo_discount BETWEEN 4 AND 6
AND lo_quantity BETWEEN 26 AND 35;
--Q3
SELECT SUM(lo_revenue), d_year, p_brand
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates, hive.ssb_1g_csv.part, hive.ssb_1g_csv.supplier
WHERE
lo_orderdate = d_datekey
AND lo_partkey = p_partkey
AND lo_suppkey = s_suppkey
AND p_brand BETWEEN 'MFGR#2221' AND 'MFGR#2228'
AND s_region = 'ASIA'
GROUP BY d_year, p_brand
ORDER BY d_year, p_brand;
Анализируя их query profiles, вы можете заметить, что время выполнения запроса в основном тратится на hash join между таблицей lineorder и другими таблицами измерений по столбцу lo_orderdate.
Здесь Q1 и Q2 выполняют агрегацию после join lineorder и dates, в то время как Q3 выполняет агрегацию после join lineorder, dates, part и supplier.
Следовательно, вы можете использовать возможность View Delta Join rewrite Selena для построения материализованного представления, которое объединяет lineorder, dates, part и supplier.
CREATE MATERIALIZED VIEW lineorder_flat_mv
DISTRIBUTED BY HASH(LO_ORDERDATE, LO_ORDERKEY) BUCKETS 48
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
-- Указание уникальных ограничений.
"unique_constraints" = "
hive.ssb_1g_csv.supplier.s_suppkey;
hive.ssb_1g_csv.part.p_partkey;
hive.ssb_1g_csv.dates.d_datekey",
-- Указание Foreign Keys.
"foreign_key_constraints" = "
hive.ssb_1g_csv.lineorder(lo_partkey) REFERENCES hive.ssb_1g_csv.part(p_partkey);
hive.ssb_1g_csv.lineorder(lo_suppkey) REFERENCES hive.ssb_1g_csv.supplier(s_suppkey);
hive.ssb_1g_csv.lineorder(lo_orderdate) REFERENCES hive.ssb_1g_csv.dates(d_datekey)",
-- Включение перезаписи запросов для материализованного представления на основе внешнего каталога.
"force_external_table_query_rewrite" = "TRUE"
)
AS SELECT
l.LO_ORDERDATE AS LO_ORDERDATE,
l.LO_ORDERKEY AS LO_ORDERKEY,
l.LO_PARTKEY AS LO_PARTKEY,
l.LO_SUPPKEY AS LO_SUPPKEY,
l.LO_QUANTITY AS LO_QUANTITY,
l.LO_EXTENDEDPRICE AS LO_EXTENDEDPRICE,
l.LO_DISCOUNT AS LO_DISCOUNT,
l.LO_REVENUE AS LO_REVENUE,
s.S_REGION AS S_REGION,
p.P_BRAND AS P_BRAND,
d.D_YEAR AS D_YEAR,
d.D_YEARMONTH AS D_YEARMONTH
FROM hive.ssb_1g_csv.lineorder AS l
INNER JOIN hive.ssb_1g_csv.supplier AS s ON s.S_SUPPKEY = l.LO_SUPPKEY
INNER JOIN hive.ssb_1g_csv.part AS p ON p.P_PARTKEY = l.LO_PARTKEY
INNER JOIN hive.ssb_1g_csv.dates AS d ON l.LO_ORDERDATE = d.D_DATEKEY;
Случай второй: Ускорение агрегаций и агрегаций на joins в озере данных
Материализованные представления могут использоваться для ускорения агрегатных запросов, независимо от того, выполняются ли они для одной таблицы или включают несколько таблиц.
-
Агрегатный запрос для одной таблицы
Для типичных запросов к одной таблице их query profile покажет, что узел AGGREGATE потребляет значительное количество времени. Вы можете использовать общие агрегатные операторы для построения материализованных представлений.
Предположим, что следующий запрос медленный:
--Q4
SELECT
lo_orderdate, count(distinct lo_orderkey)
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate
ORDER BY lo_orderdate limit 100;Q4 вычисляет ежедневное количество уникальных заказов. Поскольку вычисление count distinct может быть вычислительно затратным, вы можете создать следующие два типа материализованных представлений для ускорения:
CREATE MATERIALIZED VIEW mv_2_1
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
lo_orderdate, count(distinct lo_orderkey)
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate;
CREATE MATERIALIZED VIEW mv_2_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
-- lo_orderkey должен быть типа BIGINT, чтобы его можно было использовать для перезаписи запросов.
lo_orderdate, bitmap_union(to_bitmap(lo_orderkey))
FROM hive.ssb_1g_csv.lineorder
GROUP BY lo_orderdate;Обратите внимание, что в этом контексте не создавайте материализованные представления с операторами LIMIT и ORDER BY, чтобы избежать неудач перезаписи. Для получения дополнительной информации об ограничениях перезаписи запросов см. Query rewrite with materialized views - Limitations.
-
Многотабличный агрегатный запрос
В сценариях, включающих агрегации результатов join, вы можете создать вложенные материализованные представления на основе существующих материализованных представлений, которые объединяют таблицы для дальнейшей агрегации результатов join. Например, на основе примера из Случая первого, вы можете создать следующее материализованное представление для ускорения Q1 и Q2, потому что их паттерны агрегации похожи:
CREATE MATERIALIZED VIEW mv_2_3
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT
lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM lineorder_flat_mv
GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;Конечно, возможно выполнять как вычисления join, так и агрегации в рамках одного материализованного представления. Хотя эти типы материализованных представлений могут иметь меньше возможностей для перезаписи запросов (из-за их специфических вычислений), они обычно занимают меньше места для хранения после агрегации. Ваш выбор может быть основан на вашем конкретном случае использования.
CREATE MATERIALIZED VIEW mv_2_4
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE lo_orderdate = d_datekey
GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;
Случай третий: Ускорение joins на агрегациях в озере данных
В некоторых сценариях может быть необходимо сначала выполнить вычисления агрегации для одной таблицы, а затем выполнить запросы join с другими таблицами. Чтобы полностью использовать возможности перезаписи запросов Selena, мы рекомендуем построить вложенные материализова нные представления. Например:
--Q5
SELECT * FROM (
SELECT
l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region, sum(l.lo_revenue)
FROM
hive.ssb_1g_csv.lineorder l
INNER JOIN (
SELECT distinct c_custkey, c_region
from
hive.ssb_1g_csv.customer
WHERE
c_region IN ('ASIA', 'AMERICA')
) c ON l.lo_custkey = c.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region
) c1
WHERE
lo_orderdate = '19970503';
Q5 сначала выполняет агрегатный запрос для таблицы customer, а затем выполняет join и агрегацию с таблицей lineorder. Похожие запросы могут включать различные фильтры по c_region и lo_orderdate. Чтобы использовать возможности перезаписи запросов, вы можете создать два материализованных представления, одно для агрегации, а другое для join.
--mv_3_1
CREATE MATERIALIZED VIEW mv_3_1
DISTRIBUTED BY HASH(c_custkey)
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT distinct c_custkey, c_region from hive.ssb_1g_csv.customer;
--mv_3_2
CREATE MATERIALIZED VIEW mv_3_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT l.lo_orderdate, l.lo_orderkey, mv.c_custkey, mv.c_region, sum(l.lo_revenue)
FROM hive.ssb_1g_csv.lineorder l
INNER JOIN mv_3_1 mv
ON l.lo_custkey = mv.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, mv.c_custkey, mv.c_region;
Случай четвертый: Разделение горячих и холодных данных для real-time данных и исторических данных в озере данных
Рассмотрим следующий сценарий: данные, обновленные за последние три дня, записываются напрямую в Selena, в то время как менее свежие данные проверяются и пакетно записываются в Hive. Однако все еще есть запросы, которые могут включать данные за последние семь дней. В этом случае вы можете создать простую модель с материализованными представлениями для автоматического истечения данных.
CREATE MATERIALIZED VIEW mv_4_1
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT lo_orderkey, lo_orderdate, lo_revenue
FROM hive.ssb_1g_csv.lineorder
WHERE lo_orderdate<=current_date()
AND lo_orderdate>=date_add(current_date(), INTERVAL -4 DAY);
Вы можете дополнительно построить представления или материализованные представления на его основе в соответствии с логикой приложений верхнего уровня.