2016-08-17 3 views
0

FLINKで複数のKAFKAクラスタからデータを読み込みたい。FLINK:同じStreamExecutionEnvironmentを使用して複数のカフカクラスタから読み込む方法

しかし、その結果、kafkaMessageStreamは最初のKafkaからのみ読み込み中です。

カフカクラスターの両方から読み取ることができます。私はと2つのストリームを別々に持っています。これはどちらもKafkaです。これは私が望むものではありません。

単一のリーダーに複数のソースを添付することは可能ですか?

サンプルコード

public class KafkaReader<T> implements Reader<T>{ 

private StreamExecutionEnvironment executionEnvironment ; 

public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){ 
    executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1500)); 

    executionEnvironment.enableCheckpointing(
      Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE); 
    executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000); 
    //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
    //try { 
    // executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH))); 
     // The RocksDBStateBackend or The FsStateBackend 
    //} catch (IOException e) { 
     // LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage()); 
    } 

    return executionEnvironment; 
} 
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) { 


    DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
      properties_k1.getProperty(Constants.TOPIC),deserializationSchema, 
      properties_k1)); 
    executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
      properties_k2.getProperty(Constants.TOPIC),deserializationSchema, 
      properties_k2)); 

    return kafkaMessageStream; 
} 


public DataStream<T> readFromKafka(Properties properties,DeserializationSchema<T> deserializationSchema) { 


    DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
      properties.getProperty(Constants.TOPIC),deserializationSchema, 
      properties)); 

    return kafkaMessageStream; 
} 

}

私の呼び出し:

public static void main(String[] args) throws Exception 
{ 
    Properties pk1 = new Properties(); 
    pk1.setProperty(Constants.TOPIC, "flink_test"); 
    pk1.setProperty("zookeeper.connect", "localhost:2181"); 
    pk1.setProperty("group.id", "1"); 
    pk1.setProperty("bootstrap.servers", "localhost:9092"); 
    Properties pk2 = new Properties(); 
    pk2.setProperty(Constants.TOPIC, "flink_test"); 
    pk2.setProperty("zookeeper.connect", "localhost:2182"); 
    pk2.setProperty("group.id", "1"); 
    pk2.setProperty("bootstrap.servers", "localhost:9093"); 


    Reader<String> reader = new KafkaReader<String>(); 
    //Do not work 

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
    DataStream<String> dataStream1 = reader.readFromMultiKafka(pk1,pk2,new SimpleStringSchema()); 
    DataStream<ImpressionObject> transform = new TsvTransformer().transform(dataStream); 

    transform.print();  


    //Works: 

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
    DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema()); 
    DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema()); 

    DataStream<Tuple2<String, Integer>> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0) 
    .timeWindow(Time.seconds(5)).sum(1).setParallelism(5); 
    DataStream<Tuple2<String, Integer>> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0) 
      .timeWindow(Time.seconds(5)).sum(1).setParallelism(5); 


    transform1.print();  
    transform2.print();  

    environment.execute("Kafka Reader"); 
} 

答えて

5

問題を解決するには、私はそれが何(クラスタごとにFlinkKafkaConsumerの別々のインスタンスを作成することをお勧めしますあなたはすでにやっています)、結果のストリームを結合します:

StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema()); 
DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema()); 
DataStream<String> finalStream = dataStream1.union(dataStream2); 
+0

ありがとう..これは私のために働いた! –

+0

Cool。私はいくつかのポイントを取得するように正しいと答えをマークすることができます;) –

関連する問題