DataXは、Alibaba Cloud DataWorksのデータセット統合機能のオープンソース版であり、Alibabaグループ内で広く利用されているオフラインデータ同期ツール/プラットフォームです。DataXは、MySQL、Oracle、SQL Server、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS、OceanBaseなど、さまざまな異種データソース間の効率的なデータ同期機能を実装しています。
OceanBaseデータベースCommunity Editionのお客様は、DataXオープンソースサイトからソースコードをダウンロードし、ご自身でコンパイルできます。コンパイル時には、必要に応じてpom.xmlで不要なデータベースプラグインを除外することができます。除外しない場合、コンパイルされたバイナリは非常に大きくなります。
フレームワーク設計

DataXはオフラインデータ同期フレームワークとして、「Framework + Plugin」モードで構築されています。データソースの読み取りと書き込みをReader/Writerプラグインとして抽象化し、同期フレームワーク全体に統合しています。
Readerはデータ収集モジュールとして機能し、データソースからデータを収集してFrameworkに送信します。
Writerはデータ書き込みモジュールとして機能し、Frameworkからデータを継続的に取得し、ターゲット側に書き込みます。
FrameworkはReaderとWriterを接続し、両者間のデータ転送チャネルとして機能するとともに、バッファリング、ストリーミング制御、並行処理、データ変換などのコア技術的課題を処理します。
DataXはタスク単位でデータを移行します。各タスクは1つのテーブルのみを処理し、各タスクにはjson形式の設定ファイルがあります。設定ファイルにはreaderとwriterの2部分が含まれます。readerとwriterはそれぞれ、DataXがサポートするデータベースの読み書きプラグインです。例えば、MySQLテーブルのデータをOceanBaseデータベースに移行する操作では、MySQLからデータを読み取ってOceanBaseデータベースに書き込む必要があるため、MySQLデータベースのtxtfilereaderプラグインとOceanBaseデータベースのoceanbasev10writerプラグインを組み合わせて使用します。ここではtxtfilereaderとoceanbasev10writerプラグインについて紹介します。
txtfilereader プラグイン
txtfilereader プラグインは、ローカルファイルシステムのデータストレージからデータを読み取る機能を実装しています。基盤となる実装では、txtfilereaderはローカルファイルのデータを取得し、DataX転送プロトコルに変換してWriterに渡します。
詳細な機能とパラメータの説明については、公式ドキュメントを参照してください:txtfilereader プラグイン。
oceanbasev10writer プラグイン
oceanbasev10writer プラグインは、データをOceanBaseデータベースのターゲットテーブルに書き込む機能を実装しています。 基盤となる実装では、oceanbasev10writerはJavaクライアント(基盤となるMySQL JDBCまたはOceanBase Client)を介してobproxy経由でOceanBaseデータベースにリモート接続し、対応するINSERT SQLステートメントを実行してデータをOceanBaseデータベースに書き込みます。OceanBaseデータベース内部では、データはバッチ単位でコミットされます。
実装原理について、oceanbasev10writerはDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、INSERTステートメントを生成します。データ書き込み時に主キーまたは一意キーの競合が発生した場合、OceanBaseデータベースのMySQLテナントはreplaceモードを使用してテーブル内のすべてのフィールドを更新できます。OceanBaseデータベースのOracleテナントは現在、Insert方式のみを使用できます。パフォーマンス上の理由から、書き込みはバッチ方式で行われ、行数が所定のしきい値に達した場合にのみ書き込みリクエストが送信されます。
DataX設定ファイル
設定ファイルの例:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
注意
DataXはテーブルデータのみを移行します。ターゲット側に対応するテーブルオブジェクト構造を事前に作成しておく必要があります。
json 設定ファイルをDataXのディレクトリ job またはカスタムパスに配置します。実行方法は以下のとおりです:
$bin/datax.py job/stream2stream.json
出力情報:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
タスク開始時刻 : 2021-08-26 11:05:59
タスク終了時刻 : 2021-08-26 11:06:09
タスク合計時間 : 10s
タスク平均転送量 : 38B/s
レコード書き込み速度 : 2rec/s
読み取ったレコード総数 : 20
読み書き失敗総数 : 0
DataXタスクの実行終了時には、上記の出力に含まれる平均転送量、書き込み速度、読み書き失敗総数などを含む簡単なタスクレポートが生成されます。
DataXの job パラメータ settings では、速度パラメータやエラーレコードの許容度などを指定できます。
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
パラメータ説明:
errorLimitはエラーが発生したレコード数の許容度を示し、この限度を超えた場合、タスクは中断して終了します。channelは並列数です。理論上、並列数が多いほど移行性能は向上します。ただし、実際の運用では、ソース側の読み取り負荷、ネットワーク転送性能、およびターゲット側の書き込み性能も考慮する必要があります。
環境の準備
tarパッケージのダウンロードURL:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
インストールファイルの解凍:
tar zxvf datax.tar.gz
cd datax
ディレクトリ構造は以下のとおりです:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
インストールファイルには、以下のディレクトリが含まれています:
ディレクトリ名 |
説明 |
|---|---|
| bin | 実行ファイルディレクトリです。このディレクトリ内のdatax.pyは、DataXタスクの起動スクリプトです。 |
| conf | ログファイル設定ディレクトリです。このディレクトリには、dataxとタスクが無関係な設定ファイルが格納されています。 |
| lib | 実行時依存パッケージです。このディレクトリには、DataXの実行に必要なグローバルjarファイルが格納されています。 |
| job | このディレクトリには、dataxのインストールをテスト検証するためのタスク設定ファイルが1つあります。 |
| log | ログファイルディレクトリです。このディレクトリには、dataxタスクの実行ログが格納されています。datax実行時、デフォルトでログは標準出力に出力されると同時に、logディレクトリにも書き込まれます。 |
| plugin | プラグインファイルディレクトリです。このディレクトリには、DataXがサポートする各種データソースプラグインが保存されています。 |
DataXを使用してCSVファイルをOceanBaseに移行する例
例:ソース側でエクスポートしたCSVファイルをターゲット側のDataXサーバーにコピーし、その後ターゲット側のOceanBaseデータベースにインポートします。
myjob.json設定ファイルは以下のとおりです:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/tmp/tpcc/bmsql_oorder"],
"fileName": "bmsql_oorder",
"encoding": "UTF-8",
"column": ["*"],
"dateFormat": "yyyy-MM-dd hh:mm:ss" ,
"nullFormat": "\\N" ,
"fieldDelimiter": ","
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": [
"truncate table bmsql_oorder"
],
"connection": [
{
"jdbcUrl": "jdbc:oceanbase://127.0.0.1:2883/tpcc?",
"table": [
"bmsql_oorder"
]
}
],
"username": "tpcc",
"password":"********",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
パラメータ説明
パラメータ |
説明 |
|---|---|
| name | データベースに接続するためのリーダーまたはライターに対応するデータベースプラグインの名前を記述します。MySQLのリーダープラグインはmysqlreader、OceanBaseのライタープラグインはoceanbasev10writerです。具体的なリーダーとライターのプラグインについては、dataxのドキュメントを参照してください:DataXデータソースガイド。 |
| jdbcUrl | 接続先データベースへのJDBC情報を記述し、JSON配列を使用して表現します。1つのデータベースに複数の接続アドレスを記述できます。JSON配列に1つのJDBC接続を記述するだけで済みます。jdbcUrlはMySQL公式仕様に準拠しており、接続オプションの制御情報を記述できます。詳細はMySQL公式ドキュメントを参照してください。
注意
|
| username | データソースのユーザー名 |
| password | データソースの指定ユーザー名のパスワード |
| table | 同期が必要なテーブルを選択します。JSON配列を使用して記述するため、複数のテーブルの同時抽出をサポートします。複数のテーブルを設定する場合、ユーザーはそれらのテーブルが同一のスキーマ構造であることを確認する必要があります。MySQLReaderは、テーブルが同一の論理テーブルであるかどうかをチェックしません。
注意tableはconnection構成ユニット内に含める必要があります。
|
| column | 設定されたテーブルで同期が必要な列名の集合を指定します。フィールド情報はJSON配列を使用して記述します。columnsを['*']に設定することは推奨されません。これは、テーブル構造が変更された場合、この設定も変更されるためです。具体的な列名を指定する設定方法を推奨します。列のトリミング、つまり一部の列のみをエクスポートすることがサポートされています。列の並べ替え、つまりテーブルのスキーマ情報に従わない順序で列をエクスポートすることもサポートされています。定数設定もサポートされており、ユーザーはMySQL SQL構文に従って指定する必要があります:["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]。
説明
|
| where | 絞り込み条件です。MySQLReaderは、指定されたcolumn、table、where条件を組み合わせてSQLを生成し、このSQLに基づいてデータを抽出します。実際の業務シナリオでは、通常は当日のデータを同期することが選択されます。その場合、where条件をgmt_create > $bizdateと指定できます。
注意
|
jobファイルを設定した後、そのjobを実行します。コマンドは以下のとおりです:
python datax.py ../job/myjob.json
詳細を見る
DataXのオープンソースコードおよび詳細については、DataXを参照してください。