2017-02-27 49 views
0

私は3つ以上の100万レコードのテーブルを持っています。 DBからすべてのレコードを読み込み、処理するためにそれらをkafkaキューに送り、他のシステムが処理できるようにする必要があります。その後、出力kafkaキューから結果を読み取り、DBに書き戻します。
正当な部分を読み書きする必要があります。それ以外の場合は、すぐにOOM例外が発生します。Mybatisバッチ処理

mybatisで一括読み取りと書き込み操作を行うための技術的解決策は何でしょうか?
きちんとした作業例は非常に高く評価されます。

答えて

1

私はカフカについてよく知らないので、疑似コードを書いています。

最初の読み取り時に、Mybatisのデフォルトの動作では、結果がリストに返されますが、3百万個のオブジェクトをメモリにロードすることは望ましくありません。 MyBatisのグローバル設定で定義された値がない場合:(@Option(fetchSize=500)注釈ベースマッパーを使用した場合)これは、文のfetchSizeを設定org.apache.ibatis.session.ResultHandler<T>

public void handleResult(final ResultContext<YourType> context) { 
    addToKafkaQueue(context.getResultObject()); 
} 

のカスタム実装を使用してオーバーライドする必要があります。 letを設定しない場合、このオプションはデフォルトでドライバの値に依存し、すべてのDBベンダーに依存します。これは、一度に結果セットにバッファされるレコードの量を定義します。例:Oracleの場合、この値は10です:アプリからDBへの操作を多く読み取るために、一般に低すぎます。 PostgreSQLの場合、これは無制限(結果セット全体)ですが、あまりにも多くなります。あなたは、速度とメモリ使用量の間の適切なバランスを把握しなければなりません。更新のために

do { 
    YourType object = readFromKafkaQueue(); 
    mybatisMapper.update(object); 
} while (kafkaQueueHasMoreElements()); 
sqlSession.flushStatement(); // only when using ExecutorType.BATCH 

最も重要なのはその意志デフォルトExecutorType.SIMPLEまたはExecutorType.BATCHと反復ごとに一度だけ文を準備する代わりにできるようになりますExecutorType(これはSessionFactory.openSession()で引数である)のいずれかであるExecutorType.REUSEステートメントをスタックし、実際にはそれらを単にフラッシュするだけです。

ここではトランザクションについて考える必要があります。これは、3百万回の更新をコミットするか、またはセグメント化することが必要な場合があります。