2016-05-09 5 views
3

と私は()すべてのマップで別のファイルを読み込む必要があり、ファイルがHDFSである()はすぐにスパーク

val rdd=sc.parallelize(1 to 10000) 
    val rdd2=rdd.map{x=> 
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration()) 
    val path=new Path("/user/zhc/"+x+"/") 
    val t=hdfs.listStatus(path) 
    val in =hdfs.open(t(0).getPath) 
    val reader = new BufferedReader(new InputStreamReader(in)) 
    var l=reader.readLine() 
    } 
rdd2.count 

私の問題は、このコード

です
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration()) 

は、マップ()が新しいFileSystem値を作成する必要があるたびに、実行時間が長くかかります。このコードをmap()関数の外に置くことができるので、毎回hdfsを作成する必要はありませんか?または、map()でファイルをすばやく読み取るにはどうすればよいですか?

私のコードは複数のマシンで動作します。ありがとうございました!

+0

'val hdfs'をマップクロージャから移動してみてください。 – tuxdna

+0

ファイルのカーディナリティが比較的小さい場合(10Kが小さい場合)、BufferedReaderを使用して読み込むのではなく、ファイルarborescenceを作成し、@ tuxdnaが示唆したものと一緒にRDDを読み込んで統一します。 – eliasah

+0

@tuxdnaマップクロージャの外側に配置しようとしましたが、「タスクはシリアライズ不可、java.io.NotSerializableException:org.apache.hadoop.hdfs.DistributedFileSystem」というエラーが発生しました –

答えて

3

wholeTextFilesのメソッドを使用することをお勧めします。これは、キーがファイルのフルパスであり、その値がstringのファイルの内容である場合にpairRddを返します。

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/") 
val filesLineCount = filesPariRDD.map(x => (x._1, x._2.length)) //this will return a map of fileName , number of lines of each file. You could apply any other function on the file contents 
filesLineCount.collect() 

編集

(コメントで述べたように)あなたのファイルは同じディレクトリの下にあるディレクトリにある場合は、

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/*/" 

はこれがあるホープ正規表現のいくつかの種類を使用することができます明確で有用なもの

+0

ありがとうございました!それだけが必要!しかし、ファイルが異なるディレクトリにあり、それらのディレクトリが同じディレクトリにある場合、それを行う同様の方法はありますか? –

+0

私は自分の答えを編集しました..あなたの質問に回答としてマークすることを検討してください;) – user1314742

+0

@haochizhangあなたは複数のディレクトリの解決に疲れましたか? – user1314742