2017-06-30 5 views
0

私は数週間前にバッチからストリームに変更する最初のステップとしてv0.4に書いたヘイルキャストのJet 0.3 DAGシステムを改造しようとしています。興味深いことに、突然私はいくつかの奇妙な動作を経験しています。そこでは、頂点が期待どおりに機能するかどうかはわかりません。何が起きているのかを考えてみると、各頂点の内部の仕組みをどのように覗いていくかについてのオプションを見つけることができません。少なくともいくつかのエラーメッセージを出す方法はありますか?ヘーゼルキャストジェット頂点の仕組みを見てみるには?

問題を特定しようとして、私は非常に単純化した "リストから読み込み、マップに書き込むマップ" DAGにダムしようとしました。しかし、何かを得ることはまだ成功していません。

私の愚かな例の下では、もっと知り合った人がすぐに見ることができる非常に単純な間違いをするかもしれませんか?

出版社:

// every second via executorservice: 
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data"); 
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray); 

アナライザ:

jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key)); 
jet.getMap("data").remove(key); 
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size()); 

final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1); 
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
    (byte[] e) -> new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e))); 
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result")); 

this.edge(between(sourceDataMap, parseByteArrayToMap)) 
    .edge(between(parseByteArrayToMap, sinkIntoResultMap)); 

リスナー:

jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>) 
     (EntryEvent<Long, byte[]> entryEvent) 
      -> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis()) 
     , true); 

データ生成DAGが引き継ぐまでは正常ですが、エラーメッセージやDAGから来るものはありません。助言がありますか?

+0

あなたはhttps://stackoverflow.com(私たち[MCVE]を表示しことができます/ help/mcve)あなたの問題を示すことができますか?また、あなたが記述するツールを持つ 'DiagnosticProcessors'クラスをチェックすることもできます。 –

+0

'ジェット'を指すラムダがあります。 JetInstanceは直列化不可能なので、ラムダの一部として使用することはできません。一意のIDを生成したい場合は、 'UUID.randomUUID()'のようなものを使うことができます。 –

+0

投稿したDAGからジョブを実行しようとしましたが、頂点Canを記述する行に 'IllegalArgumentException'がスローされます。だから、仕事を始める機会を得る前でさえ、速く(そして大声で)失敗します。 –

答えて

1

ここに私の側に働く少し消毒あなたのコードがあります:私は見コンソールで

public class Main { 
    public static void main(String[] args) throws Exception { 
     JetInstance jet = Jet.newJetInstance(); 
     try { 
      HazelcastInstance hz = jet.getHazelcastInstance(); 
      ILogger logger = hz.getLoggingService().getLogger("a"); 

      // every second via executorservice: 
      final IStreamMap<Long, List<byte[]>> data = jet.getMap("data"); 
      List<byte[]> myByteArray = asList(new byte[1], new byte[2]); 
      IAtomicLong keyGen = hz.getAtomicLong("key"); 
      Long key = keyGen.getAndIncrement(); 
      data.set(key, myByteArray); 

      String stringKey = key.toString(); 
      hz.getList(stringKey).addAll((List<byte[]>) jet.getMap("data").get(key)); 
      jet.getMap("data").remove(key); 
      logger.severe(String.format("List %s has size: %d", key, jet.getList(stringKey).size())); 

      hz.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>) 
        (EntryEvent<Long, byte[]> entryEvent) -> logger.severe(String.format(
          "Got result: %d at %d", entryEvent.getValue().length, System.currentTimeMillis())), 
        true); 

      DAG dag = new DAG(); 
      Vertex sourceDataMap = dag.newVertex("sourceDataMap", readList(stringKey)).localParallelism(1); 
      Vertex parseByteArrayToMap = dag.newVertex("parseByteArrayToMap", map(
        (byte[] e) -> entry(randomUUID(), e))); 
      Vertex sinkIntoResultMap = dag.newVertex("sinkIntoResultMap", writeMap("result")); 

      dag.edge(between(sourceDataMap, parseByteArrayToMap)) 
       .edge(between(parseByteArrayToMap, sinkIntoResultMap)); 

      jet.newJob(dag).execute().get(); 
      Thread.sleep(1000); 
     } finally { 
      Jet.shutdownAll(); 
     } 
    } 
} 

SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] List 0 has size: 2 
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 2 at 1498822322228 
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 1 at 1498822322228