2016-09-02 13 views
1

私はこのコードでカフカのトピックからJSONを取得しようとしている:FLINK +カフカ+ JSON - Javaの例

public class FlinkMain { 
public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    // parse user parameters 
    ParameterTool parameterTool = ParameterTool.fromArgs(args); 

    DataStream messageStream = env.addSource(
      new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic") 
      , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties())); 

    messageStream.map(new MapFunction<String, String>() { 
     private static final long serialVersionUID = -6867736771747690202L; 

     @Override 
     public String map(String value) throws Exception { 
      return "Kafka and Flink says: " + value; 
     } 
    }); 

    env.execute(); 
} 

}

問題は、次のとおりです。

1)このプログラムが実行されないため、

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. 

The problem is at line: `messageStream.map(....` 

2)たぶん、上記の問題がDataStreamは何のTYを持っていないという事実に関係していますpe。私が作るしようとした場合でも、私が探していた

<dependencies> 
    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-java</artifactId> 
     <version>1.1.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-streaming-java_2.11</artifactId> 
     <version>1.1.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-clients_2.11</artifactId> 
     <version>1.1.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-connector-kafka-0.9_2.11</artifactId> 
     <version>1.1.1</version> 
    </dependency> 

</dependencies> 

DataStream<String> messageStream = env.addSource(...

は、コードが原因cannot resolve constructor FlinkKafkaConsumer09 ...

のpom.xml(重要な部分)がコンパイルされませんFlinkのコードでJSON DeserializationSchemaを成功させて使用しています。私はちょうどJSONKeyValueDeserializationSchemaのunittestを見つけましたlink

正しい方法を知っている人はいますか?

おかげ

答えて

3

あなたの誤差がラインmessageStream.map(new MapFunction<String, String>()です。定義したmapFunctionは、String型の入力とString型の出力を期待していますが、JSONKeyValueDeserializationSchemaをStringに変換すると、にMapFunctionは実際に同じ型のObjectNodeの入力を期待する必要があります。以下のコードを試してみてください。

messageStream.map(new MapFunction<ObjectNode, String>() { 
     private static final long serialVersionUID = -6867736771747690202L; 

     @Override 
     public String map(ObjectNode node) throws Exception { 
      return "Kafka and Flink says: " + node.get(0); 
     } 
    }); 
3

私はヴィシュヌviswanath答えを追った、しかしJSONKeyValueDeserializationSchemaでも{"name":"John Doe"}ような単純なJSONのために、JSONパーサーステップ中に例外が発生します。

スローコードは次のとおりです。

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic") 
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties())); 

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() { 
    private static final long serialVersionUID = -6867736771747690202L; 

    @Override 
    public String map(ObjectNode node) throws Exception { 
     return "Kafka and Flink says: " + node.get(0); 
    } 
}).print(); 

出力:

09/05/2016 11:16:02 Job execution switched to status FAILED. 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.NullPointerException 
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215) 
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52) 
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38) 
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227) 
    at java.lang.Thread.run(Thread.java:745) 

は、私は別のデシリアライゼーションのスキーマを使用して成功したJSONDeserializationSchema

 DataStream<ObjectNode> messageStream = env.addSource(
      new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic") 
        , new JSONDeserializationSchema(), parameterTool.getProperties())); 

    messageStream.rebalance().map(new MapFunction<ObjectNode, String>() { 
     private static final long serialVersionUID = -6867736771747690202L; 

     @Override 
     public String map(ObjectNode value) throws Exception { 
      return "Kafka and Flink says: " + value.get("key").asText(); 
     } 
    }).print(); 
関連する問題