2016-10-24 6 views
1

私のKafkaストリームアプリの1つとして、DSLとProcessor APIの両方の機能を使用する必要があります。ストリーミングアプリの流れはKafka Streams DSLプロセッサにカスタムStateStoreを追加する方法は?

source -> selectKey -> filter -> aggregate (on a window) -> sink 

です。集計後、シンクに1つの集約メッセージを送信する必要があります。私はカスタムStateStoreを定義し、

public class MyProcessor implements Processor<String, String> { 

    private ProcessorContext context = null; 
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer); 


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore") 
     .withKeys(Serdes.String()) 
     .withValues(invSerde) 
     .persistent() 
     .build() 
     .get(); 

    public MyProcessor() { 
    } 

    @Override 
    public void init(ProcessorContext context) { 
     this.context = context; 
     this.context.register(invStore, false, null); // register the store 
     this.context.schedule(10 * 60 * 1000L); 
    } 

    @Override 
    public void process(String partitionKey, String message) { 
     try { 
      MessageModel smb = new MessageModel(message); 
      HashMapStore oldStore = invStore.get(partitionKey); 
      if (oldStore == null) { 
       oldStore = new HashMapStore(); 
      } 
      oldStore.addSmb(smb); 
      invStore.put(partitionKey, oldStore); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void punctuate(long timestamp) { 
     // processes all the messages in the state store and sends single aggregate message 
    } 


    @Override 
    public void close() { 
     invStore.close(); 
    } 
} 

私はアプリを実行すると、私はjava.lang.NullPointerException

例外を取得し、以下のように私のプロセッサに登録

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> source = builder.stream(source_stream); 
source.selectKey(new MyKeyValueMapper()) 
     .filterNot((k,v) -> k.equals("UnknownGroup")) 
     .process(() -> new MyProcessor()); 

以下のように私は私のトポロジを定義しますスレッド「StreamThread-18」のjava.lang.NullPointerException 、org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) at org。 apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) at org.apache。 kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

ここで何が問題になっているのでしょうか?

答えて

6

を使用してプロセッサーの外側にストアを登録する必要があります。まずストアを作成してKStreamBuilderに登録してください。プロセッサを追加するときは、ストア名を入力してプロセッサとストアを接続します。

KStreamBuilder builder = new KStreamBuilder(); 

// create store 
StateStoreSupplier storeSupplier = (KeyValueStore)Stores.create("invStore") 
    .withKeys(Serdes.String()) 
    .withValues(invSerde) 
    .persistent() 
    .build(); 
// register store 
builder.addStateStore(storeSupplier); 

KStream<String, String> source = builder.stream(source_stream); 
source.selectKey(new MyKeyValueMapper()) 
     .filterNot((k,v) -> k.equals("UnknownGroup")) 
     .process(() -> new MyProcessor(), "invStore"); // connect store to processed by providing store name 
+0

これは機能します。どうもありがとう。私はProcessorContext :: registerメソッドをどこで使うことができますか? – Samy

+0

それはうれしいです。 –

+0

独自の状態ストア(インターフェイス 'StateStore'を使用)を実装する場合は、' ProcessorContext#register'が必要です。 'StateStore#init()'で 'register'を呼び出す必要があります。 –

関連する問題