
Spring - Hot publishing - How to use EmitterProcessor to bridge a MessageListener to an event stream

The sample project is located here: https://github.com/codependent/spring5-playground

I would like to reactively bridge the messages received from a JMS queue to a controller that publishes them as an event stream.

I am using an EmitterProcessor because, if messages arrive while there are no subscribers, I don't want them delivered to subscribers that connect later.
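
To make the intended delivery semantics concrete, here is a minimal plain-Java sketch. It does not use Reactor and is not how EmitterProcessor is implemented; `HotBroadcastSketch` and `HotBroadcaster` are hypothetical names that are not part of the sample project. It only shows the behaviour I am after: an item published while nobody is subscribed is dropped, and a subscriber sees only the items published after it subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class HotBroadcastSketch {

    /** Hypothetical stand-in for a hot source: no buffering, no replay. */
    static final class HotBroadcaster<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T item) {
            // With no subscribers the loop body never runs, so the item is dropped.
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    /** Publishes one item before subscribing and two after; returns what was received. */
    static List<String> demo() {
        HotBroadcaster<String> broadcaster = new HotBroadcaster<>();
        broadcaster.onNext("alert-1");          // nobody is listening yet

        List<String> received = new ArrayList<>();
        broadcaster.subscribe(received::add);   // late subscriber

        broadcaster.onNext("alert-2");
        broadcaster.onNext("alert-3");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the late subscriber receives only `alert-2` and `alert-3`. Whether a given Reactor version buffers items emitted before the first subscriber is a separate matter that this sketch does not model.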

@Component 
public class AlertEmitterProcessor { 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    private EmitterProcessor<Alert> processor; 

    public AlertEmitterProcessor(){ 
     processor = EmitterProcessor.<Alert>create(); 
     processor.connect(); 
    } 

    public EmitterProcessor<Alert> getProcessor() { 
     return processor; 
    } 

    public void onNext(Alert alert){ 
     logger.info("onNext [{}]", alert); 
     processor.onNext(alert); 
    } 

    public void onComplete(){ 
     logger.info("onComplete"); 
     processor.onComplete(); 
    } 

    public void onError(Throwable t){ 
     logger.error("onError", t); 
     processor.onError(t); 
    } 
} 

This is my MessageListener:

@Component 
public class AlertMessageListener implements MessageListener{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AlertEmitterProcessor alertProcessor; 

    @Autowired 
    private MappingJackson2HttpMessageConverter jacksonMessageConverter; 

    @Override 
    public void onMessage(Message message) { 
     logger.info("Message received: [{}]", message); 
     TextMessage tm = (TextMessage)message; 
     try { 
      Alert alert = jacksonMessageConverter.getObjectMapper().readValue(tm.getText(), Alert.class); 
      alertProcessor.onNext(alert); 
     } catch (IOException | JMSException e) { 
      // Log the failure through the class logger instead of printing the stack trace
      logger.error("Could not convert JMS message to Alert", e); 
     } 
    } 
} 

And finally, my REST controller:

@Autowired 
private AlertEmitterProcessor alertTopicProcessor; 

@Autowired 
private AlertMessageListener messageListener; 

@Autowired 
private MappingJackson2HttpMessageConverter jacksonMessageConverter; 

@GetMapping(value="/accounts/{id}/alerts/live2", produces="text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming2(@PathVariable Integer id) { 
    return alertTopicProcessor.getProcessor() 
     .log().filter(a -> a.getAccountId().equals(id)); 
} 

Testing the behaviour

I added this controller method to simulate inserting a message into the queue:

@GetMapping(value="/mock/accounts/{id}/alerts/put", produces="text/event-stream") 
public void putAlert(@PathVariable Integer id) throws JsonProcessingException { 
    Alert alert = new Alert(id, (long)Math.round(Math.random()*10), "Message"); 
    String alertStr = jacksonMessageConverter.getObjectMapper().writeValueAsString(alert); 
    TextMessage tm = new MockTextMessage(alertStr); 
    messageListener.onMessage(tm); 
} 

After starting the application, I load http://localhost:8080/accounts/1/alerts/live2 and the browser waits for data.

2016-10-03 13:43:38.755 DEBUG 12800 --- [nio-8080-exec-1] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/accounts/1/alerts/live2] 
2016-10-03 13:43:38.770 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /accounts/1/alerts/live2 
2016-10-03 13:43:38.778 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)] 
2016-10-03 13:43:38.779 DEBUG 12800 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:43:38.800 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:43:38.802 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : request(unbounded) 
2016-10-03 13:43:38.803 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onNext(1) 
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2   : onSubscribe([email protected]f2) 
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2   : request(1) 
2016-10-03 13:43:38.823 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onComplete() 

Next, I publish some messages with http://localhost:8080/mock/accounts/1/alerts/put:

2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/mock/accounts/1/alerts/put] 
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /mock/accounts/1/alerts/put 
2016-10-03 13:43:43.068 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public void com.codependent.spring5.playground.reactive.web.AccountsRestController.putAlert(java.lang.Integer) throws com.fasterxml.jackson.core.JsonProcessingException] 
2016-10-03 13:43:43.069 DEBUG 12800 --- [nio-8080-exec-2] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : request(unbounded) 
2016-10-03 13:43:43.072 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onNext(1) 
2016-10-03 13:43:43.112 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertMessageListener : Message received: [com.[email protected]37262c9e] 
2016-10-03 13:43:43.145 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertEmitterProcessor : onNext [Alert [alertId=3, message=Message, accountId=1]] 
2016-10-03 13:43:43.146 INFO 12800 --- [nio-8080-exec-2] reactor.Flux.EmitterProcessor.2   : onNext(Alert [alertId=3, message=Message, accountId=1]) 
2016-10-03 13:43:43.177 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onComplete() 
2016-10-03 13:43:43.177 DEBUG 12800 --- [nio-8080-exec-2] o.s.h.s.r.ServletHttpHandlerAdapter  : Successfully completed request 

However, nothing shows up in the browser. Eventually the request ends with a 500 error (nothing in the logs). After a few manual retries it starts receiving data:

...

2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)] 
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : request(unbounded) 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onNext(1) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onSubscribe([email protected]e) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onNext(Alert [alertId=4, message=Message, accountId=1]) 
2016-10-03 13:45:07.730 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onComplete() 
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onNext(Alert [alertId=0, message=Message, accountId=1]) 
2016-10-03 13:45:07.748 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 

...but many other times it doesn't receive anything.
