
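Hi, I am using a Cloudera Flume agent to collect tweets, with Kafka as the channel that holds the Flume events. Each event written to the Kafka channel consists of headers plus the JSON tweet: the header (shown as "::::") is followed by a separator such as "M", and then the JSON. How can I read these Flume events from the Kafka channel in Spark (Scala)? The Flume event object has this structure: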

event(headers, jsonTweet) 

An actual event looks like this:

::::�M{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715916168905633792,"in_reply_to_user_id_str":null,"timestamp_ms":"1459522690403","in_reply_to_status_id":null,"created_at":"Fri Apr 01 14:58:10 +0000 2016","favorite_count":0,"place":null,"coordinates":null,"text":"RT @frmikeschmitz: What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American","contributors":null,"retweeted_status":{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715549110728847360,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"created_at":"Thu Mar 31 14:39:36 +0000 2016","favorite_count":22,"place":null,"coordinates":null,"text":"What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American icon. via @bmoviesd","contributors":null,"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"/2016/03/30/superman-and-the-damage-done","indices":[98,121],"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[41,45]}],"user_mentions":[{"id":202668848,"name":"Birth.Movies.Death.","indices":[126,135],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"\" rel=\"nofollow\">Twitter for 
iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":5,"id_str":"715549110728847360","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":3001,"lang":"en","profile_link_color":"0084B4","profile_banner_url":"","id":565216911,"following":null,"protected":false,"favourites_count":1858,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"fathermikeschmitz","profile_background_color":"C0DEED","created_at":"Sat Apr 28 03:49:18 +0000 2012","default_profile_image":false,"followers_count":17931,"profile_image_url_https":"","geo_enabled":false,"profile_background_image_url":"":"":null,"url":"","utc_offset":-14400,"time_zone":"Eastern Time (US & Canada)","notifications":null,"profile_use_background_image":true,"friends_count":81,"profile_sidebar_fill_color":"DDEEF6","screen_name":"frmikeschmitz","id_str":"565216911","profile_image_url":"","listed_count":138,"is_translator":false}},"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[60,64]}],"user_mentions":[{"id":565216911,"name":"fathermikeschmitz","indices":[3,17],"screen_name":"frmikeschmitz","id_str":"565216911"},{"id":202668848,"name":"Birth.Movies.Death.","indices":[139,140],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":0,"id_str":"715916168905633792","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":25,"lang":"en","profile_link_color":"0084B4","id":2987773015,"following":null,"protected":false,"favourites_count":90,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Pam 
Anderson","profile_background_color":"C0DEED","created_at":"Sun Jan 18 02:14:41 +0000 2015","default_profile_image":false,"followers_count":22,"profile_image_ur":"","geo_enabled":false,"profile_background_image_url":"","profile_background_image_url_httpd":"":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":59,"profile_sidebar_fill_color":"DDEEF6","screen_name":"pamawah25","id_str":"2987773015","profile_image_url":"","listed_count":0,"is_translator":false}} 

This is the Flume agent configuration I am working with:

# Component declarations
agent1.sources = twitter-data
agent1.channels = kafka-channel
agent1.sinks = hdfs-sink

agent1.sources.twitter-data.type = com.cloudera.flume.source.TwitterSource
agent1.sources.twitter-data.consumerKey = ""
agent1.sources.twitter-data.consumerSecret = ""
agent1.sources.twitter-data.accessToken = ""
agent1.sources.twitter-data.accessTokenSecret = ""
agent1.sources.twitter-data.keywords = superman, batman, iron man, tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.capacity = 10000
agent1.channels.kafka-channel.transactionCapacity = 1000
agent1.channels.kafka-channel.brokerList = kafka:9092
agent1.channels.kafka-channel.topic = twitter
agent1.channels.kafka-channel.zookeeperConnect = kafka:2181
#agent1.channels.kafka-channel.parseAsFlumeEvent = false

# HDFS sink settings use the "hdfs." prefix
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs:///user/Hadoop/twitter_data
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 10000

# Bind the source and the sink to the Kafka channel
agent1.sources.twitter-data.channels = kafka-channel
agent1.sinks.hdfs-sink.channel = kafka-channel

I want to process this data with Spark Streaming / Spark SQL and store it.

So I am using the following code, but it does not work because of the Flume event format:

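import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.hive.HiveContext

val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new HiveContext(sc) // or new org.apache.spark.sql.SQLContext(sc)

// Broker and topic taken from the Flume agent configuration above
val brokers = "kafka:9092"
val topics = "twitter"

// Create a direct Kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)

// Get the message values (the tweets) from Kafka
val tweets = messages.map(_._2)

tweets.foreachRDD { rdd =>
    // Skip empty micro-batches: read.json infers an empty schema from them
    if (!rdd.isEmpty()) {
        val tweetTable = sqlContext.read.json(rdd) // already a DataFrame
        tweetTable.printSchema()
        tweetTable.show(5)
        tweetTable.write.mode("append").saveAsTable("twitterStream")
    }
}

ssc.start()
ssc.awaitTermination()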
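According to the Flume user guide, when the Kafka channel's parseAsFlumeEvent property is left at its default (true), the topic holds Avro datums with the FlumeEvent schema, which can be parsed outside Flume with org.apache.flume.source.avro.AvroFlumeEvent from the flume-ng-sdk artifact. Assuming that is what the twitter topic contains, the bytes in front of the JSON are Avro framing (the encoded headers map and the body length) rather than a fixed separator. Below is a minimal, dependency-free sketch of a decoder for that record layout (a map<string> headers field followed by a bytes body field). FlumeEventDecoder and DecodedEvent are hypothetical names; the sketch illustrates the format and is not a replacement for Avro's own SpecificDatumReader.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper (not part of Flume or Spark). Assumes each Kafka value is
// one binary-encoded Avro record: { map<string> headers; bytes body; }
object FlumeEventDecoder {

  final case class DecodedEvent(headers: Map[String, String], body: String)

  def decode(bytes: Array[Byte]): DecodedEvent = {
    var pos = 0

    // Avro long: base-128 varint (low 7-bit groups first), then zig-zag decoded.
    def readLong(): Long = {
      var shift = 0
      var acc = 0L
      var more = true
      while (more) {
        val b = bytes(pos) & 0xFF
        pos += 1
        acc |= (b & 0x7FL) << shift
        shift += 7
        more = (b & 0x80) != 0
      }
      (acc >>> 1) ^ -(acc & 1L)
    }

    // Avro string/bytes: a long length followed by that many raw bytes.
    def readBytes(): Array[Byte] = {
      val len = readLong().toInt
      val out = java.util.Arrays.copyOfRange(bytes, pos, pos + len)
      pos += len
      out
    }

    def readString(): String = new String(readBytes(), UTF_8)

    // Avro map: blocks of (count, key/value pairs...) ended by a zero count.
    // A negative count means |count| pairs, preceded by the block's byte size.
    val headers = scala.collection.mutable.LinkedHashMap.empty[String, String]
    var count = readLong()
    while (count != 0) {
      if (count < 0) { count = -count; readLong() } // skip the block byte size
      var i = 0L
      while (i < count) {
        val key = readString()
        headers(key) = readString()
        i += 1
      }
      count = readLong()
    }

    // The body field follows the headers map; here it is the tweet JSON.
    DecodedEvent(headers.toMap, readString())
  }
}
```

To use it, the Kafka values would have to arrive as raw bytes, for example KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet) with kafka.serializer.DefaultDecoder, followed by messages.map { case (_, v) => FlumeEventDecoder.decode(v).body }, whose RDDs can then go to sqlContext.read.json. Decoding with StringDecoder first is lossy: bytes that are not valid UTF-8 become the replacement character seen in the sample, so the length prefix can no longer be read back.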

Other posts print the body with something like print("body:" + new String(e.event.getBody.array)). I am trying to understand how to use that with a Kafka channel.

Answer


If you consume them directly from the Kafka stream, you have to parse the values manually using the separator:

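val tweets = messages.map { case (_, tweet) =>
    // String.split takes a regex, so the literal '?' must be escaped;
    // limit = 2 keeps any "?M" inside the JSON from splitting it further
    val splitTweet = tweet.split("\\?M", 2)
    (splitTweet(0), splitTweet(1))
}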

This gives you the concatenated headers as the first value of the tuple, and the JSON representing the tweet as the second value.
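As an alternative to splitting on a fixed separator, one could cut each message where the JSON object starts. This is a heuristic sketch under loud assumptions: the tweet JSON begins with the two characters {" and nothing before it (headers or framing bytes) contains that sequence. JsonSplitter and splitAtJson are hypothetical names, not a Flume or Spark API.

```scala
// Hypothetical heuristic, not a Flume API: treat everything before the first
// `{"` as header/framing bytes and the rest as the tweet JSON.
// Breaks if a header value (or a framing byte pair) itself produces `{"`.
object JsonSplitter {
  def splitAtJson(message: String): Option[(String, String)] = {
    val start = message.indexOf("{\"")
    if (start < 0) None // no JSON object found in this message
    else Some((message.substring(0, start), message.substring(start)))
  }
}
```

In the stream this could be applied as messages.flatMap { case (_, tweet) => JsonSplitter.splitAtJson(tweet).map(_._2).toList }, which silently drops messages without a JSON object instead of throwing. Searching for {" rather than a bare { avoids one failure mode: a single framing byte that happens to equal {.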


Thanks, I tried the same thing, but the problem is that the separator in tweet.split("?M") is not always the same. These are non-character (binary) bytes. If I could split with a regex like *((az)|(AZ)...
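One likely explanation for the changing separator, assuming the Kafka channel stores Avro-serialized Flume events (its default parseAsFlumeEvent = true): the bytes just before the JSON are the Avro-encoded length of the event body. Avro zig-zag encodes the length and writes it 7 bits per byte, setting the high bit on every byte except the last, so the prefix changes with the tweet's size. The sketch below reproduces that encoding; AvroLengthPrefix is a hypothetical name, and the lengths in the usage note are illustrative, not taken from the sample.

```scala
// Illustration of Avro's binary encoding of a length (a long):
// zig-zag encode, then emit 7-bit groups least-significant first,
// with the high bit set on every byte except the last.
object AvroLengthPrefix {
  def encode(length: Long): Array[Byte] = {
    var n = (length << 1) ^ (length >> 63) // zig-zag
    val out = scala.collection.mutable.ArrayBuffer.empty[Byte]
    while ((n & ~0x7FL) != 0) {
      out += ((n & 0x7F) | 0x80).toByte // more bytes follow
      n >>>= 7
    }
    out += n.toByte // final byte, high bit clear
    out.toArray
  }
}
```

For example, a 4950-byte body gets the prefix 0xAC 'M' (any length from 4928 to 4991 ends in 'M'), while a 100-byte body gets 0xC8 0x01. The first byte has its high bit set, so it is not valid UTF-8 on its own and shows up as a replacement character when the value is decoded as a string.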


@Mouzzam I see; you should have specified that in your question. BTW, using Spark's native Flume receiver would definitely make the parsing easier for you. [this](http://spark.apache.


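I would like to learn this. The challenge is that I am currently reading the data from a Kafka topic called twitter: I am using Kafka as the Flume channel, while in the link you mentioned the data comes from a Flume stream. Can I pass the RDD from tweets = messages.map(_._2) to a Flume stream?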
