OceanBase logo

OceanBase

トランザクション処理、分析、AIワークロードに最適な分散データベース

プロダクト概要
デプロイを自由に

OceanBase Cloud

OceanBaseの導入とスケーリングを最適化

エンタープライズ版

自社インフラ上での運用・管理に対応

オープンソース版を試す

コミュニティ版

開発者向けオープンソース分散データベース

OceanBase seekdb

AIネイティブなオープンソースの検索データベース

顧客事例

さまざまな業界の企業による導入事例を紹介します。

さらに見る
利用シーン別

あらゆるシナリオに対応するOLTP

ハイブリッドクラウドソリューション

大容量ストレージデータベースのコスト削減

リアルタイム分析混合ワークロード

複数インスタンスの統合

ドキュメント

会社概要

OceanBaseの企業情報、パートナーシップ、そして信頼性・セキュリティへの取り組みについて紹介します。

OceanBaseについて

トラストセンター

法的情報

お問い合わせ

日本 - 日本語
International - English
中国站 - 简体中文
クラウドで始める

OceanBase

トランザクション処理、分析、AIワークロードに最適な分散データベース

プロダクト概要
デプロイを自由に

OceanBase Cloud

OceanBaseの導入とスケーリングを最適化

エンタープライズ版

自社インフラ上での運用・管理に対応

オープンソース版を試す

コミュニティ版

開発者向けオープンソース分散データベース

OceanBase seekdb

AIネイティブなオープンソースの検索データベース

顧客事例

さまざまな業界の企業による導入事例を紹介します。

さらに見る
利用シーン別

あらゆるシナリオに対応するOLTP

ハイブリッドクラウドソリューション

大容量ストレージデータベースのコスト削減

リアルタイム分析混合ワークロード

複数インスタンスの統合

OceanBaseの企業情報、パートナーシップ、そして信頼性・セキュリティへの取り組みについて紹介します。

OceanBaseについて

トラストセンター

法的情報

お問い合わせ

クラウドで始める
编组
すべての製品
    • データベース
    • アイコンOceanBaseデータベース
    • アイコンOceanBase Cloud
アイコン

OceanBaseデータベース

SQL - V4.4.2

    OceanBase ロゴ

    AI時代を支える分散データベース

    日本 - 日本語
    International - English
    中国站 - 简体中文
    プロダクト
    OceanBase Cloudエンタープライズ版コミュニティ版OceanBase seekdb
    会社概要
    OceanBaseについてトラストセンター法的情報お問い合わせ
    公式アカウント
    ConnpassXQiitaLumaGitHub

    © OceanBase 2026. All rights reserved

    クラウドサービス契約個人情報保護ポリシーセキュリティ
    お問い合わせ
    ドキュメントフィードバック
    1. ホーム
    2. OceanBaseデータベース
    3. SQL
    4. V4.4.2
    アイコンOceanBaseデータベース
    SQL - V 4.4.2
    データベース
    • OceanBaseデータベース
    • OceanBase Cloud
    SQL
    KV
    • V 4.4.2
    • V 4.3.5

    Flink CDCを使用してMySQLデータベースからOceanBaseデータベースにデータを同期する

    最終更新日:2026-06-15 02:31:30  更新
    シェア
    このページの内容
    Flink CDC環境の準備
    データの準備
    MySQLデータベースのデータを準備する
    OceanBaseデータベースのデータを準備する
    FlinkクラスタとFlink SQL CLIの起動
    チェックポイントの設定
    MySQL CDCテーブルの作成
    OceanBase CDCテーブルの作成
    Flink SQL CLIを使用してOceanBaseデータベースにデータを書き込む
    関連データのOceanBaseデータベースへの書き込み状況を確認する
    データの更新状況を確認する

    折りたたみ

    シェア

    Flink CDC (CDC Connectors for Apache Flink)は、Apache Flinkの一連のSourceコネクタであり、ほとんどのデータベースから既存の履歴データと増分変更データをリアルタイムで読み取ることをサポートします。Flink CDCは、データベースのフル量および増分データをメッセージキューやデータウェアハウスに同期できます。また、Flink CDCはリアルタイムデータ統合にも利用でき、データベースデータをデータレイクやデータウェアハウスにリアルタイムでインポートすることができます。さらに、Flink CDCはデータ加工もサポートしており、SQLクライアントを通じてデータベースデータに対するリアルタイムの関連付け、ワイドニング、集計を行い、結果をさまざまなストレージに書き込むことができます。CDC(Change Data Capture、変更データキャプチャ)は、データベースの変更を監視し、捕捉するのに役立ちます。CDCが提供するデータは、履歴データベースの作成、準リアルタイムキャッシュの構築、メッセージキュー(MQ)への提供(ユーザーがMQを消費して分析や監査を行うため)など、多くの用途に活用できます。

    以下では、Flink CDCを使用してMySQLデータベースからOceanBaseデータベースにデータを同期する方法について説明します。

    Flink CDC環境の準備

    Flinkと必要な依存パッケージをダウンロードします:

    1. ダウンロードURLからFlinkをダウンロードします。このドキュメントでは、Flink 1.15.3を使用しており、それを/FLINK_HOME/flink-1.15.3ディレクトリに解凍します。

    2. 以下にリストされている依存パッケージをダウンロードし、/FLINK_HOME/flink-1.15.3/lib/ディレクトリに配置します。

      • flink-sql-connector-mysql-cdc-2.1.1.jar

      • flink-connector-jdbc-1.15.3.jar

      • mysql-connector-java-5.1.47.jar

    データの準備

    MySQLデータベースのデータを準備する

    MySQLデータベースにテストデータを準備し、OceanBaseデータベースへのインポート元データとします。

    1. MySQLデータベースにアクセスします。

      [xxx@xxx /...]
      $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p******
      <Omit echo information>
      
      MySQL [(none)]>
      
    2. データベース test_mysql_to_ob、テーブル tbl1 および tbl2 を作成し、データを挿入します。

      MySQL [(none)]> CREATE DATABASE test_mysql_to_ob;
      Query OK, 1 row affected
      
      MySQL [(none)]> USE test_mysql_to_ob;
      Database changed
      MySQL [test_mysql_to_ob]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT);
      Query OK, 0 rows affected
      
      MySQL [test_mysql_to_ob]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
      Query OK, 5 rows affected
      Records: 5  Duplicates: 0  Warnings: 0
      
      MySQL [test_mysql_to_ob]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20));
      Query OK, 0 rows affected
      
      MySQL [test_mysql_to_ob]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
      Query OK, 5 rows affected
      Records: 5  Duplicates: 0  Warnings: 0
      

    OceanBaseデータベースのデータを準備する

    OceanBaseデータベースに、ソースデータを格納するテーブルを作成します。

    1. OceanBaseデータベースにログインします。

      user001 ユーザーでクラスタの mysql001 テナントにログインします。

      [xxx@xxx /...]
      $obclient -h10.10.10.2 -P2881 -uuser001@mysql001 -p -A
      Enter password:
      Welcome to the OceanBase.  Commands end with ; or \g.
      Your OceanBase connection id is 3221536981
      Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
      
      Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
      
      Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
      
      obclient [(none)]>
      
    2. データベース test_mysql_to_ob とテーブル mysql_tbl1_and_tbl2 を作成します。

      obclient [(none)]> CREATE DATABASE test_mysql_to_ob;
      Query OK, 1 row affected
      
      obclient [(none)]> USE test_mysql_to_ob;
      Database changed
      obclient [test_mysql_to_ob]> CREATE TABLE mysql_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20));
      Query OK, 0 rows affected
      

    FlinkクラスタとFlink SQL CLIの起動

    1. 以下のコマンドを使用して、Flinkディレクトリに移動します。

      [xxx@xxx /FLINK_HOME]
      #cd flink-1.15.3
      
    2. 以下のコマンドを使用して、Flinkクラスタを起動します。

      [xxx@xxx /FLINK_HOME/flink-1.15.3]
      #./bin/start-cluster.sh
      

      起動が成功した場合、http://localhost:8081/ でFlink Web UIにアクセスできます。次のとおりです:

      Flink_Web_UI

      説明

      ./bin/start-cluster.sh を実行した後、bash: ./bin/start-cluster.sh: Permission denied というメッセージが表示された場合は、flink-1.15.3 ディレクトリ内のすべての -rw-rw-r-- 権限のファイルの権限を -rwxrwxrwx 権限に設定する必要があります。

      例:

      
       [xxx@xxx /FLINK_HOME/flink-1.15.3]
       # chmod -R 777 /FLINK_HOME/flink-1.15.3/*
       
    3. 以下のコマンドを使用して、Flink SQL CLIを起動します。

      [xxx@xxx /FLINK_HOME/flink-1.15.3]
      #./bin/sql-client.sh
      

      起動が成功すると、次のようなページが表示されます:

      Flink_SQL_CLI

    チェックポイントの設定

    Flink SQL CLIでcheckpointを有効にし、3秒ごとにcheckpointを実行します。

    Flink SQL> SET execution.checkpointing.interval = 3s;
    [INFO] Session property has been set.
    

    MySQL CDCテーブルの作成

    Flink SQL CLIでMySQLデータベースに対応するテーブルを作成します。

    MySQLデータベースのtest_mysql_to_obテーブルのtbl1とtbl2について、Flink SQL CLIを使用して対応するテーブルを作成し、これらの基盤データベーステーブルのデータを同期します。

    Flink SQL> CREATE TABLE mysql_tbl1 (
        col1 INT PRIMARY KEY,
        col2 VARCHAR(20),
        col3 INT)
        WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx.xxx.xxx.xxx',
        'port' = '3306',
        'username' = 'root',
        'password' = '******',
        'database-name' = 'test_mysql_to_ob',
        'table-name' = 'tbl1');
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE mysql_tbl2 (col1 INT PRIMARY KEY,
        col2 VARCHAR(20))
        WITH ('connector' = 'mysql-cdc',
        'hostname' = 'xxx.xxx.xxx.xxx',
        'port' = '3306',
        'username' = 'root',
        'password' = '******',
        'database-name' = 'test_mysql_to_ob',
        'table-name' = 'tbl2');
    [INFO] Execute statement succeed.
    

    MySQL CDC Connector WITHオプションの詳細については、Connector Optionsを参照してください。

    OceanBase CDCテーブルの作成

    Flink SQL CLIでOceanBaseデータベースに対応するテーブルを作成します。mysql_tbl1_and_tbl2テーブルを作成し、関連付けたデータをOceanBaseデータベースに書き込みます。

    Flink SQL> CREATE TABLE mysql_tbl1_and_tbl2(
        col1 INT PRIMARY KEY,
        col2 INT,col3 VARCHAR(20),
        col4 VARCHAR(20))
        WITH ('connector' = 'jdbc',
        'url' = 'jdbc:mysql://10.10.10.2:2881/test_mysql_to_ob',
        'username' = 'root@mysql001',
        'password' = '******',
        'table-name' = 'mysql_tbl1_and_tbl2');
    [INFO] Execute statement succeed.
    

    JDBC SQL Connector WITHオプションの詳細については、Connector Optionsを参照してください。

    Flink SQL CLIを使用してOceanBaseデータベースにデータを書き込む

    Flink SQLを使用して、テーブルtbl1とテーブルtbl2を結合し、結合後の情報をOceanBaseデータベースに書き込みます。

    Flink SQL> INSERT INTO mysql_tbl1_and_tbl2
        SELECT t1.col1,t1.col3,t1.col2,t2.col2
        FROM mysql_tbl1 t1,mysql_tbl2 t2
        WHERE t1.col3=t2.col1;
    [INFO] Submitting SQL update statement to the cluster...
    Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: c5ee92498addf813858e448ec25e85af
    

    説明

    このドキュメントのテスト例で使用されているMySQLドライバー(com.mysql.jdbc.Driver)は、MySQL Connector/J 5.1.47バージョンです。新しいバージョンのMySQLドライバー(com.mysql.cj.jdbc.Driver)を使用する場合は、MySQL Connector/J 8.xバージョンを使用してください。

    関連データのOceanBaseデータベースへの書き込み状況を確認する

    OceanBaseデータベースにログインし、test_mysql_to_ob データベース内のテーブル mysql_tbl1_and_tbl2 のデータを確認します。

    obclient [test_mysql_to_ob]> SELECT * FROM mysql_tbl1_and_tbl2;
    +------+------+-------------+------+
    | col1 | col2 | col3        | col4 |
    +------+------+-------------+------+
    |    1 |   86 | China       | +86  |
    |    2 |  886 | Taiwan      | +886 |
    |    3 |  852 | Hong Kong   | +852 |
    |    4 |  853 | Macao       | +853 |
    |    5 |  850 | North Korea | +850 |
    +------+------+-------------+------+
    5 rows in set
    

    データの更新状況を確認する

    1. MySQLデータベースのテーブル tbl1 と tbl2 にそれぞれ1件のデータを挿入します。

      MySQL [test_mysql_to_ob]> INSERT INTO tbl1 VALUES(6,'code',673);
      Query OK, 1 row affected
      
      MySQL [test_mysql_to_ob]> INSERT INTO tbl2 VALUES(673,'+673');
      Query OK, 1 row affected
      
    2. OceanBaseデータベースでデータが同期されているか確認します。

      obclient [test_mysql_to_ob]> SELECT * FROM mysql_tbl1_and_tbl2;
      +------+------+-------------+------+
      | col1 | col2 | col3        | col4 |
      +------+------+-------------+------+
      |    1 |   86 | China       | +86  |
      |    2 |  886 | Taiwan      | +886 |
      |    3 |  852 | Hong Kong   | +852 |
      |    4 |  853 | Macao       | +853 |
      |    5 |  850 | North Korea | +850 |
      |    6 |  673 | code        | +673 |
      +------+------+-------------+------+
      6 rows in set
      

    前のトピック

    Canalを使用してMySQLデータベースからOceanBaseデータベースへデータを同期する
    最後

    次のトピック

    ChunJunを使用してMySQLデータベースからOceanBaseデータベースへデータを移行する
    次
    このページの内容
    Flink CDC環境の準備
    データの準備
    MySQLデータベースのデータを準備する
    OceanBaseデータベースのデータを準備する
    FlinkクラスタとFlink SQL CLIの起動
    チェックポイントの設定
    MySQL CDCテーブルの作成
    OceanBase CDCテーブルの作成
    Flink SQL CLIを使用してOceanBaseデータベースにデータを書き込む
    関連データのOceanBaseデータベースへの書き込み状況を確認する
    データの更新状況を確認する