2016-05-04 12 views
1

私はRedisインスタンスからデータをストリーミングする必要があるStormトポロジを持っています.1つのRedisインスタンスからトポロジの読み取りを実行しようとしましたが、Redisから何も読み取られず、返されたキューは空です。私はストームバージョン0.9.3を使用しています。Storm with Redisをデータソースとして使用

これは私のRedisQueueSpoutです。このパターンは、指定されたパターン(別名キー)を使用してトポロジをRedisに接続し、Stormがポーリングするたびに入力データを探します。注ぎ口は、それに続くボルトにIDメッセージを持つ単一のフィールドを出します。

package storm.starter.spout; 

import java.util.List; 
import java.util.Map; 
import redis.clients.jedis.Jedis; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 
import backtype.storm.utils.Utils; 


public class RedisQueueSpout extends BaseRichSpout { 
    static final long   serialVersionUID = 737015318988609460L; 
    private SpoutOutputCollector _collector; 
    private final String   host; 
    private final int   port; 
    private final String   pattern; 
    private transient JedisQueue jq; 

    public RedisQueueSpout(String host, int port, String pattern) { 
    this.host = host; 
    this.port = port; 
    this.pattern = pattern; 
    } 

    @SuppressWarnings("rawtypes") 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
    _collector = collector; 
    Jedis newJedis = new Jedis(host, port); 
    newJedis.connect(); 
    this.jq = new JedisQueue(newJedis, pattern); 
    } 

    public void close() {} 

    public void nextTuple() { 
    List<String> ret = this.jq.dequeue(); 
    if (ret == null) { 
     Utils.sleep(5L); 
    } 
    else { 
     System.out.println(ret); 
     _collector.emit(new Values(ret)); 
    } 
    } 

    @Override 
    public void ack(Object msgId) {} 

    @Override 
    public void fail(Object msgId) {} 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("name")); 
    } 
} 

これは、Redisのに裏打ちされた標準キューデータ構造の実装である私のJedisQueueです。デキュー・メソッドは、基本的にこれが基本的なJedis実装が返すものであるため、幾分非公式にList<String>を返します。これは、Redisが単一のキーに対して多くの値を格納できるためです。

コードはStorm-jedisから取得されています。詳細については、リンクを確認してください。

そして、これは私のトポロジです:

package storm.starter; 

import org.tomdz.storm.esper.EsperBolt; 
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 
import backtype.storm.utils.Utils; 
import storm.starter.spout.RedisQueueSpout;; 

public class NameCountTopology { 
    public static void main (String[] args) throws Exception { 
    String host = "10.0.0.251"; 
    int port = 6379; 
    String pattern = "Name:*"; 
    TopologyBuilder builder = new TopologyBuilder(); 

    EsperBolt bolt = new EsperBolt.Builder().inputs().aliasComponent("spout").toEventType("names").outputs() 
      .onDefaultStream().emit("nps").statements() 
      .add("select count(*) as nps from names.win:time_batch(1 sec)").build(); 

    builder.setSpout("spout", new RedisQueueSpout(host,port,pattern),1); 
    builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name")); 


    Config conf = new Config(); 
    conf.setDebug(true); 


    if (args != null && args.length > 0) { 
     conf.setNumWorkers(1); 

     StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 

    } else { 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("name-count-topology", conf, builder.createTopology()); 
     Utils.sleep(300000); 
     cluster.killTopology("name-count-topology"); 
     cluster.shutdown(); 

    } 
} 

}

マイRedisのキー値は次の形式でHMSETを使用して保存されている:これは私の上司からのログです

HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065 
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604 
... 

ノード:

2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3) 
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3) 
2016-05-04 07:37:56 STDIO [INFO] Queue is empty... 
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI '[email protected]' version 4.3.0 
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2) 
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo [email protected]> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]] 
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]] 

であり、ログはそのように繰り返されます。 、注ぎ口が動作していないと何も放出されない理由storm UI

が今私の質問は何もRedisのからピックアップされていないように思える: が、これはトポロジーを実行した後、私のUIです。

PS:私はホストとポートをチェックしました。私はRedisからのデータを得ることができるので、Redisへの接続には何も問題ありません。

+0

'JedisQueue'の' this.jedis.blpop(0、this.pattern); 'が何かを返すのをもう一度チェックしましたか? 'Utils.sleep(5L);' - コレクタに出力が与えられていなければ、ストームはビジー待機を避けるために自動的に "スリープ"します。 –

+0

@ MatthiasJ.Saxありがとうございました。問題はまさに 'JedisQueue'の' this.jedis.blpop(0、this.pattern); 'でした。ヒントをありがとう。 – Nima

答えて

1
  1. HMSETはハッシュ用、BLPOPはリスト用です。彼らは互換性がありません。
  2. BLPOPはパターンを想定していません。正確なキー名が必要です。詳細はhttp://redis.io/commands/blpopを参照してください。
  3. SpoutはシングルスレッドからnextTuple()、ack()、fail()メソッドを実行するので、長い(または無限の)タイムアウトを伴うBLPOPは、

希望します。

+0

本当にありがとうございました、あなたは人生を満喫しています...私は初心者ですから、私は実際にそれを見ませんでした... – Nima

+0

私のスパウトがそれらを放出できるように、 'Name:*'パターンと一致するキーをすべて取得したい場合、どうすればいいですか?ハッシュもパターンをサポートしていないようです... – Nima

+0

パターンに一致するすべてのキーを取得するためにスキャンすることはできますが、Spoutの待ち時間を短縮するRedisのキースペース全体をスキャンする必要があります。あなたが本当にSpoutからRedisデータを使いたいのであれば、最良の方法はリストをキューとして使うことです。ユーザーが異なるキーとして格納されているハッシュデータを実際に格納する必要がある場合は、実際のユーザーデータ用であるか、キューイング用(キー名のみ、またはシリアル化されたフルデータのみ)のユーザーデータを2回格納できます。 LUAスクリプトを使用すると、データを2つの場所にアトミックに格納できます。 –

関連する問題