分散実行の概要
OceanBaseデータベースは、Shared-Nothingアーキテクチャに基づく分散システムとして構築されており、分散実行計画の生成と実行機能を備えています。
リレーショナルデータテーブルのデータはパーティション単位でシステム内の各ノードに格納されるため、パーティションをまたがるデータクエリリクエストでは、実行計画が複数のノードのデータを操作できる必要があります。OceanBaseデータベースのオプティマイザーは、クエリとデータの物理的な分布に基づいて自動的に分散実行計画を生成します。分散実行計画において、パーティショニングはクエリ性能を向上させることができます。データベースのリレーショナルテーブルが比較的小さい場合は、パーティショニングを行う必要はありません。リレーショナルテーブルが大きい場合は、上位ビジネス要件に基づいて慎重にパーティションキーを選択し、ほとんどのクエリがパーティションキーによるパーティションプルーニングを利用できるようにすることで、データアクセス量を削減する必要があります。
同時に、関連性のあるテーブルについては、関連キーをパーティションキーとして使用し、同じパーティショニング方式を採用することを推奨します。また、テーブルグループを使用して同一のパーティション構成を同じノードに配置することで、ノード間のデータやり取りを削減できます。
パラレルクエリの概要
パラレルクエリとは、クエリプランを並列実行することで、各クエリプランのCPUおよびI/O処理能力を向上させ、単一クエリの応答時間を短縮する技術です。この技術は、分散実行プランだけでなく、ローカルクエリプランにも適用できます。
単一クエリがアクセスするデータが同一ノード上にない場合、データ再分散により関連データを同じノードに配分して計算を行います。OceanBaseデータベースでは、データ再分散のたびにノードを上下の境界とし、実行プランを垂直方向に複数のDFO(Data Flow Operation)に分割します。各DFOは指定された並列度のタスクに分割され、並列実行によって実行効率が向上します。
一般的に、並列度が高くなるほどクエリの応答時間は短縮され、より多くのCPU、I/O、メモリリソースがクエリコマンドの実行に割り当てられます。大規模データ量のクエリ処理をサポートするDSS(Decision Support Systems)システムやデータウェアハウス型アプリケーションでは、クエリ時間の向上が特に顕著です。
全体として、パラレルクエリの基本的な考え方は分散実行プランと類似しています。つまり、実行プランを分解した後、その各部分を複数の実行スレッドが実行し、一定のスケジューリング手法によって、実行プラン内のDFO間およびDFO内部での並列実行を実現します。
システムが以下の条件を満たす場合、パラレルクエリはシステムの処理性能を効果的に向上させることができます:
十分なI/O帯域幅
システムのCPU負荷が低いこと
十分なメモリリソース
システムが追加の並列処理に必要な十分なリソースを持たない場合、パラレルクエリの使用や並列度の向上は実行性能を向上させることはできません。逆に、システムが過負荷状態の場合、オペレーティングシステムはより多くのスケジューリングを余儀なくされ、例えば実行コンテキストスイッチングにより性能が低下する可能性があります。
通常、DSSシステムでは大量のデータへのアクセスが必要となるため、パラレル実行によって実行応答時間が向上します。一方、単純なDML操作やデータ量が比較的小さいクエリにおいては、パラレルクエリを使用してもクエリ応答時間を顕著に短縮することはできません。
並列クエリと分散クエリの原理
OceanBaseデータベースのデータはシャード形式で各ノードに保存され、ノード間はギガビットまたは10ギガビットネットワークで通信します。通常、各ノードにはobserverと呼ばれるプロセスがデプロイされ、これがOceanBaseデータベースの外部サービスを提供する主体となります。以下の図に示すように。

OceanBaseデータベースは一定のバランス戦略に基づき、データシャードを複数のobserverプロセス間で均等に分散します。そのため、並列クエリでは一般的に複数のobserverプロセスに同時にアクセスする必要があります。以下の図に示すように。

SQL文の並列実行プロセス
ユーザーが指定したSQL文でアクセスするデータが2台以上のOBServerノードにまたがっている場合、並列実行が有効になります。ユーザーが接続しているそのOBServerノードは、クエリコーディネーター(QC、Query Coordinator)の役割を担います。実行手順は以下のとおりです:
QCは十分なスレッドリソースを予約します。
QCは並列処理が必要な計画を複数のサブ計画、すなわちDFO(Data Flow Operation)に分割します。各DFOには、直列実行される演算子が複数含まれます。例えば、あるDFOにはパーティションのスキャン、集計、送信の演算子のタスクが含まれ、別のDFOには収集や集計の演算子などのタスクが含まれます。
QCは一定の論理的順序に従って、DFOを適切なOBServerノードにスケジュールして実行します。OBServerノード上では一時的に補助コーディネーター(SQC、Sub Query Coordinator)が起動し、SQCはそのOBServerノード上で各DFOのために実行リソースの申請や実行コンテキスト環境の構築などを担当し、その後DFOを起動して各OBServerノード上で並列実行を開始します。
各DFOの実行が完了すると、QCは残りの部分の計算を直列実行します。例えば、並列実行された
COUNTアルゴリズムは最終的に、QCが各マシン上の計算結果に対してSUM演算を行う必要があります。QCが存在するスレッドは、結果をクライアントに返します。
オプティマイザーは、どのような並列計画を生成するかを決定し、QCはその計画を具体的に実行します。例えば、2つのパーティションテーブルのJOINでは、オプティマイザーはルールとコスト情報に基づき、分散型のPARTITION WISE JOIN計画を生成することも、HASH HASHで分散された分散JOIN計画を生成することもあります。計画が一度確定すると、QCは計画を複数のDFOに分割し、順序立ててスケジュールして実行します。QCの実行手順は以下の図のとおりです。

パラレル度とタスク分割方法
パラレル度(DOP、Degree Of Parallelism)は、1つのDFOを実行するためにいくつのスレッド(Worker)を使用するかを指定することができます。現在、OceanBaseデータベースではPARALLELヒントを使用してパラレル度を指定します。パラレル度が決定すると、DOPはDFOを実行する必要がある複数のOBServerノードに分割されます。
スキャンを含むDFOについては、DFOがアクセスする必要があるパーティションと、それらのパーティションがどのOBServerノード上に分散しているかを計算し、その後DOPを対応するOBServerに比例して割り当てます。例えば、DOPが6、DFOが120個のパーティションにアクセスする必要があり、そのうちserver1に60個のパーティション、server2に40個のパーティション、server3に20個のパーティションがある場合、server1に3つのスレッド、server2に2つのスレッド、server3に1つのスレッドを割り当て、結果として各スレッドが平均して20個のパーティションを処理できるようにします。DOPとパーティション数が整数で割り切れない場合、OceanBaseデータベースは一定の調整を行い、長尾をできるだけ短くすることを目指します。
各マシンに割り当てられるWorker数がパーティション数を大幅に上回る場合、自動的にパーティション内の並列処理が行われます。各パーティションはマクロブロックを境界として複数のスキャンタスクに分割され、複数のWorkerが実行を競合します。
このような分割能力を抽象化・カプセル化するために、Granuleの概念が導入されました。各スキャンタスクは1つのGranuleと呼ばれ、このスキャンタスクは1つのパーティションをスキャンする場合もあれば、パーティション内のごく小さな範囲をスキャンする場合もあります。以下の図のとおりです。

パラレルスケジューリング手法
オプティマイザーがパラレル計画を生成すると、QCはそれを複数のDFOに分割します。下図のように、t1 テーブルと t2 テーブルをHASH JOINする場合、3つのDFOに分割されます。DFO 1とDFO 2はデータの並列スキャンを担当し、データを対応するノードにHASHします。DFO 3はHASH JOINを実行し、最終的なHASH結果をQCに集約します。

QCは可能な限り2組のスレッドを使用して計画のスケジューリングを完了します。上記の例でのスケジューリングプロセスは以下の通りです:
QCはまずDFO 1とDFO 3をスケジュールします。DFO 1が実行を開始するとすぐにデータスキャンを開始し、DFO 3にデータを送ります。
DFO 3が実行を開始すると、最初はHASH JOINでHash Tableを作成するステップでブロックされます。つまり、DFO 1からのデータ収集が完了し、Hash Tableが作成されるまで待機します。その後、DFO 3は右側のDFO 2からデータを収集します。この時点ではDFO 2はまだスケジュールされていないため、DFO 3はデータ受信プロセスで待機します。DFO 1はデータをすべてDFO 3に送信したら、スレッドリソースを解放して終了できます。
スケジューラーがDFO 1のスレッドリソースを回収すると、直ちにDFO 2をスケジュールします。
DFO 2が実行を開始すると、データをDFO 3に送信し始めます。DFO 3はDFO 2からデータを1行受信するごとにHash Tableで照合し、ヒットした場合は直ちにQCに出力します。QCは結果をクライアントに出力します。
ネットワーク通信方式
関連するChild DFOとParent DFOのペアについて、Child DFOはProducerとしてM個のWorkerスレッドを割り当てられ、Parent DFOはConsumerとしてN個のWorkerスレッドを割り当てられます。両者間のデータ転送には、M × N個のネットワークチャネルが必要です。下図のように。

このネットワーク通信形態をよりよく理解するために、データ転送層DTL(Data Transfer Layer)の概念を導入します。これは、任意の2点間の通信接続をチャネル(Channel)の概念で記述するものです。
チャネルは送信側と受信側に分かれており、初期の実装では送信側が受信側に無制限にデータを送信できるようにしていました。しかし、受信側がこれらのデータを即座に消費できない場合、受信側のメモリが溢れる可能性があることが判明しました。そのため、ストリーム制御ロジックが追加されました。各チャネルの受信側には3つのスロットが予約されており、スロットがデータで満杯になると、送信側にデータ送信の一時停止を通知します。受信側でデータが消費されて空きスロットが出現すると、送信側に送信再開を通知します。