2017-01-21 2 views
1

私のロジックは以下の通りです。スパークストリーミングのセットアップに関する質問

  1. createDirectStreamを使用して、カフカのログタイプでトピックを取得します。

  2. 再パーティションの後、ログはさまざまな処理によって処理されます。

  3. ログタイプごとにcombineByKeyを使用して単一の文字列を作成します(StringBuilderを使用)。

  4. 最後に、ログタイプでHDFSに保存します。

は、文字列を追加する操作がたくさんありますので、GCが頻繁に起こります。

この状況でGCを設定する方がよいでしょうか?

//////////////////////

は、さまざまなロジックがありますが、私はcombineByKeyを行うに問題があると思います。

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s), 
    (sb: StringBuilder, s: String) => sb.append(s), 
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2) 
).mapValues(_.toString) 

答えて

0

あなたはそのcombineByKey式で行うことができます最も大きな影響を持つ最も単純なものは大きさに、それはあなたがそこに文字列値をマージするように、そのバッキング文字配列を拡大する必要がないように、あなたが作成StringBuilderです。リサイズは割り当て速度を増幅し、古いバッキングアレイから新しいバッキングアレイにコピーすることによってメモリ帯域幅を浪費します。推測としては、結果のデータセットのレコードの文字列長の90パーセンタイルを選択するといいでしょう。

sb1.append(sb2)を呼び出すと、もう1つは(中間の値をいくつかの統計を収集した後に)コンバーター関数がStringBuilderインスタンスを選択してもう一方のインスタンスに収まるようになります。

Java 8を使用することをお勧めします。それは文字列と文字列バッファに重い作業があるときに大きな違いをもたらす最適化を持っています。

あなたの実際のサイクルを実際にどこで過ごしているかを知る最後の重要なプロフィールです。この作業負荷(あなたが行っている追加のカスタム処理を除く)は、多くのオブジェクト(存在する場合)を古い世代に昇格させる必要はないので、若い世代に十分なサイズがあり、並行して収集されるようにする必要があります。