私のKafkaストリームアプリの1つとして、DSLとProcessor APIの両方の機能を使用する必要があります。ストリーミングアプリの流れはKafka Streams DSLプロセッサにカスタムStateStoreを追加する方法は?
source -> selectKey -> filter -> aggregate (on a window) -> sink
です。集計後、シンクに1つの集約メッセージを送信する必要があります。私はカスタムStateStore
を定義し、
public class MyProcessor implements Processor<String, String> {
private ProcessorContext context = null;
Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
.withKeys(Serdes.String())
.withValues(invSerde)
.persistent()
.build()
.get();
public MyProcessor() {
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.register(invStore, false, null); // register the store
this.context.schedule(10 * 60 * 1000L);
}
@Override
public void process(String partitionKey, String message) {
try {
MessageModel smb = new MessageModel(message);
HashMapStore oldStore = invStore.get(partitionKey);
if (oldStore == null) {
oldStore = new HashMapStore();
}
oldStore.addSmb(smb);
invStore.put(partitionKey, oldStore);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void punctuate(long timestamp) {
// processes all the messages in the state store and sends single aggregate message
}
@Override
public void close() {
invStore.close();
}
}
私はアプリを実行すると、私はjava.lang.NullPointerException
例外を取得し、以下のように私のプロセッサに登録
KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream(source_stream); source.selectKey(new MyKeyValueMapper()) .filterNot((k,v) -> k.equals("UnknownGroup")) .process(() -> new MyProcessor());
以下のように私は私のトポロジを定義しますスレッド「StreamThread-18」のjava.lang.NullPointerException 、org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) at org。 apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) at org.apache。 kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
ここで何が問題になっているのでしょうか?
これは機能します。どうもありがとう。私はProcessorContext :: registerメソッドをどこで使うことができますか? – Samy
それはうれしいです。 –
独自の状態ストア(インターフェイス 'StateStore'を使用)を実装する場合は、' ProcessorContext#register'が必要です。 'StateStore#init()'で 'register'を呼び出す必要があります。 –