私はこのコードでカフカのトピックからJSONを取得しようとしている:FLINK +カフカ+ JSON - Javaの例
public class FlinkMain {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream messageStream = env.addSource(
new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
, new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));
messageStream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
});
env.execute();
}
}
問題は、次のとおりです。
1)このプログラムが実行されないため、
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
The problem is at line: `messageStream.map(....`
2)たぶん、上記の問題がDataStream
は何のTYを持っていないという事実に関係していますpe。私が作るしようとした場合でも、私が探していた
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
:
DataStream<String> messageStream = env.addSource(...
は、コードが原因cannot resolve constructor FlinkKafkaConsumer09 ...
のpom.xml(重要な部分)がコンパイルされませんFlinkのコードでJSON DeserializationSchemaを成功させて使用しています。私はちょうどJSONKeyValueDeserializationSchema
のunittestを見つけましたlink
正しい方法を知っている人はいますか?
おかげ