本記事では、コミュニティ版obcdcをご自身のデータ消費パイプラインに接続する方法について説明します。
機能適用範囲
この内容はコミュニティ版obcdcにのみ適用されます。
obcdc動的ライブラリを使用した独自のOceanBaseデータ消費ツールの開発
obcdcはC++で記述されており、コンパイルの出力は動的ライブラリです。下流の消費プログラムの開発時には、この動的ライブラリとヘッダーファイル(libobcdc.h、ob_errno.h)が必要です。
インポート
コミュニティ版obcdcはoceanbase-ce-libsパッケージに依存しています。ソフトウェアセンターからダウンロードしてインストールできます。
rpm -ivh oceanbase-ce-libs-****.rpm
インストール完了後、ldd ./libobcdc.so コマンドを実行し、ローカル環境に不足している動的ライブラリがないか確認します。不足している場合は、すべての依存ライブラリをローカルに配置し、libobcdcを使用するプログラムのLD_LIBRARY_PATHを設定して、obcdcが正常に接続できるようにします。例えば、oceanbase-ce-libs 内の libmariadb.so.3 などです。
ヘッダーファイル
libobcdc.h には詳細なインターフェース説明があります。コード内の説明を参照することができます。本記事の付録では、よく使われるインターフェースを簡潔にリストアップしています。
obcdcインスタンスの構築
ObCDCFactory::construct_obcdc() メソッドを使用してobcdcを構築できます。
obcdcの使用
初期化インターフェース:
init/init_with_start_tstamp_usecインターフェースを使用して、obcdcに設定情報と起動タイムスタンプ情報を通知する必要があります。- 設定情報:設定ファイルのパスや設定ファイルのマップを指定できます。
- 起動時間の指定(ログ取得の開始時間):時間単位は秒またはミリ秒です。
起動インターフェース:launchインターフェースを使用して、obcdcに作業開始を通知します。
LogRecordの取得インターフェース:
next_recordインターフェースを使用して、obcdcからOceanBaseの増分データを連続的に取得します。このインターフェースでは、取得のタイムアウト時間や取得するテナントのデータを指定できます。データはLogRecord形式でカプセル化され、LogRecordのメモリはobcdcが割り当てます。LogRecordの返却インターフェース:
release_recordインターフェースを使用して、消費が完了したLogRecordをobcdcに返却します。obcdcはバックグラウンドのGCスレッドによってメモリを回収します(非同期回収)。現在のobcdcサービスのすべてのテナントIDの取得:
get_tenant_idsを使用して、現在のobcdcサービス内のすべてのテナントリストを取得します。
obcdcインスタンスの破棄
obcdcを破棄するには、まずobcdcインスタンスを停止し、その後obcdcを破棄します。
obcdcの停止:
stopインターフェースを使用して、obcdcに各モジュールの実行停止を通知します。destroyインターフェースを使用して、obcdcの各モジュールをデストラクトし、関連リソースを解放します。
obcdcインスタンスの破棄:
ObCDCFactory::deconstruct(IObCDCInstance *instance)を使用してobcdcインスタンスを破棄します。これ以降、最初のステップで取得したobcdcインスタンスのポインタにはアクセスできなくなります。
注意事項
obcdcから取得するデータはすべて、obcdcプロセス内で割り当てられたメモリ上にあります。そのため、メモリが解放されないのを防ぐために、
next_recordとrelease_recordのインターフェースを必ずペアで呼び出すようにしてください。複数回のnext_recordの後に対応するLogRecordに対して一括でrelease_recordを呼び出すことは許可されています。stopとlaunchを呼び出した後は、launch/initインターフェースを呼び出すことはできません。
obcdcインターフェースが返すすべてのエラーコードについて、必要に応じて適切に処理する必要があります:
- エラーコード
OB_SUCCESSが返された場合、データの取得に成功したことを意味し、返されたデータポインタはNULLではありません。 - エラーコード
OB_TIMEOUTが返された場合、現在obcdcでデータを取得できないことを意味します。このエラーコードを受信した後、データの取得を再試行できます。データの時刻がリアルタイムであれば問題ありませんが、データの時刻がリアルタイムでない場合は、obcdc内部に問題がある可能性があり、さらなる調査が必要です。 OB_IN_STOP_STATEが返された場合、obcdcが停止したことを意味します。これは、呼び出し側が意図的にobcdcのstop/destroyインターフェースをトリガーした場合も、obcdc内部で処理できない例外が発生し、各モジュールが作業停止をマークした場合もあります。このエラーコードを受信した場合は、セキュリティポイントなどの必要な情報を記録した後、プロセスを終了してください。- その他の種類のエラーコードが返された場合、それらは予期しないものです。セキュリティポイントなどの必要な情報を記録した後、プロセスを終了してください。
- エラーコード
例
ここでは、obcdcの使用方法を示す簡単なデモを提供します。詳細については、obcdc_tailfおよびobcdc_tailf ソースコードを参照してください。
注意
このデモは開発の参考としてのみ提供されており、コンパイルや実行はできません。
/**
* Copyright (c) 2021 OceanBase
* OceanBase CEはMulan PubL v2のライセンスに基づき提供されます。
* 本ソフトウェアの使用は、Mulan PubL v2の利用規約に従って行ってください。
* Mulan PubL v2のコピーは以下のURLから入手できます:
* http://license.coscl.org.cn/MulanPubL-2.0
* 本ソフトウェアは、「現状のまま」提供されるものであり、いかなる種類の保証もありません。
* EXPRESSまたはIMPLIEDを問わず、非侵害を含むがこれに限定されない、
* 商品性または特定の目的への適合性。
* 詳細については、Mulan PubL v2を参照してください。
*
* obcdc_demo
*/
#include <iostream>
#include "include/libobcdc/libobcdc.h"
#include "include/libobcdc/ob_errno.h"
using namespace std;
using namespace oceanbase::libobcdc;
using namespace oceanbase::common;
typedef IBinlogRecord Record;
#define LOG(msg) \
do { \
std::cout << msg << std::endl; \
} while (0)
int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
{
int ret = OB_SUCCESS;
if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
ret = OB_NOT_INIT;
LOG("[ERROR] construct_obcdc failed");
}
return ret;
}
void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
{
obcdc_instance->stop();
cdc_factory.deconstruct(obcdc_instance);
}
int init_obcdc_instance(IObCDCInstance &obcdc_instance)
{
int ret = OB_SUCCESS;
const char *config_path = "conf/libobcdc.conf";
if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
LOG("obcdc_instance init failed");
} else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
LOG("obcdc_instance launch failed");
}
return ret;
}
int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
const int64_t timeout = 10000; // usec
if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
if (OB_TIMEOUT != ret) {
LOG("[WARN] next_record failed");
}
} else if (NULL == record) {
ret = OB_ERR_UNEXPECTED;
LOG("invalid record");
}
return ret;
}
int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
obcdc_instance.release_record(record);
return ret;
}
int handle_cdc_record(Record *record)
{
int ret = OB_SUCCESS;
return ret;
}
int main(int argc, char **argv)
{
int ret = OB_SUCCESS;
ObCDCFactory cdc_factory;
IObCDCInstance *obcdc_instance = NULL;
if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
LOG("[ERROR] construct_obcdc_instance failed");
} else if (NULL == obcdc_instance) {
ret = OB_ERR_UNEXPECTED;
LOG("[ERROR] obcdc_instance should not be null!");
} else {
if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance)) {
LOG("[ERROR] obcdc_instance init failed");
} else {
while(OB_SUCCESS == ret) {
Record *record = NULL;
if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
if (OB_TIMEOUT == ret) {
ret = OB_SUCCESS;
} else {
LOG("[ERROR] fetch_next_cdc_record failed");
}
} else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
LOG("[ERROR] handle_cdc_record failed");
} else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
LOG("[ERROR] release_cdc_record failed");
}
}
}
destroy_obcdc_instance(cdc_factory, obcdc_instance);
}
return 0;
}
付録
この章では、obcdcヘッダーファイルの一部のコードを示します。
ObCDCFactory
ObCDCFactoryはobcdcインスタンスのファクトリであり、obcdcインスタンスの構築または破棄を担当します。
注意
1つのプロセスでは、1つのobcdcインスタンスしか構築できません。
// libobcdc.h: ObCDCFactory
namespace oceanbase{
namespace liboblog{
class ObCDCFactory
{
public:
ObCDCFactory();
~ObCDCFactory();
public:
IObCDCInstance *construct_obcdc();
void deconstruct(IObCDCInstance *log);
};
}
}
IObCDCInstance
IObCDCInstanceはobcdcの外部インターフェースであり、初期化、起動、停止、破棄、データの取得、データの返却などの機能を提供します。ここでは、一般的に使用されるインターフェース定義をいくつかリストアップします。
// libobcdc.h: IObCDCInstance
namespace oceanbase{
namespace liboblog{
// IObCDCInstanceはliboblogが外部に提供するインターフェースです。
// 注意:ここでは、インターフェースで渡す必要があるパラメータは省略しています。
class IObCDCInstance
{
public:
virtual ~IObCDCInstance() {};
public:
/*
* init libobcdc
* @param config_file config file name
* @param start_timestamp start timestamp (by second)
* @param err_cb error callback function pointer
*/
virtual int init(const char *config_file,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init libobcdc
* @param configs config by map
* @param start_timestamp start timestamp (by second)
* @param err_cb error callback function pointer
*/
virtual int init(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init libobcdc
* @param configs config by map
* @param start_timestamp start timestamp by microsecond
* @param err_cb error callback function pointer
*/
virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_usec,
ERROR_CALLBACK err_cb = NULL) = 0;
virtual void destroy() = 0;
/*
* fetch next binlog record from OB cluster
* @param record binlog record, memory allocated by oblog, support release_record(corresponding times) after mutli next_record
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other errorcode fail
*/
virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
/*
* fetch next binlog record from OB cluster
* @param [out] record binlog record, memory allocated by oblog, support release_record(corresponding tiems) after mutli next_record
* @param [out] major_version major version of ICDCRecord
* @param [out] tenant_id tenant id of ICDCRecord
*
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other error code fail
*/
virtual int next_record(ICDCRecord **record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout_us) = 0;
/*
* release recorcd for EACH ICDCRecord
* @param record
*/
virtual void release_record(ICDCRecord *record) = 0;
/*
* Launch libobcdc
* @retval OB_SUCCESS on success
* @retval ! OB_SUCCESS on fail
*/
virtual int launch() = 0;
/*
* Stop libobcdc
*/
virtual void stop() = 0;
/// get all serving tenant id list after oblog inited
///
/// @param [out] tenant_ids tenant ids that oblog serving
///
/// @retval OB_SUCCESS success
/// @retval other value fail
virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
}
}