OceanBaseデータベースはHash Join結合アルゴリズムをサポートしており、特定のフィールドに基づいて2つのテーブルを等価マッチングで結合することができます。しかし、結合に関与するテーブル、特にProbeテーブルのデータ量が大きい場合、Hash Joinのパフォーマンスは大幅に低下します。このような状況下では、Hash JoinはRuntime Filter(RF)を使用して効率を向上させることができます。
原理の概要
Runtime Filterは、Hash Joinのパフォーマンスを最適化するための技術であり、Hash Joinを使用する際に必要なProbeのデータ量を削減することで、クエリの効率を向上させます。例えば、スター型結合において複数の次元テーブルと実装テーブルをJoinするシナリオでは、Runtime Filterは非常に有効な最適化手法です。
Runtime Filterは実際にはフィルターであり、Hash JoinのBuildプロセスを利用して軽量なフィルターを構築し、そのフィルターをProbeに参加する大規模テーブルにブロードキャストします。Probeテーブルは複数のRuntime Filterを使用して、ストレージ層で事前にデータをフィルタリングすることができ、実際にHash Joinに参加するデータやネットワーク転送されるデータを削減することで、クエリの効率を向上させます。
Runtime Filterは3つの観点から分類することができます。機械間転送の必要性に基づいて、Runtime FilterはLocalとGlobalの2種類に分けられます。Runtime Filterのデータ構造に基づいて、Runtime FilterはBloom Filter、In Filter、Range Filterの3種類に分けられます。Local Runtime FilterであってもGlobal Runtime Filterであっても、これら3種類のデータ構造を持つRuntime Filterを使用することができます。Runtime Filterがフィルタリングするエンティティに基づいて、Runtime Filterは接続キーをフィルタリングするものとパーティションをフィルタリングするものの2種類に分けられます。前者は一般的なRuntime Filterであり、後者はPart Join Filterです。エンティティに基づく分類と機械間転送の必要性に基づく分類は直交しています。一般的なRuntime Filterは3種類のデータ構造を持つRuntime Filterをサポートしますが、Part Join Filterは現在Bloom Filterのみをサポートしています。
本記事では、具体的な例を用いてLocal Runtime Filter、Global Runtime Filter、Part Join Filterについて詳しく説明します。
ローカルランタイムフィルター
ローカルランタイムフィルターのランタイムフィルターはネットワーク転送を経由する必要がなく、構築されたフィルターはローカルノードでのみフィルタリング条件を計算します。ローカルランタイムフィルターは通常、Hash Join Probe側にShuffleがないシナリオに適用されます。
以下はローカルランタイムフィルター実行計画の例です。
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
上記の例では、NAME フィールドの値が RF0000 である4番目の JOIN FILTER CREATE と6番目の JOIN FILTER USE 演算子が通常のランタイムフィルターです。これらはその計画内でローカルランタイムフィルターを構築し、このフィルターはネットワーク伝播を必要とせず、ローカルマシンでのみ使用されます。
グローバルランタイムフィルター
グローバルランタイムフィルターのランタイムフィルターは複数の実行ノードにブロードキャストで送信する必要があり、計画形態では必要に応じてRFを計画内の任意の位置までネストしてプッシュダウンし、フィルタリングを完了できます。ローカルランタイムフィルターと比較して、グローバルランタイムフィルターによるコスト削減は、SQL層での投影や計算オーバーヘッドだけでなく、ネットワーク転送も含まれるため、通常は実行性能が大幅に向上します。
以下はグローバルランタイムフィルターを使用した実行計画の例です。
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
上記の例では、演算子5のNAMEフィールド値がRF0001に対応し、演算子11がRF000に対応しています。また、21番目のTABLE FULL SCAN演算子にプッシュダウンされたフィルターは、複数のDFOにまたがってフィルタリングを行っており、これはグローバルランタイムフィルターの特徴に合致しています。
Part Join Filter
Hash Joinの実行プロセスでは、左側のデータに基づいてハッシュテーブルを構築し、右側のデータと行ごとにマッチングします。左側でハッシュテーブルを構築する際には、左側のすべてのデータが取得されます。このプロセスで、もし左側のすべてのデータに関する右側の特定のテーブルのパーティション分布特性を事前に把握できれば、右側でそのテーブルのデータをスキャンする際に、統計済みのパーティション分布特性に基づいて不要なパーティションを事前にフィルタリングでき、パフォーマンスを向上させることができます。Part Join Filterの導入は、まさにこのようなシナリオを最適化するためのものです。Hash Joinの左側で右側の特定のテーブルの具体的なパーティションを計算するためには、Joinの結合キーには右側のそのテーブルのパーティションキーを含める必要があります。これがPart Join Filterを生成する前提条件です。
以下はPart Join Filter実行計画の例です。
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[t1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [t1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
上記の例では、計画内の3番目のPART JOIN FILTER CREATE演算子と8番目のPX PARTITION HASH JOIN-FILTER演算子がPart Join Filterです。3番目の演算子が対応するPart Join Filterは、8番目の演算子でtt2テーブルに対してパーティションレベルのフィルタリングを適用する際に使用されます。
Runtime Filterの手動での有効化と無効化
Runtime Filterの使用シナリオはHash Joinに限定されており、接続タイプがHash Join以外の場合、オプティマイザーはRuntime Filterを割り当てません。通常、Hash Joinを実行する際には、オプティマイザーが自動的にRuntime Filterを割り当てます。オプティマイザーがRuntime Filterを割り当てていない場合でも、ユーザーはHintを使用して手動で割り当てることができます。
PX_JOIN_FILTER HintとPX_PART_JOIN_FILTER Hintは、Runtime Filterを手動で有効にするために使用します。SQL構文は以下のとおりです:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
例:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
並列度が1のシナリオでは、Runtime Filterは割り当てられません。このような場合、ユーザーは並列度を少なくとも2に指定する必要があります。PARALLEL Hintを使用して設定できます。例:
/*+ PARALLEL(2) */
ただし、Hash Joinのフィルタリング性が不十分な場合、Runtime Filterを使用しても問題は解決せず、むしろわずかなパフォーマンス低下を引き起こす可能性があるため、手動でRuntime Filterを有効にする場合は、クエリシナリオを慎重に評価し、Runtime Filterが適用可能かどうかを判断する必要があります。
NO_PX_JOIN_FILTER HintとNO_PX_PART_JOIN_FILTER Hintは、Runtime Filterを手動で無効にするために使用します。SQL構文は以下のとおりです:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Runtime Filterの実行戦略の調整
Runtime Filterは適応能力を備えています。デフォルト設定では、Joinが接続キーに対する選択率の条件を満たすと、In、Range、Bloom Filterの3種類のデータ構造を持つRuntime Filterがデフォルトで作成されます。In Filterは内部でHashテーブルを使用してフィルタリング判断を行い、Range Filterは内部で最大または最小値を使用してフィルタリング判断を行います。
フィルタリングの優先順位において、In Filterは最も正確なフィルタリング性能を持っているため、実行時にIn Filterが有効になると、他の2種類のFilterは適応的に無効になります。実行時には、エグゼキューターが実際のNDV値に基づいてIn Filterを使用するかどうかを決定します。さらに、各Filterは実際の計算において、リアルタイムのフィルタリング性能に応じて適応的に無効化または再有効化されます。
OceanBaseデータベースは、runtime_filter_type、runtime_filter_max_in_num、runtime_filter_wait_time_ms、runtime_bloom_filter_max_sizeを含む、Runtime Filterの実行に関する戦略を調整するためのシステム変数を提供しています。
runtime_filter_type変数は、有効にするRuntime Filterのタイプを指定するために使用されます。runtime_filter_typeのデフォルト値は'BLOOM_FILTER,RANGE,IN'であり、3種類のRuntime Filterを同時に有効にすることを意味します。runtime_filter_type=''の場合、いずれのタイプのRuntime Filterも有効にしません。例:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN'
通常、デフォルト値で指定されたRuntime Filterのタイプを使用するだけでよく、runtime_filter_type変数で特別に指定する必要はありません。OceanBaseデータベースは、最適化および実行段階で最適なRuntime Filterのタイプを選択してフィルタリングを行います。
runtime_filter_max_in_num変数は、In Filterが使用するNDVを指定するために使用され、デフォルト値は1024です。オプティマイザーはビルドテーブルのNDVを推定し、NDV > runtime_filter_max_in_numの場合はIn Filterを割り当てません。オプティマイザーによるNDVの推定は必ずしも正確ではないため、実際のNDVが高いにもかかわらず推定値が低く、誤ってIn Filterが割り当てられる可能性があります。このような場合、エグゼキューターは実行中に実際のNDVに応じてIn Filterを適応的に無効にします。通常、ユーザーはこの値を大きく設定することは推奨されません。ビルドテーブルのNDVが非常に高い場合、In Filterの最適化効果はBloom Filterほどではありません。
runtime_filter_wait_time_ms変数は、プローブ側がRuntime Filterの到着を待機する最大待機時間を設定するために使用され、デフォルト値は10msです。プローブ側はRuntime Filterが到着するのを待ってからデータを吐き出します。runtime_filter_wait_time_msで設定された時間までにRuntime Filterが到着しない場合、By Pass段階に入り、Runtime Filterを経由せずに直接データの吐き出しが始まります。その後のある時点でRuntime Filterが到着すると、Runtime Filterは有効になり、フィルタリングが実行されます。通常、この値は調整する必要はありません。実際に使用されるBloom Filterのデータが非常に大きく、かつフィルタリング性能が良好な場合は、適宜この値を大きくすることができます。
runtime_bloom_filter_max_size変数は、Bloom Filterの最大長を制限するために使用され、デフォルト値は2048MBです。ユーザーが実際に使用する際にビルドテーブルのデータが多すぎる場合、Bloom Filterのデフォルトの最大長ではデータを収容できない場合があります。このような場合は、runtime_bloom_filter_max_sizeの値を大きくする必要があります。