2016-06-27 4 views
0

RecordReaderという独自のFileInputFormatを使用して、csvデータを<Long><String>のペアに読み込む必要があります。私はsparkContext.hadoopFileメソッドを呼び出すことにより、ファイルを読み、私のスパークアプリケーションでHadoop 2:カスタムInputFormatを使用すると空の結果が返される

import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.LineRecordReader; 
import org.apache.hadoop.mapred.RecordReader; 

public class MyStringRecordReader implements RecordReader<Long, String> { 

    private LineRecordReader lineReader; 
    private LongWritable lineKey; 
    private Text lineValue; 

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException { 
     lineReader = new LineRecordReader(job, split); 

     lineKey = lineReader.createKey(); 
     lineValue = lineReader.createValue(); 

     System.out.println("constructor called"); 
    } 

    @Override 
    public void close() throws IOException { 
     lineReader.close(); 
    } 

    @Override 
    public Long createKey() { 
     return lineKey.get(); 
    } 

    @Override 
    public String createValue() { 
     System.out.println("createValue called"); 
     return lineValue.toString(); 
    } 

    @Override 
    public long getPos() throws IOException { 
     return lineReader.getPos(); 
    } 

    @Override 
    public float getProgress() throws IOException { 
     return lineReader.getProgress(); 
    } 

    @Override 
    public boolean next(Long key, String value) throws IOException { 
     System.out.println("next called"); 

     // get the next line 
     if (!lineReader.next(lineKey, lineValue)) { 
      return false; 
     } 

     key = lineKey.get(); 
     value = lineValue.toString(); 

     System.out.println(key); 
     System.out.println(value); 


     return true; 
    } 
} 

import java.io.IOException; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 

public class MyTextInputFormat extends FileInputFormat<Long, String> { 

    @Override 
    public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException { 
     reporter.setStatus(input.toString()); 
     return new MyStringRecordReader(job, (FileSplit)input); 
    } 

    @Override 
    protected boolean isSplitable(FileSystem fs, Path filename) { 
    return super.isSplitable(fs, filename); 
    } 
} 

とクラスMyStringRecordReader

したがって、私はクラスMyTextInputFormatを作成しました。しかし、私は唯一の次のコードから空の出力を得る:

public class AssociationRulesAnalysis { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) { 
     JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() { 
      @Override 
      public String call(Tuple2<Long, String> arg0) throws Exception { 
       System.out.println("map: " + arg0._2()); 
       return arg0._2(); 
      } 
     }); 

     List<String> asList = inputRdd.take(10); 
     for(String s : asList) { 
      System.out.println(s); 
     } 
    } 
} 

私は戻っRDDから10空行を取得します。

添加printsとコンソール出力は、次のように見える:

=== APP STARTED : local-1467182320798 
constructor called 
createValue called 
next called 
0 
ä1 
map: 
next called 
8 
ö2 
map: 
next called 
13 
ü3 
map: 
next called 
18 
ß4 
map: 
next called 
23 
ä5 
map: 
next called 
28 
ö6 
map: 
next called 
33 
ü7 
map: 
next called 
38 
ß8 
map: 
next called 
43 
ä9 
map: 
next called 
48 
ü10 
map: 
next called 
54 
ä11 
map: 
next called 
60 
ß12 
map: 
next called 
12 
===================== 
constructor called 
createValue called 
next called 
0 
ä1 
map: 
next called 
8 
ö2 
map: 
next called 
13 
ü3 
map: 
next called 
18 
ß4 
map: 
next called 
23 
ä5 
map: 
next called 
28 
ö6 
map: 
next called 
33 
ü7 
map: 
next called 
38 
ß8 
map: 
next called 
43 
ä9 
map: 
next called 
48 
ü10 
map: 










Stopping... 

(RDDデータが=====出力(10の空行の下に印刷されて!!!)=====上記出力がなされているように見えます。 RDD.count呼び出しによって。next方法で正しいキーが&値が示されている!?何が私が間違っているのでしょうか?

答えて

0

lineKeylineValuekeyとに初期化されることはありませんはMyStringRecordReaderのオーバーライドnextメソッドに渡されます。したがって、RecordReaderを使用しようとすると、常にEMPTY文字列が表示されます。 ファイル内のレコードに異なるキーと値が必要な場合は、nextメソッドに渡されたkeyとvalueを使用し、計算されたキーと値で初期化する必要があります。キー/バリュー・レコードを変更しない場合は、以下を取り除きます。このコードを実行するたびに、EMPTY文字列と0Lをファイルから読み込んだキー/値を上書きします。

key = lineKey.get(); 
value = lineValue.toString(); 
+0

I「)は、キー= lineKey.get(」上ライン「createKey」の内容と「createValue」方法を変更し、「値= lineValue.toString()」が成功せず、残念ながら依然として10を得ます空の行。 –

+0

上記の質問の私のコードスニペットでもこれを変更しました。 –

+0

createKeyとcreateValueは、適切なキーと値のオブジェクトを作成するために使用されます。これらのAPIは、ファイルのデータ値をkeyとvalueに割り当てません。これは「次の」実装の責任です。あなたの次の実装は、キーと値を上書きしています。次の方法でキーと値を印刷して、私の言うことを確認してください。 – Amit

関連する問題