RecordReader
という独自のFileInputFormat
を使用して、csvデータを<Long><String>
のペアに読み込む必要があります。私はsparkContext.hadoopFile
メソッドを呼び出すことにより、ファイルを読み、私のスパークアプリケーションでHadoop 2:カスタムInputFormatを使用すると空の結果が返される
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class MyStringRecordReader implements RecordReader<Long, String> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
System.out.println("constructor called");
}
@Override
public void close() throws IOException {
lineReader.close();
}
@Override
public Long createKey() {
return lineKey.get();
}
@Override
public String createValue() {
System.out.println("createValue called");
return lineValue.toString();
}
@Override
public long getPos() throws IOException {
return lineReader.getPos();
}
@Override
public float getProgress() throws IOException {
return lineReader.getProgress();
}
@Override
public boolean next(Long key, String value) throws IOException {
System.out.println("next called");
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key = lineKey.get();
value = lineValue.toString();
System.out.println(key);
System.out.println(value);
return true;
}
}
:
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class MyTextInputFormat extends FileInputFormat<Long, String> {
@Override
public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(input.toString());
return new MyStringRecordReader(job, (FileSplit)input);
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return super.isSplitable(fs, filename);
}
}
とクラスMyStringRecordReader
:
したがって、私はクラスMyTextInputFormat
を作成しました。しかし、私は唯一の次のコードから空の出力を得る:
public class AssociationRulesAnalysis {
@SuppressWarnings("serial")
public static void main(String[] args) {
JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() {
@Override
public String call(Tuple2<Long, String> arg0) throws Exception {
System.out.println("map: " + arg0._2());
return arg0._2();
}
});
List<String> asList = inputRdd.take(10);
for(String s : asList) {
System.out.println(s);
}
}
}
私は戻っRDDから10空行を取得します。
添加prints
とコンソール出力は、次のように見える:
=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
Stopping...
(RDDデータが=====
出力(10の空行の下に印刷されて!!!)=====
上記出力がなされているように見えます。 RDD.count
呼び出しによって。next
方法で正しいキーが&値が示されている!?何が私が間違っているのでしょうか?
I「)は、キー= lineKey.get(」上ライン「createKey」の内容と「createValue」方法を変更し、「値= lineValue.toString()」が成功せず、残念ながら依然として10を得ます空の行。 –
上記の質問の私のコードスニペットでもこれを変更しました。 –
createKeyとcreateValueは、適切なキーと値のオブジェクトを作成するために使用されます。これらのAPIは、ファイルのデータ値をkeyとvalueに割り当てません。これは「次の」実装の責任です。あなたの次の実装は、キーと値を上書きしています。次の方法でキーと値を印刷して、私の言うことを確認してください。 – Amit