2017-02-22 3 views
0

以下のコードでは、オプションマップを使用してreadConfigにmongo uriとデータベースを渡そうとしました。しかし、それはuriやデータベースが見つかりませんエラーを与える。Sparkでmongo dbのreadConfigにuriとデータベースを追加するには?

`

public JavaMongoRDD<Document> getRDDFromDS(DataSourceInfo ds, String collectionName){ 
     String mongoDBURI = "mongodb://" 
       + PropertiesFileEncryptorUtil.decryptData(ds.getDbUsername()) + ":" 
       + PropertiesFileEncryptorUtil.decryptData(ds.getDbPassword()) + "@" 
       + ds.getHostName() + ":" + ds.getPort(); 
     Map<String, String> readOverrides = new HashMap<String, String>(); 
     readOverrides.put("uri", mongoDBURI); 
     readOverrides.put("database", ds.getDbName()); 
     readOverrides.put("collection", collectionName); 
     readOverrides.put("partitioner", mongoDBInputPartitioner); 
     readOverrides.put("partitionKey", mongoDBPartitionKey); 
     readOverrides.put("partitionSizeMB", mongoDBInputPartitionSize); 

     ReadConfig readConf = ReadConfig.create(jsc).withOptions(readOverrides); 
     JavaMongoRDD<Document> readRdd = MongoSpark.load(jsc, readConf); 
     return readRdd; 
    }` 

URIとデータベースを渡すための正しい方法は何ですか。 ありがとうございます。

あなたが設定変数によってスパークする設定パラメータを渡すことができます
+0

役立ちました願っていますか?バグはすでに修正されていると思います。オプションで直接readConfを作成することもできます。 – Ross

+0

私は 'org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc1'を使用しています。これを変更する必要があります。これはgradleを使用していて、上記をmavenの最新のものとして検出しました。前回は100MBサイズのソートが修正されました。今私はチェックし、2.0.0が最新のものであることを発見しました、私は2.0.0としようとしています。 –

+0

@Ross、あなたの素早い応答に感謝します。私は2.0.0のバージョンでテストしています。読み込み設定を渡している間に、mongoクライアントのuse sslフラグを渡す方法を教えてください。 –

答えて

0

val sc = new SparkContext(conf) 

val readConf = ReadConfig(sc) 

は、その後、あなたがから値を読み取ることができます:あなたが火花コンテキストに設定変数を与える必要がその後

val conf = new SparkConf().setAppName("YourAppName").setMaster("local[2]").set("spark.executor.memory","1g") 
     .set("spark.app.id","YourSparkId") 
     .set("spark.mongodb.input.uri","mongodb://127.0.0.1/yourdatabase.yourInputcollection?readPreference=primaryPreferred") 
     .set("spark.mongodb.output.uri","mongodb://127.0.0.1/yourdatabase.yourOutputcollection") 

このようなmongo:

val rdd = sc.loadFromMongoDB(readConfig = readConfig) 

このように保存します。

rdd.map(someMapFunction).saveToMongoDB() 

私は私の答えは、あなたが使用しているスパークコネクタのバージョンは何

+0

ご回答いただきありがとうございます。私はあなたが私の最後のコードで提案したのと同じことをしました。しかし、私たちはsparkcontextを作成している間、SparkConfではなく、読み取り設定としてuriとデータベースを渡す必要があるように、異なるホストマシン上でホストされている異なるデータソースを動的に選択する必要があります。 sparkcontextの初期設定では、単一のsparkcontextが1つのアプリケーション設定で推奨されるため、このユースケースでは適切ではないようです。 –

+0

これは、sparkcontextの初期化でuriとデータベースを提供し、uriとデータベースをreadconfigでオーバーライドすることは可能ですか? –

+0

私はあなたの2番目の質問は分かりませんが、私の最初の答えがあなたの質問に合っていれば、それを受け入れてください。 –

関連する問題