OceanBase logo

OceanBase

トランザクション処理、分析、AIワークロードに最適な分散データベース

プロダクト概要
デプロイを自由に

OceanBase Cloud

OceanBaseの導入とスケーリングを最適化

エンタープライズ版

自社インフラ上での運用・管理に対応

オープンソース版を試す

コミュニティ版

開発者向けオープンソース分散データベース

OceanBase seekdb

AIネイティブなオープンソースの検索データベース

顧客事例

さまざまな業界の企業による導入事例を紹介します。

さらに見る
利用シーン別

あらゆるシナリオに対応するOLTP

ハイブリッドクラウドソリューション

大容量ストレージデータベースのコスト削減

リアルタイム分析混合ワークロード

複数インスタンスの統合

ドキュメント

会社概要

OceanBaseの企業情報、パートナーシップ、そして信頼性・セキュリティへの取り組みについて紹介します。

OceanBaseについて

トラストセンター

法的情報

お問い合わせ

日本 - 日本語
International - English
中国站 - 简体中文
クラウドで始める

OceanBase

トランザクション処理、分析、AIワークロードに最適な分散データベース

プロダクト概要
デプロイを自由に

OceanBase Cloud

OceanBaseの導入とスケーリングを最適化

エンタープライズ版

自社インフラ上での運用・管理に対応

オープンソース版を試す

コミュニティ版

開発者向けオープンソース分散データベース

OceanBase seekdb

AIネイティブなオープンソースの検索データベース

顧客事例

さまざまな業界の企業による導入事例を紹介します。

さらに見る
利用シーン別

あらゆるシナリオに対応するOLTP

ハイブリッドクラウドソリューション

大容量ストレージデータベースのコスト削減

リアルタイム分析混合ワークロード

複数インスタンスの統合

OceanBaseの企業情報、パートナーシップ、そして信頼性・セキュリティへの取り組みについて紹介します。

OceanBaseについて

トラストセンター

法的情報

お問い合わせ

クラウドで始める
编组
すべての製品
    • データベース
    • アイコンOceanBaseデータベース
    • アイコンOceanBase Cloud
アイコン

OceanBaseデータベース

SQL - V4.4.2

    OceanBase ロゴ

    AI時代を支える分散データベース

    日本 - 日本語
    International - English
    中国站 - 简体中文
    プロダクト
    OceanBase Cloudエンタープライズ版コミュニティ版OceanBase seekdb
    会社概要
    OceanBaseについてトラストセンター法的情報お問い合わせ
    公式アカウント
    ConnpassXQiitaLumaGitHub

    © OceanBase 2026. All rights reserved

    クラウドサービス契約個人情報保護ポリシーセキュリティ
    お問い合わせ
    ドキュメントフィードバック
    1. ホーム
    2. OceanBaseデータベース
    3. SQL
    4. V4.4.2
    アイコンOceanBaseデータベース
    SQL - V 4.4.2
    データベース
    • OceanBaseデータベース
    • OceanBase Cloud
    SQL
    KV
    • V 4.4.2
    • V 4.3.5

    obcdc開発説明

    最終更新日:2026-06-15 02:31:32  更新
    シェア
    このページの内容
    obcdc動的ライブラリを使用した独自のOceanBaseデータ消費ツールの開発
    インポート
    ヘッダーファイル
    obcdcインスタンスの構築
    obcdcの使用
    obcdcインスタンスの破棄
    注意事項
    例
    付録
    ObCDCFactory
    IObCDCInstance

    折りたたみ

    シェア

    本記事では、コミュニティ版obcdcをご自身のデータ消費パイプラインに接続する方法について説明します。

    機能適用範囲

    この内容はコミュニティ版obcdcにのみ適用されます。

    obcdc動的ライブラリを使用した独自のOceanBaseデータ消費ツールの開発

    obcdcはC++で記述されており、コンパイルの出力は動的ライブラリです。下流の消費プログラムの開発時には、この動的ライブラリとヘッダーファイル(libobcdc.h、ob_errno.h)が必要です。

    インポート

    コミュニティ版obcdcはoceanbase-ce-libsパッケージに依存しています。ソフトウェアセンターからダウンロードしてインストールできます。

    rpm -ivh oceanbase-ce-libs-****.rpm
    

    インストール完了後、ldd ./libobcdc.so コマンドを実行し、ローカル環境に不足している動的ライブラリがないか確認します。不足している場合は、すべての依存ライブラリをローカルに配置し、libobcdcを使用するプログラムのLD_LIBRARY_PATHを設定して、obcdcが正常に接続できるようにします。例えば、oceanbase-ce-libs 内の libmariadb.so.3 などです。

    ヘッダーファイル

    libobcdc.h には詳細なインターフェース説明があります。コード内の説明を参照することができます。本記事の付録では、よく使われるインターフェースを簡潔にリストアップしています。

    obcdcインスタンスの構築

    ObCDCFactory::construct_obcdc() メソッドを使用してobcdcを構築できます。

    obcdcの使用

    • 初期化インターフェース:init/init_with_start_tstamp_usec インターフェースを使用して、obcdcに設定情報と起動タイムスタンプ情報を通知する必要があります。

      • 設定情報:設定ファイルのパスや設定ファイルのマップを指定できます。
      • 起動時間の指定(ログ取得の開始時間):時間単位は秒またはミリ秒です。
    • 起動インターフェース:launchインターフェースを使用して、obcdcに作業開始を通知します。

    • LogRecordの取得インターフェース:next_record インターフェースを使用して、obcdcからOceanBaseの増分データを連続的に取得します。このインターフェースでは、取得のタイムアウト時間や取得するテナントのデータを指定できます。データはLogRecord形式でカプセル化され、LogRecordのメモリはobcdcが割り当てます。

    • LogRecordの返却インターフェース:release_record インターフェースを使用して、消費が完了したLogRecordをobcdcに返却します。obcdcはバックグラウンドのGCスレッドによってメモリを回収します(非同期回収)。

    • 現在のobcdcサービスのすべてのテナントIDの取得:get_tenant_ids を使用して、現在のobcdcサービス内のすべてのテナントリストを取得します。

    obcdcインスタンスの破棄

    obcdcを破棄するには、まずobcdcインスタンスを停止し、その後obcdcを破棄します。

    • obcdcの停止:

      • stop インターフェースを使用して、obcdcに各モジュールの実行停止を通知します。
      • destroy インターフェースを使用して、obcdcの各モジュールをデストラクトし、関連リソースを解放します。
    • obcdcインスタンスの破棄:ObCDCFactory::deconstruct(IObCDCInstance *instance) を使用してobcdcインスタンスを破棄します。これ以降、最初のステップで取得したobcdcインスタンスのポインタにはアクセスできなくなります。

    注意事項

    • obcdcから取得するデータはすべて、obcdcプロセス内で割り当てられたメモリ上にあります。そのため、メモリが解放されないのを防ぐために、next_record と release_record のインターフェースを必ずペアで呼び出すようにしてください。複数回の next_record の後に対応するLogRecordに対して一括で release_record を呼び出すことは許可されています。

    • stopとlaunchを呼び出した後は、launch/initインターフェースを呼び出すことはできません。

    • obcdcインターフェースが返すすべてのエラーコードについて、必要に応じて適切に処理する必要があります:

      • エラーコード OB_SUCCESS が返された場合、データの取得に成功したことを意味し、返されたデータポインタはNULLではありません。
      • エラーコード OB_TIMEOUT が返された場合、現在obcdcでデータを取得できないことを意味します。このエラーコードを受信した後、データの取得を再試行できます。データの時刻がリアルタイムであれば問題ありませんが、データの時刻がリアルタイムでない場合は、obcdc内部に問題がある可能性があり、さらなる調査が必要です。
      • OB_IN_STOP_STATE が返された場合、obcdcが停止したことを意味します。これは、呼び出し側が意図的にobcdcのstop/destroyインターフェースをトリガーした場合も、obcdc内部で処理できない例外が発生し、各モジュールが作業停止をマークした場合もあります。このエラーコードを受信した場合は、セキュリティポイントなどの必要な情報を記録した後、プロセスを終了してください。
      • その他の種類のエラーコードが返された場合、それらは予期しないものです。セキュリティポイントなどの必要な情報を記録した後、プロセスを終了してください。

    例

    ここでは、obcdcの使用方法を示す簡単なデモを提供します。詳細については、obcdc_tailfおよびobcdc_tailf ソースコードを参照してください。

    注意

    このデモは開発の参考としてのみ提供されており、コンパイルや実行はできません。

    /**
     * Copyright (c) 2021 OceanBase
     * OceanBase CEはMulan PubL v2のライセンスに基づき提供されます。
     * 本ソフトウェアの使用は、Mulan PubL v2の利用規約に従って行ってください。
     * Mulan PubL v2のコピーは以下のURLから入手できます:
     *          http://license.coscl.org.cn/MulanPubL-2.0
     * 本ソフトウェアは、「現状のまま」提供されるものであり、いかなる種類の保証もありません。
     * EXPRESSまたはIMPLIEDを問わず、非侵害を含むがこれに限定されない、
     * 商品性または特定の目的への適合性。
     * 詳細については、Mulan PubL v2を参照してください。
     *
     * obcdc_demo
     */
    
    #include <iostream>
    #include "include/libobcdc/libobcdc.h"
    #include "include/libobcdc/ob_errno.h"
    
    using namespace std;
    using namespace oceanbase::libobcdc;
    using namespace oceanbase::common;
    
    typedef IBinlogRecord Record;
    
    #define LOG(msg) \
        do { \
          std::cout << msg << std::endl; \
        } while (0)
    
    int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
    {
      int ret = OB_SUCCESS;
    
      if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
        ret = OB_NOT_INIT;
        LOG("[ERROR] construct_obcdc failed");
      }
    
      return ret;
    }
    
    void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
    {
      obcdc_instance->stop();
      cdc_factory.deconstruct(obcdc_instance);
    }
    
    int init_obcdc_instance(IObCDCInstance &obcdc_instance)
    {
      int ret = OB_SUCCESS;
      const char *config_path = "conf/libobcdc.conf";
    
      if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
        LOG("obcdc_instance init failed");
      } else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
        LOG("obcdc_instance launch failed");
      }
    
      return ret;
    }
    
    int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
    {
      int ret = OB_SUCCESS;
      const int64_t timeout = 10000; // usec
    
      if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
        if (OB_TIMEOUT != ret) {
          LOG("[WARN] next_record failed");
        }
      } else if (NULL == record) {
        ret = OB_ERR_UNEXPECTED;
        LOG("invalid record");
      }
    
      return ret;
    }
    
    int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
    {
      int ret = OB_SUCCESS;
    
      obcdc_instance.release_record(record);
    
      return ret;
    }
    int handle_cdc_record(Record *record)
    {
      int ret = OB_SUCCESS;
      return ret;
    }
    
    int main(int argc, char **argv)
    {
      int ret = OB_SUCCESS;
      ObCDCFactory cdc_factory;
      IObCDCInstance *obcdc_instance = NULL;
    
      if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
        LOG("[ERROR] construct_obcdc_instance failed");
      } else if (NULL == obcdc_instance) {
        ret = OB_ERR_UNEXPECTED;
        LOG("[ERROR] obcdc_instance should not be null!");
      } else {
        if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance)) {
          LOG("[ERROR] obcdc_instance init failed");
        } else {
          while(OB_SUCCESS == ret) {
            Record *record = NULL;
            if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
              if (OB_TIMEOUT == ret) {
                ret = OB_SUCCESS;
              } else {
                LOG("[ERROR] fetch_next_cdc_record failed");
              }
            } else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
              LOG("[ERROR] handle_cdc_record failed");
            } else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
              LOG("[ERROR] release_cdc_record failed");
            }
          }
        }
    
        destroy_obcdc_instance(cdc_factory, obcdc_instance);
      }
    
      return 0;
    }
    

    付録

    この章では、obcdcヘッダーファイルの一部のコードを示します。

    ObCDCFactory

    ObCDCFactoryはobcdcインスタンスのファクトリであり、obcdcインスタンスの構築または破棄を担当します。

    注意

    1つのプロセスでは、1つのobcdcインスタンスしか構築できません。

    // libobcdc.h: ObCDCFactory
    namespace oceanbase{
    namespace liboblog{
    
    class ObCDCFactory
    {
    public:
      ObCDCFactory();
      ~ObCDCFactory();
    public:
      IObCDCInstance *construct_obcdc();
      void deconstruct(IObCDCInstance *log);
    };
    
    }
    }
    

    IObCDCInstance

    IObCDCInstanceはobcdcの外部インターフェースであり、初期化、起動、停止、破棄、データの取得、データの返却などの機能を提供します。ここでは、一般的に使用されるインターフェース定義をいくつかリストアップします。

    // libobcdc.h: IObCDCInstance
    namespace oceanbase{
    namespace liboblog{
    
    // IObCDCInstanceはliboblogが外部に提供するインターフェースです。
    // 注意:ここでは、インターフェースで渡す必要があるパラメータは省略しています。
    
    class IObCDCInstance
    {
    public:
      virtual ~IObCDCInstance() {};
    public:
      /*
       * init libobcdc
       * @param config_file       config file name
       * @param start_timestamp   start timestamp (by second)
       * @param err_cb            error callback function pointer
       */
      virtual int init(const char *config_file,
          const uint64_t start_timestamp_sec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      /*
       * init libobcdc
       * @param configs         config by map
       * @param start_timestamp start timestamp (by second)
       * @param err_cb          error callback function pointer
       */
      virtual int init(const std::map<std::string, std::string> &configs,
          const uint64_t start_timestamp_sec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      /*
       * init libobcdc
       * @param configs         config by map
       * @param start_timestamp start timestamp by microsecond
       * @param err_cb          error callback function pointer
       */
      virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
          const uint64_t start_timestamp_usec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      virtual void destroy() = 0;
    
      /*
       * fetch next binlog record from OB cluster
       * @param record           binlog record, memory allocated by oblog, support release_record(corresponding times) after mutli next_record
       * @param OB_SUCCESS       success
       * @param OB_TIMEOUT       timeout
       * @param other errorcode  fail
       */
      virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
    
      /*
       * fetch next binlog record from OB cluster
       * @param [out] record        binlog record, memory allocated by oblog, support release_record(corresponding tiems) after mutli next_record
       * @param [out] major_version major version of ICDCRecord
       * @param [out] tenant_id     tenant id of ICDCRecord
       *
       * @param OB_SUCCESS          success
       * @param OB_TIMEOUT          timeout
       * @param other error code    fail
       */
    
      virtual int next_record(ICDCRecord **record,
          int32_t &major_version,
          uint64_t &tenant_id,
          const int64_t timeout_us) = 0;
    
      /*
       * release recorcd for EACH ICDCRecord
       * @param record
       */
      virtual void release_record(ICDCRecord *record) = 0;
    
      /*
       * Launch libobcdc
       * @retval OB_SUCCESS on success
       * @retval ! OB_SUCCESS on fail
       */
      virtual int launch() = 0;
    
      /*
       * Stop libobcdc
       */
    
      virtual void stop() = 0;
    
      /// get all serving tenant id list after oblog inited
      ///
      /// @param [out]            tenant_ids tenant ids that oblog serving
      ///
      /// @retval OB_SUCCESS      success
      /// @retval other value     fail
    
      virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
    };
    
    }
    }
    

    前のトピック

    obcdc構成パラメータの説明
    最後

    次のトピック

    obcdc_tailf
    次
    このページの内容
    obcdc動的ライブラリを使用した独自のOceanBaseデータ消費ツールの開発
    インポート
    ヘッダーファイル
    obcdcインスタンスの構築
    obcdcの使用
    obcdcインスタンスの破棄
    注意事項
    例
    付録
    ObCDCFactory
    IObCDCInstance