2016-06-22 9 views
1

私は以前に嵐を使用していました。私は嵐の中でバッチ処理を検索するために、よりバッチ処理能力が必要でした。 リアルタイムでマイクロバッチ処理を行うTridentが見つかりました。嵐のトライデントをバッチ処理タプルに使用する方法?

しかし、どういうわけか、トライデントがマイクロバッチ処理(フロー、バッチサイズ、バッチ間隔)を処理して、実際に私が必要としていることを知ることができません。

私がしたいのは、一定の間隔でスパウトによって放出されたタプルを収集/保存し、別の時間間隔で下流のコンポーネント/ボルト/機能に再放出することです。 (例えば、毎秒1つのタプルを発する噴出、次のトライデント機能は/収集タプルを保存し、次の機能に毎分50個のタプルを放出します。)

誰かが、私はこのケースでトライデントを適用することができますどのように私を導くことはできますか? または嵐の機能を使用する他の適用可能な方法?

答えて

1

優秀な質問!しかし残念ながら、この種のマイクロバッチ処理はTridentボックスからはサポートされていません。

しかし、独自の周波数駆動のマイクロバッチ処理を実装できます。このスケルトンの例のような何か:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseRichBolt; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MicroBatchingBolt extends BaseRichBolt { 

    private static final long serialVersionUID = 8500984730263268589L; 
    private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class); 

    protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>(); 

    /** The threshold after which the batch should be flushed out. */ 
    int batchSize = 100; 

    /** 
    * The batch interval in sec. Minimum time between flushes if the batch sizes 
    * are not met. This should typically be equal to 
    * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs 
    */ 
    int batchIntervalInSec = 45; 

    /** The last batch process time seconds. Used for tracking purpose */ 
    long lastBatchProcessTimeSeconds = 0; 

    private OutputCollector collector; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     // Check if the tuple is of type Tick Tuple 
     if (isTickTuple(tuple)) { 
     // If so, it is indication for batch flush. But don't flush if previous 
     // flush was done very recently (either due to batch size threshold was 
     // crossed or because of another tick tuple 

     if ((System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". But received tick tuple so executing the batch"); 

      finishBatch(); 
     } else { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". Received tick tuple but last batch was executed " 
       + (System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) 
       + " seconds back that is less than " + batchIntervalInSec 
       + " so ignoring the tick tuple"); 
     } 
     } else { 
     // Add the tuple to queue. But don't ack it yet. 
     this.queue.add(tuple); 
     int queueSize = this.queue.size(); 
     LOG.debug("current queue size is " + queueSize); 
     if (queueSize >= batchSize) { 
      LOG.debug("Current queue size is >= " + batchSize 
       + " executing the batch"); 

      finishBatch(); 
     } 
     } 
    } 

    private boolean isTickTuple(Tuple tuple) { 
     // Check if it is tick tuple here 
     return false; 
    } 

    /** 
    * Finish batch. 
    */ 
    public void finishBatch() { 

     LOG.debug("Finishing batch of size " + queue.size()); 
     lastBatchProcessTimeSeconds = System.currentTimeMillis()/1000; 
     List<Tuple> tuples = new ArrayList<Tuple>(); 
     queue.drainTo(tuples); 

     for (Tuple tuple : tuples) { 
     // Prepare your batch here (may it be JDBC, HBase, ElasticSearch, Solr or 
     // anything else. 
     // List<Response> responses = externalApi.get("..."); 
     } 

     try { 
     // Execute your batch here and ack or fail the tuples 
     LOG.debug("Executed the batch. Processing responses."); 
     //  for (int counter = 0; counter < responses.length; counter++) { 
     //   if (response.isFailed()) { 
     //   LOG.error("Failed to process tuple # " + counter); 
     //   this.collector.fail(tuples.get(counter)); 
     //   } else { 
     //   LOG.debug("Successfully processed tuple # " + counter); 
     //   this.collector.ack(tuples.get(counter)); 
     //   } 
     //  } 
     } catch (Exception e) { 
     LOG.error("Unable to process " + tuples.size() + " tuples", e); 
     // Fail entire batch 
     for (Tuple tuple : tuples) { 
      this.collector.fail(tuple); 
     } 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // ... 
    } 

} 

出典:http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/Using tick tuples with trident in storm

関連する問題