2016-01-26 6 views
5

私は例えば、スパークのすべての執行上の私の機能に非serialisableサードパーティのクラスを使用する必要があります。スパーク - 静的オブジェクトを持っているための正しい方法は、すべての労働者にどのようなものです

JavaRDD<String> resRdd = origRdd 
    .flatMap(new FlatMapFunction<String, String>() { 
     @Override 
     public Iterable<String> call(String t) throws Exception { 

     //A DynamoDB mapper I don't want to initialise every time 
     DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials)); 

     Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav(); 

     return userFav; 
    } 
}); 

私はすべてのエグゼキュータのために一度初期化し、それを何度も何度も使用することができる静的なDynamoDBMapper mapperを持っていたいと思います。

シリアル対応ではないので、ドライブで一度初期化してブロードキャストすることはできません。

注:これはここ(What is the right way to have a static object on all workers)の回答ですが、Scalaの場合のみです。

+1

DynamoDBMapperがスレッドセーフである場合は、Javaシングルトンクラスを作成することができます。 – zsxwing

答えて

5

mapPartitionまたはforeachPartitionを使用できます。ここでpartition-ベースの操作を使用することによりLearning Spark

から取られたスニペットがあり、我々は多くの接続をセットアップ避けるために、このデータベースへの接続プール を共有し、そして私たちの JSONパーサーを再利用することができます。例6-10〜6-12に示すように、 mapPartitions()関数を使用します。これは、入力RDDの各パーティションにある という要素のイテレータを返し、我々の結果の イテレータを返すことを期待しています。

これにより、エグゼキュータごとに1つの接続を初期化し、必要に応じてパーティション内の要素を反復処理できます。これは、外部データベースにデータを保存する場合や、高価な再利用可能なオブジェクトを作成する場合に非常に便利です。

リンクされた本から取られた単純なスケーラの例です。これは、必要に応じてjavaに変換することができます。ここでは、mapPartitionとforeachPartitionの単純な使用例を示します。

ipAddressRequestCount.foreachRDD { rdd => rdd.foreachPartition { partition => 
    // Open connection to storage system (e.g. a database connection) 
    partition.foreach { item => 
    // Use connection to push item to system 
    } 
    // Close connection 
    } 
} 

Here is a linkをJavaの例にします。

+0

多くのパーティションが存在します。 executorまたはjvmごとに接続プールを持つことは可能ですか? – donald

+2

@donaldオブジェクト(シングルトン)に一時的なvalとして接続プールを追加することは可能です。オブジェクトはエグゼキュータごとに1回初期化され、参照時に各エグゼキュータで接続プールが作成されます。各パーティションにその接続プールを再利用できるようになりました。 –

関連する問題