OceanBaseデータベースは、Hash Join結合アルゴリズムをサポートしており、特定のフィールドに基づいて2つのテーブルを等価マッチングで結合することができます。しかし、結合に参加するテーブル、特にプローブテーブルのデータ量が大きい場合、Hash Joinのパフォーマンスは大幅に影響を受けます。このような状況下では、Hash Joinはランタイムフィルター(Runtime Filter、RF)を使用して効率を向上させることができます。
原理の紹介
Runtime Filterは、Hash Joinのパフォーマンスを最適化するための技術であり、Hash Joinを使用する際に必要なプローブデータ量を削減することでクエリの効率を向上させます。例えば、スター型接続の複数の次元テーブルと実施テーブルのJoinシナリオでは、Runtime Filterは非常に効果的な最適化手段です。
Runtime Filterは実際にはフィルターであり、Hash Joinのビルドプロセスを利用して軽量なフィルターを構築し、そのフィルターをプローブに参加する大規模テーブルにブロードキャスト送信します。プローブテーブルは複数のRuntime Filterを使用して、ストレージ層で事前にデータをフィルタリングし、実際にHash Joinやネットワーク転送に参加するデータ量を削減することで、クエリの効率を向上させることができます。
Runtime Filterは3つの観点から分類することができます。1つ目は、マシン間の転送が必要かどうかに基づいて、LocalとGlobalの2種類に分類されます。2つ目は、Runtime Filterのデータ構造に基づいて、Bloom Filter、In Filter、Range Filterの3種類に分類されます。Local Runtime FilterであってもGlobal Runtime Filterであっても、これら3種類のデータ構造を持つRuntime Filterを使用できます。3つ目は、Runtime Filterによってフィルタリングされるエンティティに基づいて、接続キーをフィルタリングするものとパーティションをフィルタリングするものの2種類に分類されます。前者は通常のRuntime Filter、後者はPart Join Filterです。フィルタリング対象のエンティティに基づく分類と、マシン間の転送が必要かどうかに基づく分類は直交的です。通常のRuntime Filterは3種類のデータ構造を持つRuntime Filterをサポートしていますが、Part Join Filterは現在Bloom Filterのみをサポートしています。
本記事では、具体的な例を通じてLocal Runtime Filter、Global Runtime Filter、Part Join Filterについて詳しく説明します。
ローカルランタイムフィルター
ローカルランタイムフィルターのランタイムフィルターは、ネットワーク転送を経由する必要がなく、構築されたフィルターはローカルノードでのみフィルタリング条件を計算します。ローカルランタイムフィルターは通常、Hash Join Probe側にShuffleがないシナリオに適しています。
以下は、ローカルランタイムフィルターの実行計画の例です。
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
上記の例では、NAME フィールドの値が RF0000 の4番目の JOIN FILTER CREATE および6番目の JOIN FILTER USE 演算子は通常のランタイムフィルターであり、この計画内でローカルランタイムフィルターを構築しています。このフィルターはネットワークを介して伝播する必要はなく、ローカルマシンでのみ使用されます。
グローバルランタイムフィルター
グローバルランタイムフィルターのランタイムフィルターは、複数の実行ノードにブロードキャストで転送される必要があり、計画形態では必要に応じてRFを計画内の任意の位置にネストしてフィルタリングを完了することができます。ローカルランタイムフィルターと比較して、グローバルランタイムフィルターのコスト削減効果は、SQL層の投影および計算オーバーヘッドだけでなく、ネットワーク転送も含まれるため、通常はパフォーマンスが大幅に向上します。
以下はグローバルランタイムフィルターの実行計画の例です。
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
上記の例では、演算子5のNAMEフィールド値はRF0001に対応し、演算子11はRF000に対応します。また、21番目のTABLE FULL SCAN演算子にプッシュダウンされたものは、複数のDFOにまたがってフィルタリングを行い、これはグローバルランタイムフィルターの特徴に合致しています。
パートジョインフィルター
Hash Joinの実行プロセスは、左側のデータに基づいてハッシュテーブルを構築し、右側で各行ごとにデータをマッチングするものです。左側でハッシュテーブルを構築する際には、左側のすべてのデータが取得されます。このプロセスにおいて、もし左側のすべてのデータに関する右側の特定テーブルのパーティション分布特性を取得できれば、右側でそのテーブルのデータをスキャンする際に、既に統計されたパーティション分布特性に基づいて不要なパーティションを事前にフィルタリングすることが可能となり、パフォーマンスが向上します。パートジョインフィルターはまさにこのようなシナリオを最適化するために導入されました。Hash Joinの左側で右側の特定テーブルの具体的なパーティションを計算するためには、ジョインの結合キーに右側のそのテーブルのパーティションキーを含める必要があり、これがパートジョインフィルターを生成するための前提条件となります。
以下はパートジョインフィルターの実行計画の例です。
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[t1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [t1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
上記の例では、計画内の3番目のPART JOIN FILTER CREATE演算子と8番目のPX PARTITION HASH JOIN-FILTER演算子がパートジョインフィルターです。3番目の演算子はパートジョインフィルターに対応しており、8番目の演算子でtt2テーブルに対してパーティションレベルのフィルタリングを適用します。
Runtime Filterの手動有効化と無効化
Runtime Filterの使用シナリオはHash Joinに限定されており、接続タイプがHash Joinでない場合、オプティマイザーはRuntime Filterを割り当てません。通常、Hash Joinを実行する際にはオプティマイザーが自動的にRuntime Filterを割り当てますが、オプティマイザーがRuntime Filterを割り当てない場合でも、ユーザーはHintを使用して手動で割り当てることができます。
PX_JOIN_FILTER HintとPX_PART_JOIN_FILTER Hintは、Runtime Filterを手動で有効にするために使用されます。SQL構文は以下のとおりです:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
例:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
並列度が1の場合、Runtime Filterは割り当てられません。このような場合、ユーザーは少なくとも並列度を2以上に指定する必要があります。これはPARALLEL Hintを使用して設定できます。例:
/*+ PARALLEL(2) */
注意すべき点として、Hash Joinのフィルタリング性が不十分な場合、Runtime Filterを使用しても問題は解決されず、むしろわずかなパフォーマンス低下を引き起こす可能性があります。そのため、Runtime Filterを手動で有効にする場合は、クエリシナリオを慎重に評価し、Runtime Filterが適用可能かどうかを判断する必要があります。
NO_PX_JOIN_FILTER HintとNO_PX_PART_JOIN_FILTER Hintは、Runtime Filterを手動で無効にするために使用されます。SQL構文は以下のとおりです:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Runtime Filterの実行戦略の調整
Runtime Filterは適応能力を備えています。デフォルト設定では、結合キーに基づく選択率が条件を満たす場合、In、Range、Bloom Filterという3種類のデータ構造を持つRuntime Filterが自動的に作成されます。In Filter内部ではHashテーブルを用いてフィルタリング判断を行い、Range Filter内部では最大値または最小値を用いてフィルタリング判断を行います。
フィルタリングの優先順位において、In Filterは最も正確なフィルタリング性能を持っているため、実行時にIn Filterが有効になると、他の2種類のFilterは適応的に無効になります。実行時には、エグゼキュータが実際のNDV値に基づいてIn Filterを使用するかどうかを決定します。さらに、各Filterは実際の計算において、リアルタイムのフィルタリング性能に応じて適応的にDisable FilterおよびEnable Filterを行います。
OceanBaseデータベースは、runtime_filter_type、runtime_filter_max_in_num、runtime_filter_wait_time_ms、runtime_bloom_filter_max_sizeといったシステム変数を提供し、Runtime Filterの実行に関する戦略を調整できます。
runtime_filter_type変数は、有効にするRuntime Filterのタイプを指定するために使用されます。runtime_filter_typeのデフォルト値は'BLOOM_FILTER,RANGE,IN'であり、これは3種類のRuntime Filterを同時に有効にすることを意味します。runtime_filter_type=''の場合、どのタイプのRuntime Filterも有効にされません。例:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN'
一般的には、デフォルト値で指定されたRuntime Filterのタイプを使用するだけで十分であり、runtime_filter_type変数で特別に指定する必要はありません。OceanBaseデータベースは、最適化および実行段階で最適なRuntime Filterのタイプを選択してフィルタリングを行います。
runtime_filter_max_in_num変数は、In Filterで使用されるNDVを指定するために使用され、デフォルト値は1024です。オプティマイザはビルドテーブルのNDVを推定し、NDV > runtime_filter_max_in_numの場合、In Filterは割り当てられません。オプティマイザによるNDVの推定は必ずしも正確ではないため、実際のNDVが非常に大きいにもかかわらず推定値が小さいために誤ってIn Filterが割り当てられる可能性があります。このような場合、エグゼキュータは実行中に実際のNDVに基づいてIn Filterを適応的に無効にします。一般的に、この値を過度に大きく設定することは推奨されません。ビルドテーブルのNDVが非常に大きい場合、In Filterの最適化効果はBloom Filterほどではありません。
runtime_filter_wait_time_ms変数は、プローブ側がRuntime Filterの到着を待機する最大時間を設定するために使用され、デフォルト値は10msです。プローブ側は、Runtime Filterが到着した後にデータを吐き出します。runtime_filter_wait_time_msで設定された時間を超えてもRuntime Filterが到着しない場合、By Pass段階に入り、この時点でRuntime Filterを経由せずに直接データの吐き出しが開始されます。その後のある時点でRuntime Filterが到着した場合でも、Runtime Filterは引き続き有効になりフィルタリングが実行されます。一般的に、この値は調整する必要はありませんが、実際に使用されるBloom Filterのデータが非常に大きく、かつフィルタリング性能が良好な場合は、この値を適宜大きくすることができます。
runtime_bloom_filter_max_size変数は、Bloom Filterの最大長を制限するために使用され、デフォルト値は2048MBです。ユーザーが実際に使用する際にビルドテーブルのデータが多すぎる場合、Bloom Filterのデフォルトの最大長ではデータを収容できない場合があります。このような場合は、runtime_bloom_filter_max_sizeの値を大きくする必要があります。