2016-09-13 14 views
0

私はスパークジョブでは、Avroファイルを読み込もうとしています。
私のスパークバージョンは1.6.0(spark-core_2.10-1.6.0-cdh5.7.1)です。ここで newAPIHadoopFileを使ってsparkでavroファイルを読むには?

は私のJavaコードです:

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("ReadAvro")); 
JavaPairRDD <NullWritable, Text> lines = sc.newAPIHadoopFile(args[0],AvroKeyValueInputFormat.class,AvroKey.class,AvroValue.class,new Configuration()); 

しかし、私はコンパイル時の例外を取得しています:

タイプのメソッドnewAPIHadoopFile(文字列、クラス、クラス、クラス、 構成) JavaSparkContextは、 引数(String、Class、Class、 クラス、コンフィグレーション)には適用されません

JavaでJavaSparkContext.newAPIHadoopFile()を使用する正しい方法は何ですか?

答えて

2
public class Utils { 

    public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) { 
    JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration()); 
    return records.keys() 
     .map(x -> (GenericRecord) x.datum()) 
     .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T)pair.get("value"))); 
    } 
} 

などのユーティリティを使用して:あなたはまた、KryoRegistrator KryoSerializerを使用し、カスタムを登録する必要があります

JavaPairRDD<String, YourAvroClassName> records = Utils.<YourAvroClassName>loadAvroFile(sc, inputDir); 

:このことができます

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
sparkConf.set("spark.kryo.registrator", "com.test.avro.MyKryoRegistrator"); 

public class MyKryoRegistrator implements KryoRegistrator { 

    public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer { 
    Class<T> type; 
    public SpecificInstanceCollectionSerializer(Class<T> type) { 
     this.type = type; 
    } 

    @Override 
    protected Collection create(Kryo kryo, Input input, Class<Collection> type) { 
     return kryo.newInstance(this.type); 
    } 

    @Override 
    protected Collection createCopy(Kryo kryo, Collection original) { 
     return kryo.newInstance(this.type); 
    } 
    } 


    Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Override 
    public void registerClasses(Kryo kryo) { 
    // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type 
    // because Kryo is not able to serialize them properly, we use this serializer for them 
    kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer<>(ArrayList.class)); 
    kryo.register(YourAvroClassName.class); 
    } 
} 

希望を...

まだ
+0

同じUtilsクラスのコンパイル時の例外。 –

関連する問題