OceanBaseデータベースのオプティマイザーは、分散実行計画を生成するために大きく2つの段階に分かれています。
この2つの段階での実行計画の生成方法は以下のとおりです:
第一段階:データの物理的な配置を考慮せず、ローカル関係に基づいて最適化されたすべての実行計画を生成します。ローカル計画が生成された後、オプティマイザーはデータが複数のパーティションにアクセスしているか、またはローカルの単一パーティションテーブルにアクセスしているものの、ユーザーがHintを使用してパラレルクエリの実行を強制的に採用しているかどうかをチェックします。
第二段階:分散実行計画を生成します。実行計画ツリーに基づき、データ再配置が必要な場所に
EXCHANGEノードを挿入することで、元のローカル計画ツリーを分散実行計画に変換します。
分散実行計画を生成するプロセスとは、元の計画ツリー内で適切な位置にEXCHANGE演算子を挿入するプロセスです。計画ツリーをトップダウンでイテレーションする際に、対応する演算子のデータ処理状況および入力演算子のデータパーティション状況に基づいて、EXCHANGE演算子を挿入する必要があるかどうかを判断します。
以下の例は、最もシンプルな単一テーブルスキャンです。テーブルt1がパーティションテーブルの場合、TABLE SCANにペアのEXCHANGE演算子を挿入することで、TABLE SCANとEXCHANGE OUTを1つのジョブとしてカプセル化し、パラレル実行に利用できます。
obclient> CREATE TABLE t1 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t1\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
------------------------------------------------------
|0 |PX COORDINATOR | |500000 |545109|
|1 | EXCHANGE OUT DISTR |:EX10000|500000 |320292|
|2 | PX PARTITION ITERATOR| |500000 |320292|
|3 | TABLE SCAN |T1 |500000 |320292|
======================================================
Outputs & filters:
-------------------------------------
0 - output([T1.V1], [T1.V2]), filter(nil)
1 - output([T1.V1], [T1.V2]), filter(nil), dop=1
2 - output([T1.V1], [T1.V2]), filter(nil)
3 - output([T1.V1], [T1.V2]), filter(nil),
access([T1.V1], [T1.V2]), partitions(p[0-4])
単一入力でプッシュダウン可能な演算子
単一入力でプッシュダウン可能な演算子には、主に AGGREGATION、SORT、GROUP BY、LIMIT などが含まれます。LIMIT 演算子を除き、上記の演算子には操作キーが存在します。操作キーと入力データのデータ分布が一致している場合、パーティションワイズ集計(Partition Wise Aggregation)と呼ばれる1段階の集計操作を行うことができます。一方、操作キーと入力データのデータ分布が一致しない場合は、2段階の集計操作が必要となり、集計演算子はプッシュダウン操作を実行する必要があります。
1段階の集計操作の例は以下のとおりです:
obclient> CREATE TABLE t2 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v1;
Query Plan:
| ======================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
------------------------------------------------------
|0 |PX COORDINATOR | |101 |357302|
|1 | EXCHANGE OUT DISTR |:EX10000|101 |357297|
|2 | PX PARTITION ITERATOR| |101 |357297|
|3 | MERGE GROUP BY | |101 |357297|
|4 | TABLE SCAN |t2 |400000 |247403|
======================================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(t2.v1)]), filter(nil)
1 - output([T_FUN_SUM(t2.v1)]), filter(nil), dop=1
2 - output([T_FUN_SUM(t2.v1)]), filter(nil)
3 - output([T_FUN_SUM(t2.v1)]), filter(nil),
group([t2.v1]), agg_func([T_FUN_SUM(t2.v1)])
4 - output([t2.v1]), filter(nil),
access([t2.v1]), partitions(p[0-3])
2段階の集計操作の例は以下のとおりです:
obclient> EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v2;
Query Plan:
| ============================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
------------------------------------------------------------
|0 |PX COORDINATOR | |101 |561383|
|1 | EXCHANGE OUT DISTR |:EX10001|101 |561374|
|2 | HASH GROUP BY | |101 |561374|
|3 | EXCHANGE IN DISTR | |101 |408805|
|4 | EXCHANGE OUT DISTR (HASH)|:EX10000|101 |408795|
|5 | HASH GROUP BY | |101 |408795|
|6 | PX PARTITION ITERATOR | |400000 |256226|
|7 | TABLE SCAN |t2 |400000 |256226|
============================================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil)
1 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil), dop=1
2 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil),
group([t2.v2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.v1))])
3 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil)
4 - (#keys=1, [t2.v2]), output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil), dop=1
5 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil),
group([t2.v2]), agg_func([T_FUN_SUM(t2.v1)])
6 - output([t2.v1], [t2.v2]), filter(nil)
7 - output([t2.v1], [t2.v2]), filter(nil),
access([t2.v1], [t2.v2]), partitions(p[0-3])
二項入力演算子
二項入力演算子は主にJOIN演算子の場合を考慮します。JOIN演算子については、ルールに基づいて分散実行計画を生成し、データ再配置方法を選択します。JOIN演算子には主に以下の3種類の結合方式があります:
Partition-Wise Join
左右のテーブルがパーティションテーブルであり、パーティショニング方式が同じで、物理的な分布も同一であり、かつ
JOINの結合条件がパーティションキーの場合、パーティション単位の結合方式を使用できます。以下の例を参照してください:obclient> CREATE TABLE t3 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4; Query OK, 0 rows affected obclient> EXPLAIN SELECT * FROM t2, t3 WHERE t2.v1 = t3.v1\G *************************** 1. row *************************** Query Plan: =========================================================== |ID|OPERATOR |NAME |EST. ROWS |COST | |0 |PX COORDINATOR | |1568160000|1227554264| |1 | EXCHANGE OUT DISTR |:EX10000|1568160000|930670004 | |2 | PX PARTITION ITERATOR| |1568160000|930670004 | |3 | MERGE JOIN | |1568160000|930670004 | |4 | TABLE SCAN |t2 |400000 |256226 | |5 | TABLE SCAN |t3 |400000 |256226 | =========================================================== Outputs & filters: ------------------------------------- 0 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil) 1 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), dop=1 2 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil) 3 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), equal_conds([t2.v1 = t3.v1]), other_conds(nil) 4 - output([t2.v1], [t2.v2]), filter(nil), access([t2.v1], [t2.v2]), partitions(p[0-3]) 5 - output([t3.v1], [t3.v2]), filter(nil), access([t3.v1], [t3.v2]), partitions(p[0-3])Partial Partition-Wise Join
左右のテーブルのうち一方がパーティションテーブル、もう一方が非パーティションテーブル、または両方ともパーティションテーブルであるが結合キーがどちらか一方のパーティションテーブルのパーティションキーとのみ一致する場合、そのパーティションテーブルのパーティション分布を基準に、もう一方のテーブルのデータを再配置します。以下の例を参照してください:
obclient> CREATE TABLE t4 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 3; Query OK, 0 rows affected obclient> EXPLAIN SELECT * FROM t4, t2 WHERE t2.v1 = t4.v1\G *************************** 1. row *************************** Query Plan: =========================================================== |ID|OPERATOR |NAME |EST. ROWS|COST | ----------------------------------------------------------- |0 |PX COORDINATOR | |11880 |17658| |1 | EXCHANGE OUT DISTR |:EX10001|11880 |15409| |2 | NESTED-LOOP JOIN | |11880 |15409| |3 | EXCHANGE IN DISTR | |3 |37 | |4 | EXCHANGE OUT DISTR (PKEY)|:EX10000|3 |37 | |5 | PX PARTITION ITERATOR | |3 |37 | |6 | TABLE SCAN |t4 |3 |37 | |7 | PX PARTITION ITERATOR | |3960 |2561 | |8 | TABLE SCAN |t2 |3960 |2561 | =========================================================== Outputs & filters: ------------------------------------- 0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil) 1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=1 2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), conds(nil), nl_params_([t4.v1]) 3 - output([t4.v1], [t4.v2]), filter(nil) 4 - (#keys=1, [t4.v1]), output([t4.v1], [t4.v2]), filter(nil), dop=1 5 - output([t4.v1], [t4.v2]), filter(nil) 6 - output([t4.v1], [t4.v2]), filter(nil), access([t4.v1], [t4.v2]), partitions(p[0-2]) 7 - output([t2.v1], [t2.v2]), filter(nil) 8 - output([t2.v1], [t2.v2]), filter(nil), access([t2.v1], [t2.v2]), partitions(p[0-3])データ再配置
結合キーと左右のテーブルのパーティションキーが関係ない場合、ルールに基づいて計算し、
BROADCASTまたはHASH HASHのデータ再配置方式を選択します。以下の例を参照してください:注意
並列度が1より大きい場合にのみ、以下の例の2種類のデータ再配布方式が選択される可能性があります。
obclient>EXPLAIN SELECT /*+ PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G *************************** 1. row *************************** Query Plan: ================================================================= |ID|OPERATOR |NAME |EST. ROWS|COST | ----------------------------------------------------------------- |0 |PX COORDINATOR | |11880 |396863| |1 | EXCHANGE OUT DISTR |:EX10001|11880 |394614| |2 | HASH JOIN | |11880 |394614| |3 | EXCHANGE IN DISTR | |3 |37 | |4 | EXCHANGE OUT DISTR (BROADCAST)|:EX10000|3 |37 | |5 | PX BLOCK ITERATOR | |3 |37 | |6 | TABLE SCAN |t4 |3 |37 | |7 | PX PARTITION ITERATOR | |400000 |256226| |8 | TABLE SCAN |t2 |400000 |256226| ================================================================= Outputs & filters: ------------------------------------- 0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil) 1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2 2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), equal_conds([t2.v2 = t4.v2]), other_conds(nil) 3 - output([t4.v1], [t4.v2]), filter(nil) 4 - output([t4.v1], [t4.v2]), filter(nil), dop=2 5 - output([t4.v1], [t4.v2]), filter(nil) 6 - output([t4.v1], [t4.v2]), filter(nil), access([t4.v1], [t4.v2]), partitions(p[0-2]) 7 - output([t2.v1], [t2.v2]), filter(nil) 8 - output([t2.v1], [t2.v2]), filter(nil), access([t2.v1], [t2.v2]), partitions(p[0-3]) obclient>EXPLAIN SELECT /*+ PQ_DISTRIBUTE(t2 HASH HASH) PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G; *************************** 1. row *************************** Query Plan: ============================================================ |ID|OPERATOR |NAME |EST. ROWS|COST | ------------------------------------------------------------ |0 |PX COORDINATOR | |11880 |434727| |1 | EXCHANGE OUT DISTR |:EX10002|11880 |432478| |2 | HASH JOIN | |11880 |432478| |3 | EXCHANGE IN DISTR | |3 |37 | |4 | EXCHANGE OUT DISTR (HASH)|:EX10000|3 |37 | |5 | PX BLOCK ITERATOR | |3 |37 | |6 | TABLE SCAN |t4 |3 |37 | |7 | EXCHANGE IN DISTR | |400000 |294090| |8 | EXCHANGE OUT DISTR (HASH)|:EX10001|400000 |256226| |9 | PX PARTITION ITERATOR | |400000 |256226| |10| TABLE SCAN |t2 |400000 |256226| ============================================================ Outputs & filters: ------------------------------------- 0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil) 1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2 2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), equal_conds([t2.v2 = t4.v2]), other_conds(nil) 3 - output([t4.v1], [t4.v2]), filter(nil) 4 - (#keys=1, [t4.v2]), output([t4.v1], [t4.v2]), filter(nil), dop=2 5 - output([t4.v1], [t4.v2]), filter(nil) 6 - output([t4.v1], [t4.v2]), filter(nil), access([t4.v1], [t4.v2]), partitions(p[0-2]) 7 - output([t2.v1], [t2.v2]), filter(nil) 8 - (#keys=1, [t2.v2]), output([t2.v1], [t2.v2]), filter(nil), dop=2 9 - output([t2.v1], [t2.v2]), filter(nil) 10 - output([t2.v1], [t2.v2]), filter(nil), access([t2.v1], [t2.v2]), partitions(p[0-3])