Перейти к основному содержимому

Colocate Join

Для shuffle join и broadcast join, если условие соединения выполнено, строки данных двух соединяемых таблиц объединяются в один узел для завершения соединения. Ни один из этих двух методов соединения не может избежать задержки или накладных расходов, вызванных передачей данных по сети между узлами.

Основная идея заключается в том, чтобы сохранить ключ разбиения на сегменты, количество копий и размещение копий согласованными для таблиц в одной Colocation Group. Если столбец соединения является ключом разбиения на сегменты, вычислительному узлу нужно только выполнить локальное соединение без получения данных с других узлов. Colocate Join поддерживает эквисоединения.

Этот документ представляет принцип, реализацию, использование и соображения Colocate Join.

Терминология

  • Colocation Group (CG): CG будет содержать одну или несколько таблиц. Таблицы в CG имеют одинаковое разбиение на сегменты и размещение реплик, и описываются с использованием Colocation Group Schema.
  • Colocation Group Schema (CGS): CGS содержит ключ разбиения на сегменты, количество сегментов и количество реплик CG.

Принцип

Colocate Join заключается в формировании CG с набором таблиц, имеющих одинаковую CGS, и обеспечении того, чтобы соответствующие копии сегментов этих таблиц попадали на один и тот же набор узлов BE. Когда таблицы в CG выполняют операции Join по столбцам с сегментированием, локальные данные могут быть соединены напрямую, экономя время на передачу данных между узлами.

Bucket Seq получается по формуле hash(key) mod buckets. Предположим, таблица имеет 8 сегментов, тогда есть [0, 1, 2, 3, 4, 5, 6, 7] 8 сегментов, и каждый Bucket имеет одну или несколько подтаблиц, количество подтаблиц зависит от количества разделов. Если это многораздельная таблица, будет несколько tablet.

Чтобы иметь одинаковое распределение данных, таблицы в одной CG должны соответствовать следующему.

  1. Таблицы в одной CG должны иметь идентичный ключ разбиения на сегменты (тип, количество, порядок) и одинаковое количество сегментов, чтобы срезы данных нескольких таблиц могли распределяться и контролироваться один за другим. Ключ разбиения на сегменты - это столбцы, указанные в операторе создания таблицы DISTRIBUTED BY HASH(col1, col2, ...). Ключ разбиения на сегменты определяет, какие столбцы данных хешируются в разные Bucket Seq. Имя ключа разбиения на сегменты может различаться для таблиц в одной CG. Столбцы разбиения на сегменты могут отличаться в операторе создания, но порядок соответствующих типов данных в DISTRIBUTED BY HASH(col1, col2, ...) должен быть точно одинаковым.
  2. Таблицы в одной CG могут иметь разное количество разделов и разные ключи разделов.

При создании таблицы CG указывается атрибутом "colocate_with" = "group_name" в PROPERTIES таблицы. Если CG не существует, это означает, что таблица является первой таблицей CG и называется Parent Table. Распределение данных Parent Table (тип, количество и порядок ключей разделения сегментов, количество копий и количество разделенных сегментов) определяет CGS. Если CG существует, проверяется, соответствует ли распределение данных таблицы CGS.

Размещение копий таблиц в одной CG удовлетворяет:

  1. Сопоставление между Bucket Seq и узлами BE всех таблиц такое же, как у Parent Table.
  2. Сопоставление между Bucket Seq и узлами BE всех разделов в Parent Table такое же, как у первого раздела.
  3. Сопоставление между Bucket Seq и узлами BE первого раздела Parent Table определяется с использованием нативного алгоритма Round Robin.

Согласованное распределение данных и сопоставление гарантируют, что строки данных с одинаковым значением, взятым ключом разбиения на сегменты, попадают на один и тот же BE. Поэтому при использовании ключа разбиения на сегменты для соединения столбцов требуются только локальные соединения.

Использование

Создание таблицы

При создании таблицы вы можете указать атрибут "colocate_with" = "group_name" в PROPERTIES, чтобы указать, что таблица является таблицей Colocate Join и принадлежит указанной Colocation Group.

ПРИМЕЧАНИЕ

Начиная с версии 1.5.0, Colocate Join может выполняться на таблицах из разных баз данных. Вам нужно только указать одинаковое свойство colocate_with при создании таблиц.

Например:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);

Если указанная группа не существует, Selena автоматически создает группу, которая содержит только текущую таблицу. Если группа существует, Selena проверяет, соответствует ли текущая таблица Colocation Group Schema. Если да, она создает таблицу и добавляет ее в группу. В то же время таблица создает раздел и tablet на основе правил распределения данных существующей группы.

Colocation Group принадлежит базе данных. Имя Colocation Group уникально в пределах базы данных. Во внутреннем хранилище полное имя Colocation Group - dbId_groupName, но вы воспринимаете только groupName.

ПРИМЕЧАНИЕ

Если вы указываете одинаковую Colocation Group для связывания таблиц из разных баз данных для Colocate Join, Colocation Group существует в каждой из этих баз данных. Вы можете выполнить show proc "/colocation_group", чтобы проверить Colocation Groups в разных базах данных.

Удаление

Полное удаление - это удаление из корзины. Обычно после удаления таблицы командой DROP TABLE по умолчанию она остается в корзине в течение дня перед удалением. Когда последняя таблица в группе полностью удалена, группа также будет удалена автоматически.

Просмотр информации о группе

Следующая команда позволяет просмотреть информацию о группах, которые уже существуют в кластере.

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId: Уникальный в масштабе кластера идентификатор группы, где первая половина - это db id, а вторая половина - group id.
  • GroupName: Полное имя группы.
  • TabletIds: Список идентификаторов таблиц в группе.
  • BucketsNum: Количество сегментов.
  • ReplicationNum: Количество реплик.
  • DistCols: Столбцы распределения, т.е. тип столбца разбиения на сегменты.
  • IsStable: Стабильна ли группа (для определения стабильности см. раздел о балансировке и восстановлении реплик Colocation).

Вы можете дополнительно просмотреть распределение данных группы с помощью следующей команды.

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
  • BucketIndex: Индекс последовательности сегментов.
  • BackendIds: Идентификаторы узлов BE, где расположены срезы данных разбиения на сегменты.

Примечание: Вышеуказанная команда требует привилегии NODE или роли cluster_admin. Обычные пользователи не могут получить к ней доступ.

Изменение свойств группы таблицы

Вы можете изменить свойства Colocation Group таблицы. Например:

ALTER TABLE tbl SET ("colocate_with" = "group2");

Если таблица ранее не была назначена группе, команда проверит схему и добавит таблицу в группу (группа будет создана сначала, если она не существует). Если таблица ранее была назначена другой группе, команда удалит таблицу из исходной группы и добавит ее в новую группу (группа будет создана сначала, если она не существует).

Вы также можете удалить свойства Colocation таблицы с помощью следующей команды.

ALTER TABLE tbl SET ("colocate_with" = "");

Другие связанные операции

При добавлении раздела с помощью ADD PARTITION или изменении количества копий в таблице с атрибутом Colocation, Selena проверяет, нарушит ли операция Colocation Group Schema, и отклоняет ее, если это так.

Балансировка и восстановление реплик Colocation

Распределение реплик таблицы Colocation должно следовать правилам распределения, указанным в схеме группы, поэтому оно отличается от обычного сегментирования в плане восстановления и балансировки реплик.

Сама группа имеет свойство stable. Когда stable равно true, это означает, что никаких изменений не вносится в срезы таблиц в группе, и функция Colocation работает правильно. Когда stable равно false, это означает, что некоторые срезы таблиц в текущей группе восстанавливаются или мигрируют, и Colocate Join затронутых таблиц будет деградировать до обычного Join.

Восстановление реплик

Реплики могут храниться только на указанном узле BE. Selena будет искать наименее загруженный BE для замены недоступного BE (например, отключенного, выведенного из эксплуатации). После замены все срезы данных разбиения на сегменты на старом BE восстанавливаются. Во время миграции группа помечается как нестабильная.

Балансировка реплик

Selena пытается равномерно распределить срезы таблиц Colocation по всем узлам BE. Балансировка для обычных таблиц происходит на уровне реплик, то есть каждая реплика индивидуально находит узел BE с меньшей нагрузкой. Балансировка для таблиц Colocation происходит на уровне Bucket, то есть все реплики в Bucket мигрируют вместе. Мы используем простой алгоритм балансировки, который равномерно распределяет BucketsSequnce по всем узлам BE, не учитывая фактический размер реплик, а только количество реплик. Точный алгоритм можно найти в комментариях к коду в ColocateTableBalancer.java.

Примечание 1: Текущий алгоритм балансировки и восстановления реплик Colocation может не работать хорошо для кластеров Selena с гетерогенным развертыванием. Так называемое гетерогенное развертывание означает, что емкость диска, количество дисков и тип диска (SSD и HDD) узлов BE не согласованы. В случае гетерогенного развертывания может случиться так, что узел BE с малой емкостью хранит такое же количество реплик, как узел BE с большой емкостью.

Примечание 2: Когда группа находится в нестабильном состоянии, Join ее таблиц будет деградировать до обычного Join, что может значительно ухудшить производительность запросов кластера. Если вы не хотите, чтобы система автоматически балансировалась, установите конфигурацию FE disable_colocate_balance для отключения автоматической балансировки и включите ее обратно в подходящее время. (См. раздел Расширенные операции (#Расширенные операции) для подробностей)

Запрос

Таблица Colocation запрашивается так же, как обычная таблица. Если группа, в которой находится таблица Colocation, находится в нестабильном состоянии, она автоматически деградирует до обычного Join, как показано в следующем примере.

Таблица 1:

CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl1
VALUES
("2015-09-12",1000,1),
("2015-09-13",2000,2);

Таблица 2:

CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl2
VALUES
("2015-09-12 00:00:00",3000,3),
("2015-09-12 00:00:00",4000,4);

Просмотр плана запроса:

EXPLAIN SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+-------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k1 | 2: k2 | 3: v1 | 4: k1 | 5: k2 | 6: v1 |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 3:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (COLOCATE) |
| | colocate: true |
| | equal join conjunct: 5: k2 = 2: k2 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl1 |
| | PREAGGREGATION: OFF. Reason: Has can not pre-aggregation Join |
| | partitions=1/2 |
| | rollup: tbl1 |
| | tabletRatio=6/6 |
| | tabletList=15344,15346,15348,15350,15352,15354 |
| | cardinality=1 |
| | avgRowSize=3.0 |
| | |
| 0:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: None aggregate function |
| partitions=1/1 |
| rollup: tbl2 |
| tabletRatio=6/6 |
| tabletList=15373,15375,15377,15379,15381,15383 |
| cardinality=1 |
| avgRowSize=3.0 |
+-------------------------------------------------------------------------+
40 rows in set (0.03 sec)

Если Colocate Join действует, узел Hash Join отображает colocate: true.

Если он не действует, план запроса выглядит следующим образом:

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+

Узел HASH JOIN покажет соответствующую причину: colocate: false, reason: group is not stable. Одновременно будет сгенерирован узел EXCHANGE.

Расширенные операции

Элементы конфигурации FE

  • disable_colocate_relocate

Отключить ли автоматическое восстановление реплик Colocation для Selena. По умолчанию false, что означает включено. Этот параметр влияет только на восстановление реплик для таблиц Colocation, а не для обычных таблиц.

  • disable_colocate_balance

Отключить ли автоматическую балансировку реплик Colocation для Selena. По умолчанию false, что означает включено. Этот параметр влияет только на балансировку реплик таблиц Colocation, а не для обычных таблиц.

  • disable_colocate_join

    Вы можете отключить Colocate join на уровне сессии, изменив эту переменную.

  • disable_colocate_join

    Функция Colocate join может быть отключена путем изменения этой переменной.

HTTP Restful API

Selena предоставляет несколько HTTP Restful API, связанных с Colocate Join, для просмотра и изменения Colocation Groups.

Этот API реализован на FE и доступен с использованием fe_host:fe_http_port с разрешениями db_admin и user_admin.

  1. Просмотр всей информации Colocation кластера

    curl --location-trusted -u<username>:<password> 'http://<fe_host>:<fe_http_port>/api/colocate'  
    // Возвращает внутреннюю информацию Colocation в формате Json.
    {
    "colocate_meta": {
    "groupName2Id": {
    "g1": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Tables": {},
    "table2Group": {
    "10007": {
    "dbId": 10005,
    "grpId": 10008
    },
    "10040": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Schema": {
    "10005.10008": {
    "groupId": {
    "dbId": 10005,
    "grpId": 10008
    },
    "distributionColTypes": [{
    "type": "INT",
    "len": -1,
    "isAssignedStrLenInColDefinition": false,
    "precision": 0,
    "scale": 0
    }],
    "bucketsNum": 10,
    "replicationNum": 2
    }
    },
    "group2BackendsPerBucketSeq": {
    "10005.10008": [
    [10004, 10002],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10003, 10004],
    [10003, 10004],
    [10003, 10004],
    [10002, 10004]
    ]
    },
    "unstableGroups": []
    },
    "status": "OK"
    }
  2. Пометить группу как стабильную или нестабильную

    # Пометить как стабильную
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_stable?db_id=<dbId>&group_id=<grpId>​'
    # Пометить как нестабильную
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_unstable?db_id=<dbId>&group_id=<grpId>​'

    Если возвращенный результат 200, группа успешно помечена как стабильная или нестабильная.

  3. Установить распределение данных группы

    Этот интерфейс позволяет принудительно установить числовое распределение группы.

    POST /api/colocate/bucketseq?db_id=10005&group_id= 10008

    Body:

    [[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]

    returns: 200

    Где Body - это BucketsSequence, представленная как вложенный массив и идентификаторы BE, в которых расположены срезы разбиения на сегменты.

    Обратите внимание, что для использования этой команды вам может потребоваться установить конфигурацию FE disable_colocate_relocate и disable_colocate_balance в true, то есть отключить систему для выполнения автоматического восстановления и балансировки реплик Colocation. В противном случае это может быть автоматически сброшено системой после изменения.