2017-02-19 6 views
0

Apache Flinkを使用してTwitter Streaming APIでメッセージを取得しようとしています。Apache Flink - データを取得できません。

しかし、私のコードは出力ファイルに何も書き込んでいません。私は特定の単語の入力データを数えようとしています。

pleseは私の例を確認してください。

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.connectors.twitter._ 
import org.apache.flink.api.java.utils.ParameterTool 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint} 
import org.apache.flink.streaming.api.windowing.time.Time 

import scala.collection.JavaConverters._ 


////////////////////////////////////////////////////// 
// Create an Endpoint to Track our terms 
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable { 
    @Override 
    def createEndpoint(): StreamingEndpoint = { 
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0)) 
    val endpoint = new StatusesFilterEndpoint() 
    //endpoint.locations(List(chicago).asJava) 
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava) 
    endpoint 
    } 
} 

object Connection { 
    def main(args: Array[String]): Unit = { 

    val props = new Properties() 

    val params: ParameterTool = ParameterTool.fromArgs(args) 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    env.getConfig.setGlobalJobParameters(params) 
    env.setParallelism(params.getInt("parallelism", 1)) 

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key")) 
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key")) 
    props.setProperty(TwitterSource.TOKEN, params.get("token")) 
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret")) 

    val source = new TwitterSource(props) 
    val epInit = new myFilterEndpoint() 

    source.setCustomEndpointInitializer(epInit) 

    val streamSource = env.addSource(source) 

    streamSource.map(s => (0, 1)) 
     .keyBy(0) 
     .timeWindow(Time.minutes(2), Time.seconds(30)) 
     .sum(1) 
     .map(t => t._2) 
     .writeAsText(params.get("output")) 

    env.execute("Twitter Count") 
    } 
} 

ポイントである、私はエラーメッセージを持っていないと私は私のダッシュボードで確認することができます。私のソースはTriggerWindowにデータを送信しています。しかし、それはデータを受け取っていません: enter image description here

私は一度に2つの質問があります。

最初に:私のソースは何も受信されていない場合、私のTriggerWindowにバイトを送信しているのはなぜですか?

Seccond:私のコードに間違っていますが、私はTwitterからデータを受け取ることができませんか?

+0

最初の結果は、2分後(つまり、ウィンドウの長さ)に書き込まれます。あなたはそれを待ったことがありますか? TriggerWindowはデータを受信しましたが、43秒後にはファイルに何も書き込まれません。あなたのコードはすべて上手く見えます。 –

+0

こんにちは@DawidWysakowicz、はい私はそれを待っています。私はこのコードを2時間実行していました。私は質問のためにプリントを取った。しかし、Flinkからの出力はありません:( –

答えて

1

あなたのアプリケーションソースは、レコードのレコードを見て見ることができるウィンドウに実際のレコードを送信しませんでした。送信されるバイトは、Flinkがタスク間で随時送信する制御メッセージに属します。より具体的には、Flinkジョブのエンドツーエンドの待ち時間を測定するのに使用されるのはLatencyMarkerメッセージです。

コードはよく見えます。私もあなたのコードを試して私のために働いた。したがって、私は、Twitterの接続資格情報に何か問題があると結論づけています。正しい資格情報を入力したかどうか再確認してください。

関連する問題