2016-03-25 8 views
1

Flumeエージェントを使用して、Flumeエージェント経由で外部データを収集します。外部データのバッチは10秒あたりほぼ1MBです。 Flumeエージェントを以下のように設定しました。NetcatソースからのイベントがKafkaチャンネルを経由しない

# Flume agent configuration as /flume/conf/agent.conf 
agent.sources = netcat-source 
agent.channels = kafka-channel 
agent.sinks = logger-sink 

######################################## 
# Netcat Source 
######################################## 

agent.sources.netcat-source.type = netcat 
agent.sources.netcat-source.bind = 0.0.0.0 
agent.sources.netcat-source.port = 4141 
agent.sources.netcat-source.max-line-length = 500000 
agent.sources.netcat-source.channels = kafka-channel 

######################################## 
# Kafka Channel 
######################################## 

agent.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.kafka-channel.brokerList = 10.212.136.108:9092,10.212.136.108:9092 
agent.channels.kafka-channel.zookeeperConnect = 10.212.136.108:2181,10.212.136.108:2181/kafka 
agent.channels.kafka-channel.topic = channel 
agent.channels.kafka-channel.groupId = fcd-group 


######################################## 
# Logger Sink 
######################################## 

agent.sinks.logger-sink.type = logger 
agent.sinks.logger-sink.channel = kafka-channel 

私は以下の方法で薬剤を活性化した。

flume-ng agent -n agent -c /flume/conf -f /flume/conf/agent.conf 

残念ながら、netcatソースがうまくいき、チャンネルやシンクで問題が発生していました。 Ubuntuのリソースモニタから、私は以下のパフォーマンスを見ることができます。 Network performance. Blue curve indicates input while red one indicates output ネットワークioで動作している他のアプリケーションがないと、この数字はFlumeエージェントに何が起こったのかを示しているはずです。

コンソールコンシューマを通じてトピック「チャンネル」のカフカコンテンツをチェックしたときに何も得られませんでした。また、私がflume.logをチェックしたとき、私はFlume出力にデータなしの状態しか得られませんでした。

私は

nc -lk 4141 >> my_data_check_file 
私のチャンネルで間違って何

やシンクを使用して、受信データを検証していた?

P.S.

メモリーチャンネル、ファイルチャンネルを使用すると、状況は同様に扱いにくくなりました。

答えて

1

ああ、最後に、私はこの問題を自分で解決しました!

重要な点は行区切り文字 '\ n'です。水路ソースコードNetcatSource.java

は、我々は 'の\ n' で終了する

private int processEvents(CharBuffer buffer, Writer writer) throws IOException { 
    int numProcessed = 0; 

    boolean foundNewLine = true; 
    while (foundNewLine) { 
    foundNewLine = false; 

    int limit = buffer.limit(); 
    for (int pos = buffer.position(); pos < limit; pos++) { 
     if (buffer.get(pos) == '\n') { 
     // parse event body bytes out of CharBuffer 
     buffer.limit(pos); // temporary limit 
     ByteBuffer bytes = Charsets.UTF_8.encode(buffer); 
     buffer.limit(limit); // restore limit 
... ... 
... ... 

コード力入力データを次のようtriky行を有します。それ以外の場合は、イベントはチャネルによって取得されません。この文字を必要に応じて変更し、カスタマイズしたソースを$ FLUME_HOME/libに入れることができます

関連する問題