DataXは、Alibaba Cloud DataWorksのデータ統合機能のオープンソース版であり、Alibabaグループ内で広く利用されているオフラインデータ同期ツール/プラットフォームです。DataXは、MySQL、Oracle、SQL Server、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS、OceanBaseなど、さまざまな異種データソース間の効率的なデータ同期機能を実装しています。
OceanBaseデータベースCommunity Editionのお客様は、DataXオープンソースサイトからソースコードをダウンロードし、ご自身でコンパイルできます。コンパイル時には、必要に応じてpom.xmlで不要なデータベースプラグインを除外することができます。そうしない場合、コンパイルされたバイナリは非常に大きくなります。
フレームワーク設計

DataXはオフラインデータ同期フレームワークとして、「Framework + Plugin」モードで構築されています。データソースの読み取りと書き込みをReader/Writerプラグインとして抽象化し、システム全体に統合しています。
Readerはデータ収集モジュールとして機能し、データソースからデータを収集してFrameworkに送信します。
Writerはデータ書き込みモジュールとして機能し、Frameworkからデータを継続的に取得して、宛先に書き込みます。
FrameworkはReaderとWriterを接続し、両者間のデータ伝送チャネルとして機能するとともに、バッファリング、ストリーミング制御、並行処理、データ変換などのコア技術的課題を処理します。
DataXはタスク単位でデータを移行します。各タスクは1つのテーブルのみを処理し、json形式の設定ファイルを1つ持ちます。この設定ファイルにはreaderとwriterの2部分が含まれます。readerとwriterはそれぞれ、DataXがサポートするデータベースの読み書きプラグインです。例えば、OceanBaseのテーブルデータをMySQLデータベースに移行する操作では、OceanBaseデータベースからデータを読み取ってMySQLデータベースに書き込む必要があるため、OceanBaseデータベースのoceanbasev10readerプラグインとMySQLデータベースのmysqlwriterプラグインを組み合わせて使用します。ここではoceanbasev10readerとmysqlwriterプラグインについて紹介します。
oceanbasev10reader プラグイン
実装の仕組みとして、oceanbasev10reader はDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、INSERTステートメントを生成します。データ書き込み時に主キーまたは一意キーの競合が発生した場合、OceanBaseデータベースのMySQLテナントはreplaceモードを使用してテーブル内のすべてのフィールドを更新できます。OceanBaseデータベースのOracleテナントは現在、INSERT方式のみを使用できます。
oceanbasev10reader プラグインは、OceanBaseデータベースからデータを読み取る機能を実装しています。基盤となる実装では、oceanbasev10reader プラグインはJavaクライアント(基盤となるMySQL JDBCまたはOceanBase Client)を介してobproxy経由でOceanBaseデータベースにリモート接続し、対応するSQLステートメントを実行してOceanBaseデータベースからデータをSELECTします。
実装の仕組みを簡潔に言えば、OceanBaseデータベースはJavaクライアント(基盤となるMySQL JDBCまたはOceanBase Client)を介してobproxy経由でOceanBaseデータベースにリモート接続し、ユーザーが設定した情報に基づいてクエリステートメントを生成してリモートのOceanBaseデータベースに送信します。リモートのOceanBaseデータベースは、このSQLの実行結果をDataX独自のデータ型を用いて抽象化されたデータセットに組み立て直し、下流のWriterへと渡します。
mysqlwriter プラグイン
mysqlwriter プラグインは、データをMySQLマスターデータベースのターゲットテーブルに書き込む機能を実装しています。基盤となる実装では、mysqlwriter プラグインはJDBC接続を介してリモートのMySQLデータベースに接続し、対応するinsert into ...またはreplace into ...のSQLステートメントを実行してデータをMySQLデータベースに書き込みます。MySQLデータベース内部では、データはバッチ単位でコミットされます。mysqlwriter プラグインを使用してデータを移行する場合、MySQLデータベースはinnodbエンジンを使用している必要があります。
実装の仕組みとして、mysqlwriter プラグインはDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、ユーザーが設定したwriteModeに基づいてinsert into ...またはreplace into ...のSQLステートメントを生成して、MySQLデータベースのマスターデータベースにデータを書き込みます。
詳細な機能とパラメータの説明については、公式ドキュメントを参照してください:mysqlwriter プラグイン。
DataX設定ファイル
設定ファイルの例:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
注意
DataXはテーブルデータのみを移行します。ターゲット側に対応するテーブルオブジェクト構造を事前に作成しておく必要があります。
json 設定ファイルをDataXのディレクトリ job またはカスタムパスに配置します。実行方法は以下のとおりです:
$bin/datax.py job/stream2stream.json
出力情報:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
タスク開始時刻 : 2021-08-26 11:05:59
タスク終了時刻 : 2021-08-26 11:06:09
タスク合計時間 : 10s
タスク平均転送量 : 38B/s
レコード書き込み速度 : 2rec/s
読み取ったレコード総数 : 20
読み書き失敗総数 : 0
DataXタスクの実行終了時には、上記の出力に含まれる平均転送量、書き込み速度、読み書き失敗総数などを含む簡単なタスクレポートが生成されます。
DataXの job パラメータ settings では、速度パラメータやエラーレコードの許容度などを指定できます。
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
パラメータ説明:
errorLimitはエラーが発生したレコード数の許容度を示し、この限度を超えた場合、タスクは中断して終了します。channelは並列数です。理論上、並列数が多いほど移行性能は向上します。ただし、実際の運用では、ソース側の読み取り負荷、ネットワーク転送性能、およびターゲット側の書き込み性能も考慮する必要があります。
環境の準備
tarパッケージのダウンロードURL:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
インストールファイルの解凍:
tar zxvf datax.tar.gz
cd datax
ディレクトリ構造は以下のとおりです:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
インストールファイルには、以下のディレクトリが含まれています。
ディレクトリ名 |
説明 |
|---|---|
| bin | 実行ファイルディレクトリです。このディレクトリ内のdatax.pyは、DataXタスクの起動スクリプトです。 |
| conf | ログファイル設定ディレクトリです。このディレクトリには、dataxとタスクが無関係な設定ファイルが格納されています。 |
| lib | 実行時依存パッケージです。このディレクトリには、DataXの実行に必要なグローバルjarファイルが格納されています。 |
| job | このディレクトリには、dataxのインストールをテスト検証するためのタスク設定ファイルが1つあります。 |
| log | ログファイルディレクトリです。このディレクトリには、dataxタスクの実行ログが格納されています。datax実行時、デフォルトでログは標準出力に出力されると同時に、logディレクトリにも書き込まれます。 |
| plugin | プラグインファイルディレクトリです。このディレクトリには、DataXがサポートする各種データソースプラグインが保存されています。 |
DataXを使用してOceanBaseデータをMySQLデータベースに移行する例
OceanBaseデータをMySQLに移行する際、ソースとターゲットが同時にDataXサーバーとネットワークで接続できない場合は、CSVファイルを経由して中継できます。ソースデータベースとターゲットデータベースがDataXサーバーと同時に接続できる場合は、DataXを使用して直接、ソースからターゲットへデータを移行できます。
例:OceanBaseからtest.t1テーブルのデータをMySQLモードのtest.t1に移行します。
myjob.json設定ファイルは以下のとおりです:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "oceanbasev10reader",
"parameter": {
"username": "******",
"password": "******",
"column": ["*"],
"connection": [
{
"table": ["t1"],
"jdbcUrl": ["jdbc:oceanbase://172.30.xxx.xxx:2883/test"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": ["truncate table t1"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://100.88.xxx.xxx:3308/test",
"table": ["t1"]
}
],
"username": "******",
"password":"******",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
パラメータ説明
パラメータ |
説明 |
|---|---|
| name | データベースに接続するリーダーまたはライターに対応するデータベースプラグインの名前を記述します。MySQLのリーダープラグインはmysqlreader、OceanBaseのライタープラグインはoceanbasev10writerです。具体的なリーダーとライターのプラグインについては、dataxのドキュメントを参照してください:DataXデータソースガイド。 |
| jdbcUrl | 接続先データベースへのJDBC情報を記述し、JSON配列を使用して表現します。1つのデータベースに複数の接続アドレスを記入することができます。JSON配列に1つのJDBC接続を記述するだけで済みます。jdbcUrlはMySQL公式仕様に準拠しており、接続オプションの制御情報を記入することができます。詳細については、MySQL公式ドキュメントを参照してください。
注意
|
| username | データソースのユーザー名 |
| password | データソースの指定ユーザー名のパスワード |
| table | 同期が必要なテーブルを選択します。JSON配列を使用して記述するため、複数のテーブルの同時抽出をサポートします。複数のテーブルを設定する場合、ユーザー自身が複数のテーブルが同一のスキーマ構造であることを確認する必要があります。MySQLReaderは、テーブルが同一の論理テーブルであるかどうかをチェックしません。
注意tableはconnection構成ユニット内に含める必要があります。
|
| column | 設定されたテーブルで同期が必要な列名の集合を記述します。JSON配列を使用してフィールド情報を記述します。columnsを['*']に設定することは推奨されません。テーブル構造が変更された場合、この設定も変更される可能性があるためです。具体的な列名を指定する設定方法を推奨します。列のトリミング、つまり一部の列のみをエクスポートすることができます。列の並べ替え、つまりテーブルのスキーマ情報に従わずに列をエクスポートすることができます。定数の設定も可能で、ユーザーはMySQL SQL構文に従って記述する必要があります:["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]。
説明
|
| where | 絞り込み条件。MySQLReaderは、指定されたcolumn、table、where条件を組み合わせてSQLを生成し、そのSQLに基づいてデータを抽出します。実際の業務シナリオでは、通常は当日のデータを同期することが選択されます。そのため、where条件をgmt_create > $bizdateと指定できます。
注意
|
jobファイルを設定した後、そのjobを実行します。コマンドは以下のとおりです:
python datax.py ../job/myjob.json
詳細を見る
DataXのオープンソースコードおよび詳細については、DataXを参照してください。