DataXは、Alibaba Cloud DataWorksのデータ統合機能のオープンソース版であり、Alibabaグループ内で広く利用されているオフラインデータ同期ツール/プラットフォームです。DataXは、MySQL、Oracle、SQL Server、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS、OceanBaseなど、さまざまな異種データソース間で効率的なデータ同期機能を実現しています。
OceanBaseデータベースCommunity Editionのお客様は、DataXオープンソースサイトからソースコードをダウンロードし、ご自身でコンパイルすることができます。コンパイル時には、必要に応じてpom.xml内で不要なデータベースプラグインを除外することができます。そうしない場合、コンパイルされたバンドルは非常に大きくなります。
フレームワーク設計

DataXはオフラインデータ同期フレームワークとして、「Framework + Plugin」モデルで構築されています。データソースの読み取りと書き込みをReader/Writerプラグインとして抽象化し、同期フレームワーク全体に組み込んでいます。
Readerはデータ収集モジュールとして、データソースからデータを収集し、それをFrameworkに送信します。
Writerはデータ書き込みモジュールとして、Frameworkからデータを継続的に取得し、ターゲット側にデータを書き込みます。
FrameworkはReaderとWriterを接続し、両者間のデータ転送チャネルとして機能するとともに、バッファリング、ストリーミング制御、並行処理、データ変換などのコア技術的課題を処理します。
DataXはタスク単位でデータを移行します。各タスクは1つのテーブルのみを処理し、タスクごとにjson形式の設定ファイルがあります。この設定ファイルにはreaderとwriterの2部分が含まれています。readerとwriterはそれぞれ、DataXがサポートするデータベースの読み書きプラグインです。例えば、MySQLテーブルのデータをOceanBaseデータベースに移行する場合、MySQLからデータを読み取ってOceanBaseデータベースに書き込むため、使用するプラグインはMySQLデータベース用のtxtfilereaderプラグインとOceanBaseデータベース用のoceanbasev10writerプラグインの組み合わせとなります。ここではtxtfilereaderとoceanbasev10writerプラグインについて説明します。
txtfilereaderプラグイン
txtfilereaderプラグインは、ローカルファイルシステムのデータストレージからデータを読み取る機能を実装しています。下層の実装では、txtfilereaderはローカルファイルデータを取得し、DataX転送プロトコルに変換してWriterに渡します。
詳細な機能とパラメータの説明については、公式ドキュメントを参照してください:txtfilereaderプラグイン。
oceanbasev10writerプラグイン
oceanbasev10writerプラグインは、データをOceanBaseデータベースのターゲットテーブルに書き込む機能を実装しています。 下層の実装では、oceanbasev10writerはJavaクライアント(下層のMySQL JDBCまたはOceanBase Client)を使用してobproxy経由でOceanBaseデータベースにリモート接続し、対応するinsert SQL文を実行してデータをOceanBaseデータベースに書き込みます。OceanBaseデータベース内部では、データはバッチ単位でコミットされます。
実装原理に関して、oceanbasev10writerはDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、insert文を生成します。データ書き込み時に主キーまたは一意キーの競合が発生した場合、OceanBaseデータベースのMySQLテナントはreplaceモードを使用してテーブル内のすべてのフィールドを更新できます。一方、OceanBaseデータベースのOracleテナントは現在Insert方式しか利用できません。パフォーマンスを考慮して、書き込みはbatch方式で一括処理され、行数が所定のしきい値に達した場合にのみ書き込みリクエストが開始されます。
DataX設定ファイル
設定ファイルの例:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello、你好、世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
注意
DataXはテーブルデータのみを移行します。ターゲット側に対応するテーブルオブジェクト構造を事前に作成しておく必要があります。
json 設定ファイルをDataXのディレクトリ job またはカスタムパスに配置します。実行方法は以下のとおりです:
$bin/datax.py job/stream2stream.json
出力情報:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
タスク開始時刻 : 2021-08-26 11:05:59
タスク終了時刻 : 2021-08-26 11:06:09
タスク合計時間 : 10s
タスク平均流量 : 38B/s
レコード書き込み速度 : 2rec/s
読み取りレコード総数 : 20
読み書き失敗総数 : 0
DataXタスクの実行終了時には、上記の出力に含まれる平均流量、書き込み速度、読み書き失敗総数などを含む簡単なタスクレポートが生成されます。
DataXの job のパラメータ settings では、速度パラメータやエラーレコードの許容度などを指定できます。
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
パラメータの説明:
errorLimitはエラー報告レコード数の許容度を示し、この制限を超えた場合、タスクは中断して終了します。channelは並列数であり、理論的には並列数が多いほど移行性能が向上します。ただし、実際の運用ではソース側の読み取り負荷、ネットワーク転送性能、およびターゲット側の書き込み性能も考慮する必要があります。
環境準備
tarパッケージのダウンロードURL:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
インストールファイルの解凍:
tar zxvf datax.tar.gz
cd datax
ディレクトリ構成は以下のとおりです:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
インストールファイル内には、以下のいくつかのディレクトリがあります。
| ディレクトリ名 | 説明 |
|---|---|
| bin | 実行可能ファイルのディレクトリ。このディレクトリ内のdatax.pyは、DataXタスクの起動スクリプトです。 |
| conf | ログファイルの設定ディレクトリ。このディレクトリには、dataxとタスクに関係ない設定ファイルが保存されています。 |
| lib | 実行時に依存するパッケージ。このディレクトリには、DataXの実行に必要なグローバルjarファイルが保存されています。 |
| job | このディレクトリには、dataxのインストールをテスト検証するためのタスク設定ファイルが1つあります。 |
| log | ログファイルのディレクトリ。このディレクトリには、dataxタスクの実行ログが保存されています。dataxの実行時、デフォルトではログが標準出力に出力されると同時にlogディレクトリにも書き込まれます。 |
| plugin | プラグインファイルのディレクトリ。このディレクトリには、DataXがサポートするさまざまなデータソースプラグインが保存されています。 |
DataXを使用したCSVファイルからOceanBaseへのデータ移行例
例:ソース側でエクスポートされたCSVファイルをターゲット側のDataXサーバーにコピーし、その後ターゲット側のOceanBaseデータベースにインポートします。
myjob.json設定ファイルは以下のとおりです:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/tmp/tpcc/bmsql_oorder"],
"fileName": "bmsql_oorder",
"encoding": "UTF-8",
"column": ["*"],
"dateFormat": "yyyy-MM-dd hh:mm:ss" ,
"nullFormat": "\\N" ,
"fieldDelimiter": ","
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": [
"truncate table bmsql_oorder"
],
"connection": [
{
"jdbcUrl": "jdbc:oceanbase://127.0.0.1:2883/tpcc?",
"table": [
"bmsql_oorder"
]
}
],
"username": "tpcc",
"password":"********",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
パラメータの説明
| パラメータ | 説明 |
|---|---|
| name | データベースに接続するためのリーダーまたはライターに対応するデータベースプラグインの名前を記述します。そのうち、MySQLのリーダープラグインはmysqlreader、OceanBaseのライタープラグインはoceanbasev10writerです。具体的なリーダーとライターのプラグインについては、dataxのドキュメントを参照してください:DataXデータソースガイド。 |
| jdbcUrl | 接続先のデータベースへのJDBC情報を記述し、JSON配列で表現します。また、1つのデータベースに複数の接続アドレスを記述することも可能です。JSON配列には、1つのJDBC接続を記述します。jdbcUrlはMySQL公式仕様に従っており、接続属性の制御情報を記述することができます。詳細については、MySQL公式ドキュメントを参照してください。
注意
|
| username | データソースのユーザー名 |
| password | データソースの指定されたユーザー名のパスワード |
| table | 同期するテーブルを選択します。JSON配列で表現するため、複数のテーブルを同時に抽出することができます。複数のテーブルを設定する場合、ユーザー自身が複数のテーブルが同一のスキーマ構造であることを確認する必要があります。MySQLReaderは、テーブルが同一の論理テーブルであるかどうかをチェックしません。
注意tableはconnection設定ユニットに含まれている必要があります。
|
| column | 設定したテーブル内で同期する列名の集合を、JSON配列でフィールド情報を記述します。columnsは['*']として設定することは推奨されません。これは、テーブル構造が変更されるとこの設定も変更されるためです。推奨される設定方法は、具体的な列名を指定することです。列のカット、つまり一部の列のみをエクスポートすることができます。列の並べ替え、つまりテーブルのスキーマ情報に従わずに列をエクスポートすることができます。定数設定もサポートされており、ユーザーはMySQL SQL構文に従って設定する必要があります:["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]。
説明
|
| where | フィルター条件。MySQLReaderは指定されたcolumn、table、where条件に基づいてSQLを連結し、このSQLに基づいてデータ抽出を行います。実際のビジネスシナリオでは、通常当日のデータを同期することが選択されるため、where条件をgmt_create > $bizdateと指定できます。
注意
|
jobファイルの設定後、そのjobを実行します。コマンドは以下のとおりです:
python datax.py ../job/myjob.json
詳細情報
DataXのオープンソースコードおよびその他の詳細については、DataXを参照してください。