DataXは、Alibaba Cloud DataWorksのデータ統合機能のオープンソース版であり、Alibabaグループ内で広く利用されているオフラインデータ同期ツール/プラットフォームです。DataXは、MySQL、Oracle、SQL Server、PostgreSQL、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS、OceanBaseなど、さまざまな異種データソース間の効率的なデータ同期機能を実装しています。
OceanBaseデータベースCommunity Editionのお客様は、DataXオープンソースWebサイトからソースコードをダウンロードし、ご自身でコンパイルすることができます。コンパイル時には、必要に応じてpom.xml内で不要なデータベースプラグインを除外することができます。そうしない場合、コンパイルされたバイナリは非常に大きくなります。
フレームワーク設計
DataXはオフラインデータ同期フレームワークとして、「Framework + Plugin」モードで構築されています。データソースの読み取りと書き込みをReader/Writerプラグインに抽象化し、同期フレームワーク全体に組み込んでいます。
Readerはデータ収集モジュールとして機能し、データソースからデータを収集してFrameworkに送信します。
Writerはデータ書き込みモジュールとして機能し、Frameworkからデータを継続的に取得し、データを宛先に書き込みます。
FrameworkはReaderとWriterを接続し、両者間のデータ伝送チャネルとして機能するとともに、バッファリング、ストリーミング制御、並行処理、データ変換などのコア技術的課題を処理します。
DataXはタスク単位でデータを移行します。各タスクは1つのテーブルのみを処理し、各タスクにはjson形式の設定ファイルがあります。設定ファイルにはreaderとwriterの2部分が含まれています。readerとwriterはそれぞれ、DataXがサポートするデータベースの読み書きプラグインです。例えば、OracleテーブルのデータをOceanBaseデータベースに移行する操作では、Oracleからデータを読み取ってOceanBaseデータベースに書き込む必要があるため、OracleデータベースのoraclereaderプラグインとOceanBaseデータベースのoceanbasev10writerプラグインを組み合わせて使用します。ここではoraclereaderとoceanbasev10writerプラグインについて紹介します。
oraclereader プラグイン
oraclereader プラグインは、Oracleデータベースからデータを読み取る機能を実装しています。基盤となる実装では、oraclereader プラグインはJDBCを介してリモートのOracleデータベースに接続し、対応するSQLステートメントを実行してOracleデータベースからデータをSELECTします。
実装原理を簡潔に述べると、oraclereader プラグインはJDBCコネクターを介してリモートのOracleデータベースに接続し、ユーザーが設定した情報に基づいてクエリステートメントを生成してリモートのOracleデータベースに送信します。リモートのOracleデータベースは、このSQLの実行結果をDataXのカスタムデータ型を用いて抽象化されたデータセットに組み立て、下流のWriterへと渡します。
詳細な機能とパラメータの説明については、公式ドキュメントを参照してください:oraclereader プラグイン。
oceanbasev10writer プラグイン
oceanbasev10writer プラグインは、データをOceanBaseデータベースのターゲットテーブルに書き込む機能を実装しています。 基盤となる実装では、oceanbasev10writer はJavaクライアント(基盤となるMySQL JDBCまたはOceanBase Client)を介してobproxy経由でリモートのOceanBaseデータベースに接続し、対応するINSERT SQLステートメントを実行してデータをOceanBaseデータベースに書き込みます。OceanBaseデータベース内部では、データはバッチ単位でコミットされます。
実装原理について、oceanbasev10writer はDataXフレームワークを通じてReaderが生成したプロトコルデータを取得し、INSERTステートメントを生成します。データ書き込み時に主キーまたは一意キーの競合が発生した場合、OceanBaseデータベースのMySQLテナントはreplaceモードを使用してテーブル内のすべてのフィールドを更新できます。一方、OceanBaseデータベースのOracleテナントは現在、INSERT方式のみを使用できます。パフォーマンス上の理由から、書き込みはバッチ方式で行われ、行数が所定のしきい値に達した場合にのみ書き込みリクエストが送信されます。
DataX設定ファイル
設定ファイルの例:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,你好,世界-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
注意
DataXはテーブルデータのみを移行します。ターゲット側に対応するテーブルオブジェクト構造を事前に作成しておく必要があります。
json 設定ファイルをDataXのディレクトリ job またはカスタムパスに配置します。実行方法は以下のとおりです:
$bin/datax.py job/stream2stream.json
出力情報:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
タスク開始時刻 : 2021-08-26 11:05:59
タスク終了時刻 : 2021-08-26 11:06:09
タスク合計時間 : 10s
タスク平均転送量 : 38B/s
レコード書き込み速度 : 2rec/s
読み取ったレコード総数 : 20
読み書き失敗総数 : 0
DataXタスクの実行終了時には、上記の出力に含まれる平均転送量、書き込み速度、読み書き失敗総数などを含む簡単なタスクレポートが生成されます。
DataXの job パラメータ settings では、速度パラメータやエラーレコードの許容度などを指定できます。
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
パラメータ説明:
errorLimitはエラー報告レコード数の許容度を示し、この限度を超えた場合、タスクは中断して終了します。channelは並列数です。理論上、並列数が多いほど移行性能は向上します。ただし、実際の運用では、ソース側の読み取り負荷、ネットワーク転送性能、およびターゲット側の書き込み性能も考慮する必要があります。
環境の準備
tarパッケージのダウンロードURL:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
インストールファイルの解凍:
tar zxvf datax.tar.gz
cd datax
ディレクトリ構造は以下のとおりです:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
インストールファイルには、以下のディレクトリが含まれています。
ディレクトリ名 |
説明 |
|---|---|
| bin | 実行ファイルディレクトリです。このディレクトリ内のdatax.pyは、DataXタスクの起動スクリプトです。 |
| conf | ログファイル設定ディレクトリです。このディレクトリには、dataxとタスクが関係ない設定ファイルが格納されています。 |
| lib | 実行時依存パッケージです。このディレクトリには、DataXの実行に必要なグローバルjarファイルが格納されています。 |
| job | このディレクトリには、dataxのインストールをテスト検証するためのタスク設定ファイルが1つあります。 |
| log | ログファイルディレクトリです。このディレクトリには、dataxタスクの実行ログが格納されています。dataxの実行時、デフォルトでログは標準出力に出力されると同時に、logディレクトリにも書き込まれます。 |
| plugin | プラグインファイルディレクトリです。このディレクトリには、DataXがサポートする様々なデータソースプラグインが保存されています。 |
DataXを使用してOracleデータをOceanBaseに移行する例
OracleデータをOceanBaseに移行する際、ソース側とターゲット側が同時にDataXサーバーとネットワークで接続できない場合は、CSVファイルを経由して中継できます。ソースデータベースとターゲットデータベースがDataXサーバーと同時に接続できる場合は、DataXを使用して直接、ソースからターゲットへデータを移行できます。
例:OracleからZJSZY.TECTM03テーブルのデータを、OceanBaseのOracleテナント内のZJSZY.TECTM03に移行します。
myjob.json設定ファイルは以下のとおりです:
{
"job":{
"setting":{
"speed":{
"channel":32
},
"errorLimit":{
"percentage": 0.1
}
},
"content":[
{
"reader":{
"name":"oraclereader",
"parameter":{
"username": "user_name",
"password":"******",
"column":["*"],
"connection": [
{
"table": ["ZJSZY.TECTM03"],
"jdbcUrl":["jdbc:oracle:thin:@Oracle_ip:1521:orcl"]
}
]
}
},
"writer":{
"name":"oceanbasev10writer",
"parameter":{
"writeMode":"insert",
"batchSize":5000,
"memstoreThreshold":"90",
"username":"user@tenet#cluster",
"password":"******",
"column":["*"],
"connection": [
{
"table": ["ZJSZY.TECTM03"],
"jdbcUrl":"jdbc:oceanbase://odp_ip:3306/zjszy"
}
]
}
}
}
]
}
}
パラメータ説明
パラメータ |
説明 |
|---|---|
| name | データベースに接続するためのリーダーまたはライターに対応するデータベースプラグインの名前を記述します。Oracleのリーダープラグインはoraclereader、OceanBaseのライタープラグインはoceanbasev10writerです。具体的なリーダーとライターのプラグインについては、dataxのドキュメントを参照してください:DataXデータソースガイド。 |
| jdbcUrl | 接続先データベースへのJDBC情報を記述し、JSON配列を使用して表現します。1つのデータベースに複数の接続アドレスを記入することができます。JSON配列に1つのJDBC接続を記入するだけで済みます。jdbcUrlはMySQL公式仕様に準拠しており、接続オプションの制御情報を記入することができます。詳細については、MySQL公式ドキュメントを参照してください。
注意
|
| username | データソースのユーザー名 |
| password | データソースの指定ユーザー名のパスワード |
| table | 同期が必要なテーブルを選択します。JSON配列を使用して記述するため、複数のテーブルから同時に抽出することがサポートされています。複数のテーブルを設定する場合、ユーザー自身がそれらのテーブルが同一のスキーマ構造であることを確認する必要があります。OracleReaderは、テーブルが同一の論理テーブルであるかどうかをチェックしません。
注意tableはconnection構成ユニット内に含める必要があります。
|
| column | 設定したテーブルで同期が必要な列名の集合を指定します。列情報はJSON配列を使用して記述します。columnsを['*']に設定することは推奨されません。これは、テーブル構造が変更された場合、この設定も変更されるためです。具体的な列名を指定する設定方法を推奨します。列のトリミング、つまりエクスポートする列を部分的に選択できます。列の並べ替え、つまり列をテーブルのスキーマ情報に従わずにエクスポートできます。定数設定もサポートされており、ユーザーはMySQL SQL構文に従って指定する必要があります:["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]。
説明
|
| where | 絞り込み条件です。OracleReaderは、指定されたcolumn、table、where条件を組み合わせてSQLを生成し、そのSQLに基づいてデータを抽出します。実際の業務シナリオでは、通常は当日のデータを同期することが選択されます。その場合、where条件をgmt_create > $bizdateと指定できます。
注意
|
jobファイルを設定した後、そのjobを実行します。コマンドは以下のとおりです:
python datax.py ../job/myjob.json
詳細を見る
DataXのオープンソースコードおよび詳細については、DataXを参照してください。