2016-12-09 15 views
0

私は現在、redis pub/subシステムを設計するためにjavaを使用しており、問題が発生しています。私はあなたの詳細が表示されます:ここredisサブスクライバはredisパブリッシャでは使用できません

出版社:

public class RedisMessagePublisher implements MessagePublisher { 

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic) 
{ 
    this.redisTemplate = redisTemplate; 
    this.topic = topic; 
} 

private StringRedisTemplate redisTemplate; 

private ChannelTopic topic; 

@Override 
public void publish(String message) { 
    redisTemplate.convertAndSend(topic.getTopic(), message); 
    } 
} 

パブリッシャが正しいか、正しく動作することができます。

その後のは、加入者クラスに移動してみましょう:私ははるかに簡単に使用できるように、加入者クラスで

public class RedisMessageSubscriber implements MessageListener { 

//action inspect here 
private Action2<Message, byte[]> action; 

public void setAction(Action2<Message, byte[]> action) { 
    logger.info("action set"); 
    this.action = action; 
} 

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class); 

@Override 
public void onMessage(Message message, byte[] bytes) { 

    logger.info("===> redis subscribe message in <==="); 

    if (action != null) 
     action.call(message, bytes); 
    else 
     logger.info("===> action is null <==="); 
    } 
} 

は、私がアクションを注入するRxJavaを使用。

しかし、私は出版社からのメッセージを公開した後の質問は、ここにある、私のメッセージは、onMessageメソッドに転送することができることをC、ログの印刷は、私が期待したものではなかったことができます:私が期待したもの

===> redis subscribe message in <=== 
===> action is null <=== 

新しいメッセージを公開すると、加入者はそれを取得し、作成したActionを実行しました。

私は以下のパブリッシャーとサブスクライバをトリガするために使用されるサービス:

redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
    @Override 
    public void call(Message message, byte[] bytes) { 
     ObjectMapper objectMapper = new ObjectMapper(); 
     try { 
      String result = objectMapper.readValue(message.getBody(), String.class); 
      logger.info("receive:"+result); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
}); 

@RestController("redispubsubcontroller") 
@RequestMapping(value = "/redis") 
public class redispubsubcontroller { 

@Autowired 
private RedisMessagePublisher redisMessagePublisher; 

@Autowired 
private RedisMessageSubscriber redisMessageSubscriber; 

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class); 

@RequestMapping(value = "/publisher", method = {RequestMethod.GET}) 
public ApiResponse getConfig(String message,HttpServletRequest request, 
              HttpServletResponse response) { 

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       String result = objectMapper.readValue(message.getBody(), String.class); 
       logger.info("receive:"+result); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    redisMessagePublisher.publish(message); 

    return new ApiResponse("success","message sent"); 
    } 
} 

コード以上のことから、私は、トピックをサブスクライブし、加入者に新しいアクションを設定し、Cすることができます

しかし、なぜ出版社を起動させた後でも、サブスクライバはメッセージを受け取ることができますが、NULLのアクションはまだ実行されませんでした。

誰でも手助けできますか?このメカニズムに問題はありますか?

==== EDIT =====以下

RedisMessageConfigコード:

@Configuration 
public class RedisMessageConfig { 

@Bean 
ChannelTopic topic() { 
    return new ChannelTopic("useraddresspubsub:queue"); 
} 

@Bean 
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber()); 
} 

@Autowired 
private RedisConnectionFactory JedisConnectionFactory; 

@Bean 
RedisMessageListenerContainer redisContainer() { 
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(JedisConnectionFactory); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
    } 
} 

==== ====解決

最後に、私はこれを得ました1つのmpのアイデアで解決され、わずかにmyredismessagescriberからredredessageconfigへのredismessageconfigにフローがあるので、myredismessageconfigに変更されたので、redismessageconfigでは、まずアクションtそれでは、redismessageconfigは新しいredismessagesubscriberを作成し、作成された新しいアクションを保持します。以下のコード:以下

@Component 
public class MyRedisMessageConfig extends RedisMessageConfig { 

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class); 

public MyRedisMessageConfig() { 
    super.action = new Action2<Message, byte[]>() { 
     @Override 
     public void call(Message message, byte[] bytes) { 
      String result = new String(message.getBody()); 
       logger.info("received:" + result); 
      } 
     }; 
    } 
} 

スクリーンショット: enter image description here

答えて

1

MessageListenerが動作するように意図されている方法ではありません。さらに、共有可能な可変状態を作成します。 2つの並行呼び出しは、RedisMessageSubscriberの状態を同時に変更します。

actionを1つのスレッドに設定すると、表示の問題が発生し、メッセージの受信が別のスレッドで行われるとします。

MessageListenerごとに異なる動作が必要な場合は、その動作を実装する複数のリスナーを作成します。

+0

この問題の原因となったマルチスレッドを意味しますか? – CharlieShi

+0

いいえ。複数のスレッドがあなたに見えるだけです。この問題は、共有可能な可変状態によって引き起こされます。 – mp911de

+0

あなたはこのシナリオを打ち消すためにいくつかの実行可能なアイデアを共有していますか? – CharlieShi

関連する問題