パラレル実行とは、SQLクエリタスクを複数のサブタスクに分割し、それらを複数のプロセッサ上で同時に実行することで、クエリタスク全体の実行効率を向上させる最適化戦略です。
現代のコンピュータシステムでは、マルチコアプロセッサやマルチスレッド、高速なネットワーク接続が広く採用されているため、パラレル実行は効率的なクエリ技術となっています。この技術により、計算集約型の大規模クエリの応答時間が大幅に短縮され、オフラインデータウェアハウス、リアルタイムレポート、オンラインビッグデータ分析などの業務シナリオで広く利用されています。また、一括データロードや高速なインデックステーブルの構築などの分野でもその強力な応用能力を発揮しています。
パラレル実行は、以下のSQLクエリシナリオの性能を顕著に向上させます:
- 大規模テーブルのスキャン、大規模テーブルの結合、大量データのソートおよび集計処理。
- 主キーの変更、列型の変更、インデックスの作成など、大規模テーブルに対するDDL操作の実行。
- 既存の大量データからテーブルを作成する(
Create Table As Select使用)。 - データの一括挿入、削除、更新操作の実行。
適用シナリオ
パラレル実行は、オフラインデータウェアハウス、リアルタイムレポート、オンラインのビッグデータ分析などの分析系システムに適用されるだけでなく、OLTP分野でも一定の役割を果たし、DDL操作やデータバッチ処理の高速化に利用できます。
パラレル実行は、複数のCPUおよびI/Oリソースを最大限に活用することで、SQL実行時間の短縮を目指します。以下の条件が満たされている場合、パラレル実行はシリアル実行よりも優れています:
- データアクセス量が多い
- SQLクエリの同時実行度が低い
- レイテンシー要件が低い
- 十分なハードウェアリソースを備えている
パラレル実行は、複数のプロセッサが協働して同一タスクを並行処理するため、以下のシステム環境ではパフォーマンスが向上します:
- マルチプロセッサシステム(SMP)およびクラスタ
- I/O帯域幅が十分
- 余裕のあるメモリ(ソートやHashテーブル作成などのメモリ集約型操作に使用)
- システム負荷が適度、またはピークとトレンドの特徴がある(例:システム負荷は通常30%以下に保たれる)
システムが上記の特徴を満たしていない場合、パラレル実行はパフォーマンスを大幅に改善できない可能性があります。高負荷、メモリ容量が小さい、またはI/O能力が弱いシステムでは、パラレル実行の効果が悪くなることさえあります。
パラレル実行には特別なハードウェア要件はありませんが、注意すべき点として、CPUコア数、メモリサイズ、ストレージI/O性能、ネットワーク帯域幅がパラレル実行のパフォーマンスに影響を与えることが挙げられます。これらのうちいずれかがボトルネックとなると、全体のパフォーマンスに影響を及ぼす可能性があります。
動作原理
パラレル実行は、1つのSQLクエリタスクを複数のサブタスクに分割し、これらのサブタスクを複数のプロセッサにスケジューリングして実行することで、効率的な並行実行を実現します。
一度SQLクエリがパラレル実行計画に解析されると、その実行プロセスは以下の手順に従って進みます:
- SQLメインスレッド(SQLの受信および解析を担当するスレッド)は、計画の形式に基づいて、パラレル実行に必要なスレッドリソースを事前に割り当てます。これらのスレッドリソースは、複数のマシン上のクラスタに関連する場合があります。
- SQLメインスレッドは、並列スケジューリング演算子(PX Coordinator)を有効にします。
- 並列スケジューリング演算子は計画を解析し、複数の操作ステップに分割し、ボトムアップの順序でこれらの操作をスケジューリングします。各操作は、最大限にパラレル実行できるよう設計されています。
- すべての操作のパラレル実行が完了すると、並列スケジューリング演算子は計算結果を受け取り、上位の演算子(例えばAggregate演算子)に渡します。これにより、残りの並列化できない計算(例えば最終的なSUM計算)をシリアル処理で完了します。
パラレルの粒度
パラレルデータスキャンの基本的な作業単位は、Granuleと呼ばれます。
OceanBaseデータベースでは、テーブルスキャンタスクを複数のGranuleに分割し、各Granuleはスキャンタスクの範囲を記述します。Granuleはテーブルパーティションをまたがらないため、各スキャンタスクは必ず1つのパーティション内に位置します。 Granuleに基づく粒度特性により、以下の2種類に分類できます:
Partition Granule
Partition Granuleが記述する範囲は完全なパーティションであり、スキャンタスクが対象とするパーティション数に応じて、同じくらいの数のPartition Granuleが生成されます。ここでのパーティションは、主テーブルのパーティションでもインデックスのパーティションでも構いません。Partition Granuleは特にPartition Wise Joinでよく使用され、2つのテーブルの対応するパーティションがそれぞれPartition Granuleによって処理されることを保証します。
Block Granule
Block Granuleが記述する範囲は、パーティション内の連続したデータです。データスキャンのシナリオでは、通常Block Granuleを用いてデータを分割します。各パーティションは複数のBlockに分割され、これらのBlockは一定のルールに従って接続され、パラレルワーカースレッドが消費するためのタスクキューを形成します。
与えられた並列度の下で、スキャンタスクの均衡を保つために、オプティマイザはデータをPartition GranuleまたはBlock Granuleに分割するかどうかを自動的に選択します。Block Granuleが選択された場合、パラレル実行フレームワークは実行時にBlockの分割を動的に決定します。全体的な原則として、1つのBlockが過大または過小にならないようにします。Blockが過大になるとデータの偏りが生じ、一部のスレッドが十分に動作しなくなる可能性があります。一方、Blockが過小だと頻繁なスキャン切り替えのオーバーヘッドが発生します。
一度パーティション粒度での分割が完了すると、各粒度に対応して1つのスキャンタスクが割り当てられます。Table Scan演算子はこれらのスキャンタスクを順番に処理し、すべてのタスクが完了するまで個別に完了させます。
パラレル実行モデル
プロデューサー・コンシューマーパイプラインモデル
パラレル実行では、プロデューサー・コンシューマーモデルを用いたパイプライン実行が採用されます。
並列スケジューリング演算子が計画を解析した後、その計画は複数の操作ステップに分割されます。各操作ステップはDFO(Data Flow Operation)と呼ばれます。
通常、並列スケジューリング演算子は同時に2つのDFOを起動し、これら2つのDFOはプロデューサー・コンシューマー方式で接続され、DFO間のパラレル実行が形成されます。各DFOは一連のスレッドを使用して実行され、これをDFO内のパラレル実行と呼びます。また、DFOが使用するスレッド数をDOP(Degree Of Parallisim)と呼びます。
前段階のコンシューマーDFOは、次の段階でプロデューサーDFOに変わります。並列スケジューリング演算子の調整のもと、コンシューマーDFOとプロデューサーDFOは同時に起動します。以下の図は、プロデューサー・コンシューマーモデルにおけるDFOの処理フローを示しています。
- DFO Aが生成したデータは即座にDFO Bに転送され、計算が行われます。
- DFO Bの計算完了後、データは現在のスレッドに格納され、上位のDFO Cの起動を待機します。
- DFO BがDFO Cの起動完了通知を受信すると、自身の役割をプロデューサーに変更し、DFO Cへのデータ転送を開始します。DFO Cはデータを受信後、計算を開始します。
以下のクエリ例では、SELECT ステートメントの実行計画はまず game テーブルに対してフルスキャンを行い、次に team テーブルに基づいてグループ化と合計を求め、最終的に各 team テーブルの総得点を計算します。
CREATE TABLE game (round INT PRIMARY KEY, team VARCHAR(10), score INT)
PARTITION BY HASH(round) PARTITIONS 3;
INSERT INTO game VALUES (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
INSERT INTO game VALUES (4, "CN", 4), (5, "US", 4), (6, "JP", 4);
SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
obclient> EXPLAIN SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
obclient> EXPLAIN SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |4 | |
| |1 | EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | HASH GROUP BY | |1 |4 | |
| |3 | EXCHANGE IN DISTR | |3 |3 | |
| |4 | EXCHANGE OUT DISTR (HASH)|:EX10000|3 |3 | |
| |5 | HASH GROUP BY | |3 |2 | |
| |6 | PX BLOCK ITERATOR | |1 |2 | |
| |7 | TABLE SCAN |game |1 |2 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))]) |
| 3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| 4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| (#keys=1, [game.team]), dop=3 |
| 5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(game.score)]) |
| 6 - output([game.team], [game.score]), filter(nil), rowset=256 |
| 7 - output([game.team], [game.score]), filter(nil), rowset=256 |
| access([game.team], [game.score]), partitions(p[0-2]) |
| is_index_back=false, is_global_index=false, |
| range_key([game.round]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------------------------------+
29 rows in set
このクエリでは実際に6つのスレッドが使用され、実行フローとタスクの割り当ては以下のとおりです:
- 第1ステップ:最初の3つのスレッドが
gameテーブルのスキャンを担当し、各スレッド内でgame.teamデータに対して事前集計を行います。 - 第2ステップ:残りの3つのスレッドが事前集計されたデータに対して最終集計を行います。
- 第3ステップ:最終集計結果は並列スケジューラーに渡され、それがクライアントに返します。
第1ステップのデータを第2ステップに送る際には、game.team フィールドをハッシュして、どのスレッドに事前集計データを送るかを決定する必要があります。
プロデューサーとコンシューマー間のデータ配布方式
データ配布方式とは、一連のパラレル実行ワーカースレッド(プロデューサー)がデータを別のワーカースレッド(コンシューマー)に送信する際に使用する方法を指します。オプティマイザーは、最適なパフォーマンスを達成するために、一連の最適化戦略を用いて最適なデータ再分散方式を選択します。
一般的なパラレル実行におけるデータ配布方式には以下が含まれます:
Hash Distribution
Hash Distributionを使用する場合、プロデューサーはDistribution Keyに基づいてデータ行に対してハッシュ計算を行い、その結果をモジュロ演算することで、データをどのコンシューマーのワーカースレッドに送信するかを決定します。通常、Hash Distributionは複数のコンシューマースレッドにデータを均等に配布できます。
Pkey Distribution
Pkey Distributionを使用する場合、プロデューサーはデータ行に対応するターゲットテーブルのパーティションを計算し、その後、そのパーティションを処理するコンシューマースレッドに行データを送信します。Pkey Distributionは、Partitial Partitions Wise Joinシナリオでよく使用されます。この場合、コンシューマ側のデータは再分散する必要がなく、直接プロデューサー側のデータとPartition Wise Joinを実行できるため、ネットワーク通信量を削減し、パフォーマンスを向上させることができます。
Pkey Hash Distribution
Pkey Hash Distributionを使用する場合、プロデューサーはまずデータ行に対応するターゲットテーブルのパーティションを計算し、次にDistribution Keyに基づいてデータ行に対してハッシュ計算を行い、それを処理するコンシューマースレッドを決定します。
Pkey Hash Distributionは、Parallel DMLシナリオでよく使用されます。このシナリオでは、あるパーティションが同時に複数のスレッドによって並行して更新される可能性があるため、Pkey Hash Distributionを使用して、同一値のデータ行を同一スレッドが処理し、異なる値のデータ行を可能な限り均等に複数のスレッドに配布する必要があります。
Broadcast Distribution
Broadcast Distributionを使用する場合、プロデューサーは各データ行をコンシューマー側の各スレッドに送信し、各コンシューマースレッドがプロデューサー側の全データを保有するようにします。Broadcast Distributionは、小規模なテーブルデータをJoinを実行するすべてのノードに複製し、その後ローカルでJoin操作を実行してネットワーク通信量を削減するためによく使用されます。
Broadcast to Host Distribution(BC2HOST)
Broadcast to Host Distributionを使用する場合、プロデューサーは各データ行をコンシューマー側の各ノードに送信し、各コンシューマーノードがプロデューサー側の全データを保有するようにします。その後、ノード内のコンシューマースレッドが協力してこのデータを処理します。
Broadcast to Host Distributionは、
NESTED LOOP JOINやSHARED HASH JOINのシナリオでよく使用されます。NESTED LOOP JOINのシナリオでは、各コンシューマースレッドが共有データから一部のデータを取得し、これをドライブデータとして使用してターゲットテーブル上でJoin操作を実行します。SHARED HASH JOINのシナリオでは、各コンシューマースレッドが共有データに基づいて共通のHashテーブルを構築することで、各スレッドが独立して同一のHashテーブルを構築することによる不要なオーバーヘッドを回避します。Range Distribution
Range Distributionを使用する場合、プロデューサーはRange範囲に基づいてデータを分割し、異なるコンシューマースレッドが異なる範囲のデータを処理するようにします。Range Distributionはソートシナリオでよく使用され、各コンシューマースレッドは割り当てられたデータのみをソートするだけで、グローバル範囲での順序付けを維持できます。
Random Distribution
Random Distributionを使用する場合、プロデューサーはデータをランダムに分散した後、コンシューマースレッドに送信します。これにより、各コンシューマースレッドが処理するデータ量がほぼ均等になり、ロードバランシングが実現されます。Random Distributionは、マルチスレッド並列の
UNION ALLシナリオでよく使用されます。このシナリオでは、データの分散とロードバランシングのみが必要であり、データ間に他の関連制約はありません。Hybrid Hash Distribution
Hybrid Hash Distributionは、適応型のJoinアルゴリズムに使用されます。収集された統計情報を組み合わせて、OceanBaseデータベースは通常値と高頻度値を定義するための一連の構成パラメータを提供します。Hybrid Hash Distribution方式では、Joinの両側の通常値にはHash配布を、左側の高頻度値にはBroadcast配布を、右側の高頻度値にはRandom配布を使用します。
プロデューサーとコンシューマー間のデータ転送メカニズム
同一時刻に、パラレルスケジューリングオペレータによって開始された2つのDFO間では、プロデューサーとコンシューマーの方式でパラレル実行が接続されます。プロデューサーとコンシューマー間でデータを転送するためには、転送ネットワークを作成する必要があります。
例えば、プロデューサーDFOがDOP=2でデータスキャンを行い、コンシューマーDFOがDOP=3でデータ集計計算を行う場合、各プロデューサースレッドはコンシューマースレッドに接続するために3つの仮想リンクを作成し、合計で6つの仮想リンクが作成されます。
作成された仮想転送ネットワークは、データ転送層(Data Transfer Layer、略称:DTL)と呼ばれます。OceanBaseデータベースのパラレル実行フレームワークでは、すべての制御メッセージと行データはDTLを通じて転送されます。各ワーカースレッドは数千もの仮想リンクを確立できるため、非常に高いスケーラビリティを備えています。さらに、DTLにはデータバッファリング、一括データ送信、自動トラフィック制御などの機能も備わっています。
DTLの両端が同一ノード上にある場合、DTLはメモリコピー方式でメッセージを転送します。一方、DTLの両端が異なるノード上にある場合、DTLはネットワーク通信方式でメッセージを転送します。
ワーカースレッド
パラレルクエリでは、主に2種類のスレッドがあります:1つのメインスレッドと複数のパラレルワーカースレッドです。メインスレッドは通常のTPクエリと同じスレッドプールを使用し、パラレルワーカースレッドは専用のスレッドプールから供給されます。
OceanBaseデータベースは、パラレルワーカースレッドを割り当てるために専用のスレッドプールモデルを採用しています。各テナントは、所属する各ノード上に専用のパラレル実行スレッドプールを持ち、すべてのパラレルクエリワーカースレッドはこのスレッドプールから割り当てられます。
パラレルスケジューリング演算子は、各DFOをスケジューリングする前に、スレッドプールからスレッドリソースを要求します。DFOの実行が完了すると、スレッドリソースは即座に解放されます。
スレッドプールの初期サイズは0で、必要に応じて動的に増加し、上限はありません。余分なアイドルスレッドを避けるために、スレッドプールには自動回収メカニズムが導入されています。任意のスレッドに対して:
- アイドル時間が10分を超え、かつスレッドプール内の残存スレッド数が8個を超える場合、そのスレッドは回収されて破棄されます。
- アイドル時間が60分を超える場合、無条件で破棄されます。
スレッドプールのサイズに上限はありませんが、以下の2つの仕組みにより、ほとんどの場合実質的な上限が形成されています:
- パラレル実行開始前に、Admissionモジュールを通じてスレッドリソースを予約しなければなりません。予約に成功した場合にのみ実行に投入されます。この仕組みにより、同時実行クエリの数を制限できます。Admissionモジュールの詳細については、並行制御とキューイングを参照してください。
- スレッドプールからスレッドを申請する際、1回の申請で割り当てられるスレッド数はNを超えません。ここで、NはテナントUnitのMIN_CPUにpx_workers_per_cpu_quotaを乗じた値です。これを超える場合でも、最大でN個のスレッドしか割り当てられません。px_workers_per_cpu_quotaはテナントレベルの構成パラメータで、デフォルト値は10です。例えば、あるDFOのDOPが100で、Aノードから30個のスレッド、Bノードから70個のスレッドを申請する必要がある場合、UnitのMIN_CPUが4、px_workers_per_cpu_quotaが10であれば、N = 4 * 10 = 40となります。最終的に、このDFOはAノードから実際に30個のスレッド、Bノードから実際に40個のスレッドを申請し、実際のDOPは70となります。
バランスロードによるパフォーマンスの最適化
最適なパフォーマンスを実現するためには、すべてのワーカースレッドに割り当てられるタスクができるだけ均等である必要があります。
SQLがBlock Granuleでタスクを分割する場合、ワークタスクは動的にワーカースレッド間で割り当てられます。これにより、特定のワーカースレッドの作業負荷が他のワーカースレッドを著しく上回るというワークロードの不均衡問題を最小限に抑えることができます。
SQLがPartition Granuleでタスクを分割する場合、タスク数をワーカースレッド数の整数倍にすることでパフォーマンスを最適化できます。これは、Partition Wise Joinや並列DMLシナリオに非常に有用です。
テーブルに16個のパーティションがあり、各パーティションのデータ量がほぼ均等であると仮定します。この場合、16個のワーカースレッド(DOPは16)を使用すると、作業を完了するまでの時間は約16分の1に短縮されます。また、5個のワーカースレッドを使用すると、作業時間は5分の1に短縮されます。さらに、2個のスレッドを使用すると、作業時間は半分に短縮されます。しかし、15個のスレッドを使用して16個のパーティションを処理する場合、最初のスレッドが1つのパーティションの処理を終えると、すぐに16番目のパーティションの処理を開始します。他のスレッドは作業を終えると、アイドル状態になります。各パーティションのデータ量が近い場合、この構成はパフォーマンスを低下させます。一方、各パーティションのデータ量に差異がある場合、実際のパフォーマンスは状況によって異なります。
同様に、6個のスレッドを使用して16個のパーティションを処理し、各パーティションのデータ量がほぼ均等であると仮定します。
この場合、各スレッドは最初のパーティションの処理を終えると次のパーティションを処理しますが、3番目のパーティションを処理するのは4個のスレッドのみで、残りの2個のスレッドはアイドル状態になります。
一般的に、与えられたパーティション数(N)とワーカースレッド数(P)で並列操作を実行するのに必要な時間が、NをPで割った値に等しいと仮定することはできません。この公式では、一部のスレッドが他のスレッドが最後のパーティションの処理を終えるのを待つ必要がある可能性が考慮されていません。しかし、適切なDOPを選択することで、ワークロードの不均衡問題を最小限に抑え、パフォーマンスを最適化できます。
適用外れのシナリオ
パラレル実行は通常、以下のシナリオでは推奨されません:
システム内の典型的なSQLの実行時間がすべてミリ秒単位である場合。
パラレルクエリ自体にミリ秒単位のスケジューリングオーバーヘッドがあるため、短いクエリに対しては、パラレル実行によるメリットがスケジューリングオーバーヘッドによって相殺されてしまう可能性があります。
システム負荷が非常に高い場合。
パラレル実行の設計目標は、システムの余剰リソースを最大限に活用することです。システム自体に余剰リソースがない場合、パラレル実行は追加のメリットをもたらすことができず、むしろシステム全体のパフォーマンスに影響を与える可能性があります。
直列実行は、単一のスレッドを使用してデータベース操作を実行します。以下の場合、直列実行の方がパラレル実行よりも優れています:
- クエリがアクセスするデータ量が非常に少ない場合。
- 高同時実行性。
- クエリの実行時間が100ミリ秒未満の場合。
以下の場合は、部分的にパラレル実行が不可能です:
- 最上位のDFOはパラレル化不要であり、クライアントとのインタラクションや、
LIMIT、PX COORDINATORなど、最上位でパラレル化不要な部分の操作を実行します。 TABLEUDFを含む場合、そのUDFを含むDFOは直列実行のみ可能であり、残りの部分は引き続きパラレル化可能です。- OLTPシステムにおける一般的な
SELECTおよびDMLステートメントについては、パラレル実行が適用されない可能性があります。