2016-10-19 8 views
1

私はKafkaトピックからJSONメッセージを読み込み、Elasticsearchインデックスに送信するためにLogstash 2.4を使用しています。Logstashフィルタを使用したKafkaトピックからのJSONメッセージの操作

JSON形式は以下の通りです - 私が欲しい、

{ 
    "_index" : "kafka_reloads", 
    "_type" : "logs", 
    "_id" : "AVfcyTU4SyCFNFP2z5-l", 
    "_score" : 1.0, 
    "_source" : { 
    "schema" : { 
     "type" : "struct", 
     "fields" : [ { 
     "type" : "string", 
     "optional" : false, 
     "field" : "reloadID" 
     }, { 
     "type" : "string", 
     "optional" : false, 
     "field" : "externalAccountID" 
     }, { 
     "type" : "int64", 
     "optional" : false, 
     "name" : "org.apache.kafka.connect.data.Timestamp", 
     "version" : 1, 
     "field" : "reloadDate" 
     }, { 
     "type" : "int32", 
     "optional" : false, 
     "field" : "reloadAmount" 
     }, { 
     "type" : "string", 
     "optional" : true, 
     "field" : "reloadChannel" 
     } ], 
     "optional" : false, 
     "name" : "reload" 
    }, 
    "payload" : { 
     "reloadID" : "155559213", 
     "externalAccountID" : "9831200014", 
     "reloadDate" : 1449529746000, 
     "reloadAmount" : 140, 
     "reloadChannel" : "C1" 
    }, 
    "@version" : "1", 
    "@timestamp" : "2016-10-19T11:56:09.973Z", 
    } 
} 

をしかし -

{ 
    "schema": 
      { 
      "type": "struct", 
     "fields": [ 
        { 
        "type":"string", 
        "optional":false, 
        "field":"reloadID" 
       }, 
       { 
        "type":"string", 
        "optional":false, 
        "field":"externalAccountID" 
       }, 
       { 
        "type":"int64", 
        "optional":false, 
        "name":"org.apache.kafka.connect.data.Timestamp", 
        "version":1, 
        "field":"reloadDate" 
       }, 
       { 
        "type":"int32", 
        "optional":false, 
        "field":"reloadAmount" 
       }, 
       { 
        "type":"string", 
        "optional":true, 
        "field":"reloadChannel" 
       } 
       ], 
     "optional":false, 
     "name":"reload" 
     }, 
    "payload": 
      { 
      "reloadID":"328424295", 
     "externalAccountID":"9831200013", 
     "reloadDate":1446242463000, 
     "reloadAmount":240, 
     "reloadChannel":"C1" 
     } 
} 

私のconfigファイル内の任意のフィルタがなければ、ESインデックスから対象文書は、以下のように見えます「ペイロード」フィールドの値の部分だけがターゲットJSON本体としてESインデックスに移動します。だから私は以下のように設定ファイルのフィルタ「を変異させる」を使用してみました - このフィルタで

input { 
    kafka { 
      zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181" 
      group_id => "logstash" 
      topic_id => "reload" 
      consumer_threads => 3 
    } 
} 
filter { 
    mutate { 
    remove_field => [ "schema","@version","@timestamp" ] 
    } 
} 
output { 
    elasticsearch { 
        hosts => ["datanode-6:9200","datanode-2:9200"] 
        index => "kafka_reloads" 
    } 
} 

を、ES文書は現在、以下のように見える -

{ 
     "_index" : "kafka_reloads", 
     "_type" : "logs", 
     "_id" : "AVfch0yhSyCFNFP2z59f", 
     "_score" : 1.0, 
     "_source" : { 
     "payload" : { 
      "reloadID" : "850846698", 
      "externalAccountID" : "9831200013", 
      "reloadDate" : 1449356706000, 
      "reloadAmount" : 30, 
      "reloadChannel" : "C1" 
     } 
     } 
} 

しかし、実際にはそれはする必要があります以下のように -

{ 
     "_index" : "kafka_reloads", 
     "_type" : "logs", 
     "_id" : "AVfch0yhSyCFNFP2z59f", 
     "_score" : 1.0, 
     "_source" : { 
      "reloadID" : "850846698", 
      "externalAccountID" : "9831200013", 
      "reloadDate" : 1449356706000, 
      "reloadAmount" : 30, 
      "reloadChannel" : "C1" 
     } 
} 

これを行う方法はありますか?誰かが私にこれを助けることができますか?

私は、フィルタの下にしようとした -

filter { 
    json { 
     source => "payload" 
    } 
} 

しかし、それは私のようなエラー与えている -

を解析エラーJSON {:ソース=> "ペイロード"、:生=> {」 "reloadID" => "572584696"、 "externalAccountID" => "9831200011"、 "reloadDate" => 1449093851000、 "reloadAmount" => 180、 "reloadChannel" => "C1"}、例外=> java.lang.ClassCastException :org.jruby.RubyHashをorg.jruby.RubyIOにキャストすることはできません::level =>:warn}

ご協力いただければ幸いです。

おかげ ゴータムゴーシュ

答えて

5

あなたは以下のrubyフィルタ使用して欲しいものを達成することができます:すべてのフィールドを削除

  1. しかし​​1
  2. ruby { 
        code => " 
         event.to_hash.delete_if {|k, v| k != 'payload'} 
         event.to_hash.update(event['payload'].to_hash) 
         event.to_hash.delete_if {|k, v| k == 'payload'} 
        " 
        } 
    

    何それがないことです

  3. コピーall​​内側ルートレベル
  4. のフィールドはあなたが必要なものになってしまいます​​フィールド自体

を削除します。

+0

優秀!!それは完璧に働いた..おかげでたくさんの友達! –

+0

素晴らしいです、うれしかった! – Val

+0

すごいもの、ありがとう@Val。これは、Oracle GoldenGateを使用している人にとっては大いに役立つでしょう - > Kafka - > Logstash - > –

関連する問題