2017-01-09 8 views
1

私はkafkaのトピックからFlinkストリーミングのデータを読み込もうとしています。私は、エラーを以下しているApacheのカフカコネクタ、KafkaをソースとしたFlink

import java.util.Properties; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.util.serialization.DeserializationSchema; 
import org.apache.flink.streaming.util.serialization.SerializationSchema; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

public class stock_streaming_kafka { 

    public static void main(String[] args) throws Exception 
    { 
     StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties); 

    DataStream<String> stream = env 
     .addSource(myConsumer) 
     .print(); 
} 

}

Exception in thread "main" java.lang.Error: Unresolved compilation problems: 
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files 
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>) 

at stock_streaming_kafka.main(stock_streaming_kafka.java:25) 
を私はページAPACHE FLINK 1.1.3マニュアルの例としてそこにある以下の例のコードを実行しようとしています

この問題を解決するために教えてください。 Kafkaコネクタに依存する問題はありますか? は私のバージョンは以下のとおりです。FLINKの

  1. FLINK 1.1.3
  2. カフカ2.10
  3. FLINK - コネクタ - カフカ-0.9_2.11-1.0.0.jar

答えて

1

以下のバージョンをご利用ください。それはあなたのカフカバージョンで動作します。

<dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.11</artifactId> 
      <version>1.1.4</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.9_2.10</artifactId> 
      <version>1.1.3</version> 
     </dependency> 

コード内にコンパイルの問題があります。

変更この:

DataStream<String> stream = env 
     .addSource(myConsumer) 
     .print(); 

には:それはまだ、あなたのために動作しない場合

DataStream<String> stream = env 
     .addSource(myConsumer); 
stream.print(); 

私に知らせてくださいと私は、作業コードを共有することになります。

0

バージョンとFlinkコネクタが一致する必要があります。 flink-connectorの依存関係を1.1.3に更新します。

+0

Thanx Fabian。 jarのバージョンをflink-connector-kafka-0.9_2.11-1.1.3.jarに変更しました。しかし、エラーが発生しました_タイプorg.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaは解決できません。それは間接的に必要な.classファイルから参照されます_ – kadsank

0

答えがまだ受け入れられていないので、Flinkを使用してKafkaからデータを読み取る場合はcomplete Maven code exampleです。

あなたはKafkaとScalaのバージョンの設定に合わせてpom.xmlを微調整する必要があるかもしれません。

これが役に立ちます。

関連する問題