私はRedisインスタンスからデータをストリーミングする必要があるStormトポロジを持っています.1つのRedisインスタンスからトポロジの読み取りを実行しようとしましたが、Redisから何も読み取られず、返されたキューは空です。私はストームバージョン0.9.3を使用しています。Storm with Redisをデータソースとして使用
これは私のRedisQueueSpout
です。このパターンは、指定されたパターン(別名キー)を使用してトポロジをRedisに接続し、Stormがポーリングするたびに入力データを探します。注ぎ口は、それに続くボルトにIDメッセージを持つ単一のフィールドを出します。
package storm.starter.spout;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RedisQueueSpout extends BaseRichSpout {
static final long serialVersionUID = 737015318988609460L;
private SpoutOutputCollector _collector;
private final String host;
private final int port;
private final String pattern;
private transient JedisQueue jq;
public RedisQueueSpout(String host, int port, String pattern) {
this.host = host;
this.port = port;
this.pattern = pattern;
}
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
Jedis newJedis = new Jedis(host, port);
newJedis.connect();
this.jq = new JedisQueue(newJedis, pattern);
}
public void close() {}
public void nextTuple() {
List<String> ret = this.jq.dequeue();
if (ret == null) {
Utils.sleep(5L);
}
else {
System.out.println(ret);
_collector.emit(new Values(ret));
}
}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("name"));
}
}
これは、Redisのに裏打ちされた標準キューデータ構造の実装である私のJedisQueue
です。デキュー・メソッドは、基本的にこれが基本的なJedis実装が返すものであるため、幾分非公式にList<String>
を返します。これは、Redisが単一のキーに対して多くの値を格納できるためです。
コードはStorm-jedisから取得されています。詳細については、リンクを確認してください。
そして、これは私のトポロジです:
package storm.starter;
import org.tomdz.storm.esper.EsperBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.starter.spout.RedisQueueSpout;;
public class NameCountTopology {
public static void main (String[] args) throws Exception {
String host = "10.0.0.251";
int port = 6379;
String pattern = "Name:*";
TopologyBuilder builder = new TopologyBuilder();
EsperBolt bolt = new EsperBolt.Builder().inputs().aliasComponent("spout").toEventType("names").outputs()
.onDefaultStream().emit("nps").statements()
.add("select count(*) as nps from names.win:time_batch(1 sec)").build();
builder.setSpout("spout", new RedisQueueSpout(host,port,pattern),1);
builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("name-count-topology", conf, builder.createTopology());
Utils.sleep(300000);
cluster.killTopology("name-count-topology");
cluster.shutdown();
}
}
}
マイRedisのキー値は次の形式でHMSETを使用して保存されている:これは私の上司からのログです
HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604
...
ノード:
2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3)
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3)
2016-05-04 07:37:56 STDIO [INFO] Queue is empty...
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI '[email protected]' version 4.3.0
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2)
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo [email protected]> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
であり、ログはそのように繰り返されます。 、注ぎ口が動作していないと何も放出されない理由storm UI
が今私の質問は何もRedisのからピックアップされていないように思える: が、これはトポロジーを実行した後、私のUIです。
PS:私はホストとポートをチェックしました。私はRedisからのデータを得ることができるので、Redisへの接続には何も問題ありません。
'JedisQueue'の' this.jedis.blpop(0、this.pattern); 'が何かを返すのをもう一度チェックしましたか? 'Utils.sleep(5L);' - コレクタに出力が与えられていなければ、ストームはビジー待機を避けるために自動的に "スリープ"します。 –
@ MatthiasJ.Saxありがとうございました。問題はまさに 'JedisQueue'の' this.jedis.blpop(0、this.pattern); 'でした。ヒントをありがとう。 – Nima