Перейти к основному содержимому
Версия: 2.0.x

Colocate Join

Для shuffle join и broadcast join, если условие соединения выполняется, строки данных из двух соединяемых таблиц объединяются на одном узле для завершения соединения. Ни один из этих двух методов соединения не может избежать задержки или накладных расходов, вызванных сетевой передачей данных между узлами.

Основная идея заключается в том, чтобы сохранить ключ бакетинга, количество копий и размещение копий согласованными для таблиц в одной группе Colocation Group. Если столбец соединения является ключом бакетинга, вычислительному узлу нужно выполнить только локальное соединение без получения данных с других узлов. Colocate Join поддерживает equi joins.

Этот документ знакомит с принципом, реализацией, использованием и соображениями по использованию Colocate Join.

Терминология

  • Colocation Group (CG): CG будет содержать одну или несколько таблиц. Таблицы в CG имеют одинаковый бакетинг и размещение реплик, и описываются с помощью Colocation Group Schema.
  • Colocation Group Schema (CGS): CGS содержит ключ бакетинга, количество buckets и количество реплик CG.

Принцип

Colocate Join заключается в формировании CG с набором таблиц, имеющих одинаковый CGS, и обеспечении того, чтобы соответствующие копии bucket этих таблиц находились на одном и том же наборе узлов BE. Когда таблицы в CG выполняют операции Join по столбцам бакетинга, локальные данные могут быть соединены напрямую, что экономит время на передачу данных между узлами.

Bucket Seq получается путём hash(key) mod buckets. Предположим, таблица имеет 8 buckets, тогда есть [0, 1, 2, 3, 4, 5, 6, 7] 8 buckets, и каждый Bucket имеет один или несколько подтаблиц, количество подтаблиц зависит от количества партиций. Если это мультипартиционная таблица, будет несколько tablets.

Чтобы иметь одинаковое распределение данных, таблицы в пределах одной CG должны соответствовать следующему.

  1. Таблицы в пределах одной CG должны иметь идентичный ключ бакетинга (тип, количество, порядок) и одинаковое количество buckets, чтобы фрагменты данных нескольких таблиц могли распределяться и контролироваться один за другим. Ключ бакетинга — это столбцы, указанные в операторе создания таблицы DISTRIBUTED BY HASH(col1, col2, ...). Ключ бакетинга определяет, какие столбцы данных хешируются в разные Bucket Seqs. Имя ключа бакетинга может отличаться для таблиц в пределах одной CG. Столбцы бакетинга могут отличаться в операторе создания, но порядок соответствующих типов данных в DISTRIBUTED BY HASH(col1, col2, ...) должен быть точно таким же.
  2. Таблицы в пределах одной CG могут иметь разное количество партиций и разные ключи партиционирования.

При создании таблицы CG указывается атрибутом "colocate_with" = "group_name" в PROPERTIES таблицы. Если CG не существует, это означает, что таблица является первой таблицей CG и называется Parent Table. Распределение данных Parent Table (тип, количество и порядок ключей разделения bucket, количество копий и количество разделённых buckets) определяет CGS. Если CG существует, проверяется, согласуется ли распределение данных таблицы с CGS.

Размещение копий таблиц в пределах одной CG удовлетворяет:

  1. Сопоставление между Bucket Seq и узлами BE всех таблиц такое же, как у Parent Table.
  2. Сопоставление между Bucket Seq и узлами BE всех партиций в Parent Table такое же, как у первой Partition.
  3. Сопоставление между Bucket Seq и узлами BE первой Partition Parent Table определяется с использованием нативного алгоритма Round Robin.

Согласованное распределение данных и сопоставление гарантируют, что строки данных с одинаковым значением, взятым ключом бакетинга, попадают на один и тот же BE. Поэтому при использовании ключа бакетинга для соединения столбцов требуются только локальные соединения.

Использование

Создание таблицы

При создании таблицы вы можете указать атрибут "colocate_with" = "group_name" в PROPERTIES, чтобы указать, что таблица является таблицей Colocate Join и принадлежит указанной Colocation Group.

ПРИМЕЧАНИЕ

Начиная с версии 2.5.4, Colocate Join может выполняться для таблиц из разных баз данных. Вам нужно только указать одно и то же свойство colocate_with при создании таблиц.

Например:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);

Если указанная Group не существует, Selena автоматически создаёт Group, содержащую только текущую таблицу. Если Group существует, Selena проверяет, соответствует ли текущая таблица Colocation Group Schema. Если да, она создаёт таблицу и добавляет её в Group. В то же время таблица создаёт партицию и tablet на основе правил распределения данных существующей Group.

Colocation Group принадлежит базе данных. Имя Colocation Group уникально в пределах базы данных. Во внутреннем хранилище полное имя Colocation Group — это dbId_groupName, но вы воспринимаете только groupName.

ПРИМЕЧАНИЕ

Если вы указываете одну и ту же Colocation Group для связывания таблиц из разных баз данных для Colocate Join, Colocation Group существует в каждой из этих баз данных. Вы можете выполнить show proc "/colocation_group", чтобы проверить Colocation Groups в разных базах данных.

Удаление

Полное удаление — это удаление из Корзины. Обычно после удаления таблицы командой DROP TABLE по умолчанию она остаётся в корзине в течение дня, прежде чем быть удалённой. Когда последняя таблица в Group полностью удалена, Group также будет удалена автоматически.

Просмотр информации о группе

Следующая команда позволяет вам просмотреть информацию о Group, которая уже существует в кластере.

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId: уникальный для всего кластера идентификатор Group, первая половина — это db id, вторая половина — это group id.
  • GroupName: полное имя Group.
  • TabletIds: список id таблиц в Group.
  • BucketsNum: количество buckets.
  • ReplicationNum: количество реплик.
  • DistCols: столбцы распределения, т.е. тип столбца бакетинга.
  • IsStable: стабильна ли Group (определение стабильности см. в разделе "Балансировка и восстановление реплик Colocation").

Вы можете дополнительно просмотреть распределение данных Group с помощью следующей команды.

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
  • BucketIndex: индекс последовательности buckets.
  • BackendIds: идентификаторы узлов BE, где расположены фрагменты данных бакетинга.

Примечание: Вышеуказанная команда требует привилегии NODE или роли cluster_admin. Обычные пользователи не могут получить к ней доступ.

Изменение свойств Group таблицы

Вы можете изменить свойства Colocation Group таблицы. Например:

ALTER TABLE tbl SET ("colocate_with" = "group2");

Если таблица ранее не была назначена Group, команда проверит Schema и добавит таблицу в Group (Group будет создана сначала, если она не существует). Если таблица была ранее назначена другой Group, команда удалит таблицу из исходной Group и добавит её в новую Group (Group будет создана сначала, если она не существует).

Вы также можете удалить свойства Colocation таблицы с помощью следующей команды.

ALTER TABLE tbl SET ("colocate_with" = "");

Другие связанные операции

При добавлении партиции с помощью ADD PARTITION или изменении количества копий в таблице с атрибутом Colocation, Selena проверяет, будет ли операция нарушать Colocation Group Schema, и отклоняет её в случае нарушения.

Балансировка и восстановление реплик Colocation

Распределение реплик таблицы Colocation должно следовать правилам распределения, указанным в схеме Group, поэтому оно отличается от обычного шардинга с точки зрения восстановления и балансировки реплик.

Сама Group имеет свойство stable. Когда stable равно true, это означает, что изменения в фрагментах таблиц в Group не производятся, и функция Colocation работает правильно. Когда stable равно false, это означает, что некоторые фрагменты таблиц в текущей Group восстанавливаются или мигрируются, и Colocate Join затронутых таблиц деградирует до обычного Join.

Восстановление реплик

Реплики могут храниться только на указанном узле BE. Selena будет искать наименее загруженный BE для замены недоступного BE (например, down, decommission). После замены все фрагменты данных бакетинга на старом BE восстанавливаются. Во время миграции Group отмечается как Unstable.

Балансировка реплик

Selena пытается распределить фрагменты таблиц Colocation равномерно по всем узлам BE. Балансировка для обычных таблиц находится на уровне реплики, то есть каждая реплика индивидуально находит узел BE с меньшей нагрузкой. Балансировка для таблиц Colocation находится на уровне Bucket, то есть все реплики в пределах Bucket мигрируют вместе. Мы используем простой алгоритм балансировки, который распределяет BucketsSequnce равномерно по всем узлам BE, не учитывая фактический размер реплик, а только количество реплик. Точный алгоритм можно найти в комментариях к коду в ColocateTableBalancer.java.

Примечание 1: Текущий алгоритм балансировки и восстановления реплик Colocation может не работать хорошо для кластеров Selena с гетерогенным развёртыванием. Так называемое гетерогенное развёртывание означает, что ёмкость диска, количество дисков и тип диска (SSD и HDD) узлов BE не согласуются. В случае гетерогенного развёртывания может случиться так, что узел BE малой ёмкости хранит такое же количество реплик, как узел BE большой ёмкости.

Примечание 2: Когда Group находится в состоянии Unstable, Join её таблиц деградирует до обычного Join, что может значительно снизить производительность запросов кластера. Если вы не хотите, чтобы система автоматически балансировалась, установите конфигурацию FE disable_colocate_balance, чтобы отключить автоматическую балансировку, и включите её обратно в подходящее время. (Подробнее см. раздел "Продвинутые операции" (#Advanced Operations))

Запрос

Таблица Colocation запрашивается так же, как обычная таблица. Если Group, в которой находится таблица Colocation, находится в состоянии Unstable, она автоматически деградирует до обычного Join, как показано в следующем примере.

Таблица 1:

CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl1
VALUES
("2015-09-12",1000,1),
("2015-09-13",2000,2);

Таблица 2:

CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl2
VALUES
("2015-09-12 00:00:00",3000,3),
("2015-09-12 00:00:00",4000,4);

Просмотр плана запроса:

EXPLAIN SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+-------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k1 | 2: k2 | 3: v1 | 4: k1 | 5: k2 | 6: v1 |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 3:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (COLOCATE) |
| | colocate: true |
| | equal join conjunct: 5: k2 = 2: k2 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl1 |
| | PREAGGREGATION: OFF. Reason: Has can not pre-aggregation Join |
| | partitions=1/2 |
| | rollup: tbl1 |
| | tabletRatio=6/6 |
| | tabletList=15344,15346,15348,15350,15352,15354 |
| | cardinality=1 |
| | avgRowSize=3.0 |
| | |
| 0:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: None aggregate function |
| partitions=1/1 |
| rollup: tbl2 |
| tabletRatio=6/6 |
| tabletList=15373,15375,15377,15379,15381,15383 |
| cardinality=1 |
| avgRowSize=3.0 |
+-------------------------------------------------------------------------+
40 rows in set (0.03 sec)

Если Colocate Join действует, узел Hash Join отображает colocate: true.

Если он не действует, план запроса следующий:

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+

Узел HASH JOIN покажет соответствующую причину: colocate: false, reason: group is not stable. Одновременно будет сгенерирован узел EXCHANGE.

Продвинутые операции

Элементы конфигурации FE

  • disable_colocate_relocate

Отключать ли автоматическое восстановление реплик Colocation для Selena. По умолчанию false, что означает, что оно включено. Этот параметр влияет только на восстановление реплик для таблиц Colocation, а не для обычных таблиц.

  • disable_colocate_balance

Отключать ли автоматическую балансировку реплик Colocation для Selena. По умолчанию false, что означает, что она включена. Этот параметр влияет только на балансировку реплик таблиц Colocation, а не для обычных таблиц.

  • disable_colocate_join

    Вы можете отключить Colocate join на уровне сессии, изменив эту переменную.

  • disable_colocate_join

    Функцию Colocate join можно отключить, изменив эту переменную.

HTTP Restful API

Selena предоставляет несколько HTTP Restful API, связанных с Colocate Join, для просмотра и изменения Colocation Groups.

Этот API реализован на FE и доступен с использованием fe_host:fe_http_port с правами db_admin и user_admin.

  1. Просмотр всей информации Colocation кластера

    curl --location-trusted -u<username>:<password> 'http://<fe_host>:<fe_http_port>/api/colocate'
    // Возвращает внутреннюю информацию Colocation в формате Json.
    {
    "colocate_meta": {
    "groupName2Id": {
    "g1": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Tables": {},
    "table2Group": {
    "10007": {
    "dbId": 10005,
    "grpId": 10008
    },
    "10040": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Schema": {
    "10005.10008": {
    "groupId": {
    "dbId": 10005,
    "grpId": 10008
    },
    "distributionColTypes": [{
    "type": "INT",
    "len": -1,
    "isAssignedStrLenInColDefinition": false,
    "precision": 0,
    "scale": 0
    }],
    "bucketsNum": 10,
    "replicationNum": 2
    }
    },
    "group2BackendsPerBucketSeq": {
    "10005.10008": [
    [10004, 10002],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10003, 10004],
    [10003, 10004],
    [10003, 10004],
    [10002, 10004]
    ]
    },
    "unstableGroups": []
    },
    "status": "OK"
    }
  2. Пометить Group как Stable или Unstable

    # Пометить как Stable
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_stable?db_id=<dbId>&group_id=<grpId>​'
    # Пометить как Unstable
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_unstable?db_id=<dbId>&group_id=<grpId>​'

    Если возвращённый результат — 200, Group успешно помечена как Stable или Unstable.

  3. Установить распределение данных Group

    Этот интерфейс позволяет вам принудительно задать числовое распределение Group.

    POST /api/colocate/bucketseq?db_id=10005&group_id= 10008

    Body:

    [[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]

    returns: 200

    Где Body — это BucketsSequence, представленная как вложенный массив, и идентификаторы BE, в которых расположены фрагменты бакетинга.

    Обратите внимание, что для использования этой команды вам может потребоваться установить конфигурацию FE disable_colocate_relocate и disable_colocate_balance в true, то есть отключить автоматическое восстановление и балансировку реплик Colocation в системе. В противном случае она может быть автоматически сброшена системой после изменения.