これはワードカウントマップ削減ジョブです。私は自分のInputFormatを持っています。MapReduceジョブでリデューサタスクが呼び出されていません
JobExecutor:
val job = new Job(new Configuration())
job.setMapperClass(classOf[CountMapper])
job.setReducerClass(classOf[CountReducer])
job.setJobName("tarun-test-1")
job.setInputFormatClass(classOf[MyInputFormat])
FileInputFormat.setInputPaths(job, new Path(args(0)))
FileOutputFormat.setOutputPath(job, new Path(args(1)))
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[LongWritable])
job.setNumReduceTasks(1)
println("status: " + job.waitForCompletion(true))
マッパー:
class CountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
private val valueOut = new LongWritable(1L)
override def map(k: LongWritable, v: Text, context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
val str = v.toString
str.split(",").foreach(word => {
val keyOut = new Text(word.toLowerCase.trim)
context.write(keyOut, valueOut)
})
}
}
リデューサー:
class CountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
override def reduce(k: Text, values: Iterable[LongWritable], context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
println("Inside reduce method..")
val valItr = values.iterator()
var sum = 0L
while (valItr.hasNext) {
sum = sum + valItr.next().get()
}
context.write(k, new LongWritable(sum))
println("done reducing.")
}
}
マッパーが呼び出されているとRecordReaderが正しくログに基づいて分割を読んでいます。ただし、レデューサーは呼び出されていません。
あなたはあなた自身のInputFormatを持っているということを意味していますか?それはどこにある?削減は呼び出されていないとはどういう意味ですか?どうしてわかるの?任意の入出力?カウンター?エラー?ログ? – vefthym
MyInputFormatは私自身のInputFormatです。 InputFormatが正常に動作しています。私は、そのマッパーの入力(キー、値)がRecordReaderによって正しく読み取られていることを確認しています。私は、マップタスクにログを追加し、予想通りにログを記録しています。ただし、縮小ログは出力されず、最終ステータスはfalseです。 –