と私は()すべてのマップで別のファイルを読み込む必要があり、ファイルがHDFSである()はすぐにスパーク
val rdd=sc.parallelize(1 to 10000)
val rdd2=rdd.map{x=>
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
val path=new Path("/user/zhc/"+x+"/")
val t=hdfs.listStatus(path)
val in =hdfs.open(t(0).getPath)
val reader = new BufferedReader(new InputStreamReader(in))
var l=reader.readLine()
}
rdd2.count
私の問題は、このコード
ですval hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
は、マップ()が新しいFileSystem値を作成する必要があるたびに、実行時間が長くかかります。このコードをmap()関数の外に置くことができるので、毎回hdfsを作成する必要はありませんか?または、map()でファイルをすばやく読み取るにはどうすればよいですか?
私のコードは複数のマシンで動作します。ありがとうございました!
'val hdfs'をマップクロージャから移動してみてください。 – tuxdna
ファイルのカーディナリティが比較的小さい場合(10Kが小さい場合)、BufferedReaderを使用して読み込むのではなく、ファイルarborescenceを作成し、@ tuxdnaが示唆したものと一緒にRDDを読み込んで統一します。 – eliasah
@tuxdnaマップクロージャの外側に配置しようとしましたが、「タスクはシリアライズ不可、java.io.NotSerializableException:org.apache.hadoop.hdfs.DistributedFileSystem」というエラーが発生しました –