本記事では、コミュニティ版obcdcを自社のデータ消費パイプラインに接続する方法について説明します。
適用対象
この内容はコミュニティ版obcdcにのみ適用されます。
obcdc動的ライブラリを使用した独自のOceanBaseデータコンシューマツールの開発
obcdcはC++で記述されており、コンパイル出力は動的ライブラリとなります。ダウンストリームのコンシューマープログラムを開発する際には、動的ライブラリとヘッダーファイル(libobcdc.h、ob_errno.h)への依存が必要です。
インポート
コミュニティ版のobcdcはoceanbase-ce-libsパッケージに依存しており、ソフトウェアセンターからダウンロードおよびインストールが可能です。
rpm -ivh oceanbase-ce-libs-****.rpm
インストール後、ldd ./libobcdc.so コマンドを実行して、ローカルに必要な動的ライブラリが不足していないか確認します。不足している場合は、すべての依存ライブラリがローカルにあることを確認し、libobcdcを使用するプログラムに対してLD_LIBRARY_PATHを設定して、obcdcが正常に接続できるようにしてください。例えば、oceanbase-ce-libs内のlibmariadb.so.3などです。
ヘッダーファイル
libobcdc.hには詳細なインターフェースの説明が記載されています。コード内のインターフェースの説明を参照できます。本記事の補遺では、一般的なインターフェースを簡潔にリストアップしています。
obcdcインスタンスの構築
ObCDCFactory::construct_obcdc()メソッドを使用してobcdcを構築できます。
obcdcの使用
初期化インターフェース:
init/init_with_start_tstamp_usecインターフェースを通じて、obcdcに設定情報と起動タイムスタンプ情報を通知する必要があります。- 設定情報:設定ファイルのパスや設定ファイルのmapを渡すことができます。
- 起動時間(ログ取得の開始時間)の指定:時間単位は秒でもミリ秒でも構いません。
スタートインターフェース:launchインターフェースを通じて、obcdcに動作を開始するよう通知します。
LogRecordの取得インターフェース:
next_recordインターフェースを通じて、継続的にobcdcからOceanBaseの増分データを取得します。このインターフェースでは取得タイムアウト時間を指定でき、テナントのデータも指定可能です。データはLogRecord形式でエンカプセルされ、LogRecordのメモリはobcdcが割り当てます。LogRecordの返却インターフェース:
release_recordインターフェースを通じて、消費が完了したLogRecordをobcdcに返却します。obcdcはバックグラウンドのGCスレッドでメモリを回収します(非同期回収)。現在のobcdcサービスのすべてのテナントIDの取得:
get_tenant_idsを使用して、現在のobcdcサービス内のすべてのテナントリストを取得します。
obcdcインスタンスの破棄
obcdcを破棄するには、まずobcdcインスタンスを停止し、その後にobcdcを破棄する必要があります。
obcdcの停止:
stopインターフェースを通じて、obcdcに各モジュールの動作を停止するよう通知します。destroyインターフェースを通じて、obcdcの各モジュールをデストラクトし、関連リソースを解放します。
obcdcインスタンスの破棄:
ObCDCFactory::deconstruct(IObCDCInstance *instance)を使用してobcdcインスタンスを破棄します。これ以降、最初のステップで取得したobcdcインスタンスのポインタへのアクセスはできません。
注意事項
obcdcから取得したデータはすべて、obcdcプロセス内で割り当てられたメモリ上に存在するため、必ず
next_recordとrelease_recordインターフェースをペアで呼び出すようにしてください。そうしないとメモリが解放されない可能性があります。複数回のnext_record呼び出し後に、対応するLogRecordに対して一度にrelease_recordを呼び出すことも可能です。stopやlaunchを呼び出した後は、再びlaunch/initインターフェースを呼び出すことはできません。
obcdcインターフェースが返すすべてのエラーコードについては、必要に応じて適切に対処する必要があります:
OB_SUCCESSというエラーコードが返された場合、データ取得に成功したことを意味し、返されたデータポインタは必ずNULLではありません。OB_TIMEOUTというエラーコードが返された場合、現在obcdcではデータを取得できない状況です。このエラーコードを受け取った場合は、再試行してデータを取得することができます。データポイントがリアルタイムで表示されている場合は問題ありませんが、データポイントがリアルタイムで表示されない場合は、obcdc内部に問題がある可能性があり、さらに調査が必要です。OB_IN_STOP_STATEが返された場合、obcdcが停止したことを意味します。これは呼び出し元がobcdcのstop/destroyインターフェースを意図的に呼び出した可能性もありますし、obcdc内部で処理できない例外が発生した可能性もあります。各モジュールは停止状態をマークします。このエラーコードを受け取った場合は、安全なポイントなどの必要な情報を記録した上でプロセスを終了することができます。- 他のタイプのエラーコードを受け取った場合は、予期しないエラーであるため、安全なポイントなどの必要な情報を記録した上でプロセスを終了することができます。
例
ここでは、obcdcを使用する簡単なデモを紹介します。詳細については、obcdc_tailfおよびobcdc_tailfソースコードを参照してください。
注意
このデモは開発のためのガイドのみを提供しており、実行やコンパイルには使用できません。
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* obcdc_demo
*/
#include <iostream>
#include "include/libobcdc/libobcdc.h"
#include "include/libobcdc/ob_errno.h"
using namespace std;
using namespace oceanbase::libobcdc;
using namespace oceanbase::common;
typedef IBinlogRecord Record;
#define LOG(msg) \
do { \
std::cout << msg << std::endl; \
} while (0)
int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
{
int ret = OB_SUCCESS;
if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
ret = OB_NOT_INIT;
LOG("[ERROR] construct_obcdc failed");
}
return ret;
}
void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
{
obcdc_instance->stop();
cdc_factory.deconstruct(obcdc_instance);
}
int init_obcdc_instance(IObCDCInstance &obcdc_instance)
{
int ret = OB_SUCCESS;
const char *config_path = "conf/libobcdc.conf";
if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
LOG("obcdc_instance init failed");
} else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
LOG("obcdc_instance launch failed");
}
return ret;
}
int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
const int64_t timeout = 10000; // usec
if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
if (OB_TIMEOUT != ret) {
LOG("[WARN] next_record failed");
}
} else if (NULL == record) {
ret = OB_ERR_UNEXPECTED;
LOG("invalid record");
}
return ret;
}
int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
obcdc_instance.release_record(record);
return ret;
}
int handle_cdc_record(Record *record)
{
int ret = OB_SUCCESS;
return ret;
}
int main(int argc, char **argv)
{
int ret = OB_SUCCESS;
ObCDCFactory cdc_factory;
IObCDCInstance *obcdc_instance = NULL;
if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
LOG("[ERROR] construct_obcdc_instance failed");
} else if (NULL == obcdc_instance) {
ret = OB_ERR_UNEXPECTED;
LOG("[ERROR] obcdc_instance should not be null!");
} else {
if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance)) {
LOG("[ERROR] obcdc_instance init failed");
} else {
while(OB_SUCCESS == ret) {
Record *record = NULL;
if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
if (OB_TIMEOUT == ret) {
ret = OB_SUCCESS;
} else {
LOG("[ERROR] fetch_next_cdc_record failed");
}
} else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
LOG("[ERROR] handle_cdc_record failed");
} else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
LOG("[ERROR] release_cdc_record failed");
}
}
}
destroy_obcdc_instance(cdc_factory, obcdc_instance);
}
return 0;
}
補遺
この章では、一部のobcdcヘッダーファイルコードを示します。
ObCDCFactory
ObCDCFactoryは、obcdcインスタンスの構築または破棄を担当するobcdcインスタンスファクトリです。
注意
1つのプロセスは1つのobcdcインスタンスのみを構築できます。
// libobcdc.h: ObCDCFactory
namespace oceanbase{
namespace liboblog{
class ObCDCFactory
{
public:
ObCDCFactory();
~ObCDCFactory();
public:
IObCDCInstance *construct_obcdc();
void deconstruct(IObCDCInstance *log);
};
}
}
IObCDCInstance
IObCDCInstanceはobcdcの外部インターフェースであり、初期化、起動、停止、破棄、データ取得、データ返却などの機能を提供します。ここでは、よく使用されるいくつかのインターフェース定義がリストされています。
// libobcdc.h: IObCDCInstance
namespace oceanbase{
namespace liboblog{
// IObCDCInstanceはliboblogが外部に提供するインターフェースです。
// 注意:ここではインターフェースに渡すパラメータを省略しています。
class IObCDCInstance
{
public:
virtual ~IObCDCInstance() {};
public:
/*
* libobcdcの初期化
* @param config_file 設定ファイル名
* @param start_timestamp 開始タイムスタンプ(秒単位)
* @param err_cb エラー通知用コールバック関数ポインタ
*/
virtual int init(const char *config_file,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* libobcdcの初期化
* @param configs マップ形式の設定
* @param start_timestamp 開始タイムスタンプ(秒単位)
* @param err_cb エラー通知用コールバック関数ポインタ
*/
virtual int init(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* libobcdcの初期化
* @param configs マップ形式の設定
* @param start_timestamp 開始タイムスタンプ(マイクロ秒単位)
* @param err_cb エラー通知用コールバック関数ポインタ
*/
virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_usec,
ERROR_CALLBACK err_cb = NULL) = 0;
virtual void destroy() = 0;
/*
* OBクラスタから次のバイナリログレコードをフェッチ
* @param record バイナリログレコード、oblogによってメモリが確保され、複数のnext_record後にrelease_record(対応する回数)をサポート
* @param OB_SUCCESS 成功
* @param OB_TIMEOUT タイムアウト
* @param 他のエラーコード 失敗
*/
virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
/*
* OBクラスタから次のバイナリログレコードをフェッチ
* @param [out] record バイナリログレコード、oblogによってメモリが確保され、複数のnext_record後にrelease_record(対応するtiems)をサポート
* @param [out] major_version ICDCRecordのメジャーバージョン
* @param [out] tenant_id ICDCRecordのテナントID
*
* @param OB_SUCCESS 成功
* @param OB_TIMEOUT タイムアウト
* @param 他のエラーコード 失敗
*/
virtual int next_record(ICDCRecord **record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout_us) = 0;
/*
* 各ICDCRecordごとのレコードの解放
* @param record
*/
virtual void release_record(ICDCRecord *record) = 0;
/*
* libobcdcの起動
* @retval OB_SUCCESS 成功した場合
* @retval ! OB_SUCCESS 失敗した場合
*/
virtual int launch() = 0;
/*
* libobcdcの停止
*/
virtual void stop() = 0;
/// oblogが初期化された後のすべてのサービス中のテナントIDリストを取得
///
/// @param [out] tenant_ids oblogがサービス中のテナントID
///
/// @retval OB_SUCCESS 成功
/// @retval その他の値 失敗
virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
}
}