2017-01-27 6 views
0

私はBig Dataを初めて使い、Flumeを使用してcsvファイルをHDFSに転送するタスクを持っていますが、これらのCSVをavroに変換する必要もあります。私が使用して水路の構成以下のことを実行しようとしました:csvファイルをhdfsに転送し、flumeを使用してavroに変換する

a1.channels = dataChannel 
a1.sources = dataSource 
a1.sinks = dataSink 

a1.channels.dataChannel.type = memory 
a1.channels.dataChannel.capacity = 1000000 
a1.channels.dataChannel.transactionCapacity = 10000 

a1.sources.dataSource.type = spooldir 
a1.sources.dataSource.spoolDir = {spool_dir} 
a1.sources.dataSource.fileHeader = true 
a1.sources.dataSource.fileHeaderKey = file 
a1.sources.dataSource.basenameHeader = true 
a1.sources.dataSource.basenameHeaderKey = basename 
a1.sources.dataSource.interceptors.attach-schema.type = static 
a1.sources.dataSource.interceptors.attach-schema.key = flume.avro.schema.url 
a1.sources.dataSource.interceptors.attach-schema.value = {path_to_schema_in_hdfs} 

a1.sinks.dataSink.type = hdfs 
a1.sinks.dataSink.hdfs.path = {sink_path} 
a1.sinks.dataSink.hdfs.format = text 
a1.sinks.dataSink.hdfs.inUsePrefix = . 
a1.sinks.dataSink.hdfs.filePrefix = drone 
a1.sinks.dataSink.hdfs.fileSuffix = .avro 
a1.sinks.dataSink.hdfs.rollSize = 180000000 
a1.sinks.dataSink.hdfs.rollCount = 100000 
a1.sinks.dataSink.hdfs.rollInterval = 120 
a1.sinks.dataSink.hdfs.idleTimeout = 3600 
a1.sinks.dataSink.hdfs.fileType = DataStream 
a1.sinks.dataSink.serializer = avro_event 

水路のデフォルトschema.IとアブロファイルもAvroEventSerializerを使用しようとした出力が、私はちょうど別のエラーの多くは、私はそれらのすべてを解決しました、これ以外:

ERROR hdfs.HDFSEventSink: process failed 
java.lang.ExceptionInInitializerError 
     at org.apache.hadoop.hdfs.DFSOutputStream.computePacketChunkSize(DFSOutputStream.java:1305) 
     at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1243) 
     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1266) 
     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1101) 
     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1059) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75) 

ありがとうございます。

答えて

1

設定の間違いのためのソーリー。私はそれらを修正し、CSSをavroに変換する方法を見つけました。私は少しはAvroEventSerializerをこのように変更:

public void write(Event event) throws IOException { 
     if (dataFileWriter == null) { 
      initialize(event); 
     } 
     String[] items = new String(event.getBody()).split(","); 
     city.put("deviceID", Long.parseLong(items[0])); 
     city.put("groupID", Long.parseLong(items[1])); 
     city.put("timeCounter", Long.parseLong(items[2])); 
     city.put("cityCityName", items[3]); 
     city.put("cityStateCode", items[4]); 
     city.put("sessionCount", Long.parseLong(items[5])); 
     city.put("errorCount", Long.parseLong(items[6])); 
     dataFileWriter.append(citi); 
    } 

、ここではcity定義です:

private GenericRecord city = null; 

あなたは

ことを行うには良い方法を知っていれば、返信してください
関連する問題