2016-09-08 8 views
3

Spring Cloud Stream Source BeanをSpringブートアプリケーション内に作成しようとしています。メソッドの結果をストリームに送信するだけです(基底のKafkaトピックはストリーム)。Spring Cloud Stream Sourceを使用してメソッドの結果をストリームに送信

私が見たほとんどのストリームサンプルでは、​​ポーラーを使用してストリームにデータを送信するために@InboundChannelAdapter注釈を使用しています。しかし、私はポーラーを使いたくありません。私は空の配列にポーラーを設定しようとしましたが、もう一つの問題は、@InboundChannelAdapterを使用するときにメソッドのパラメータを持つことができないということです。

私がしようとしていることの全体的な概念は、インバウンドストリームから読み取られます。いくつかの非同期処理を行い、その結果をアウトバウンドストリームに投稿します。したがって、プロセッサを使用することもオプションではないようです。私は@StreamListenerシンクチャンネルを使用してインバウンドストリームを読み込み、それが動作しています。

これは私が試してきたコードですが、これはまったく動作しません。私はシンクが多分そうではなかったので、これは簡単だろうと思っていました。誰かを探して、プロセッサではない(つまり、インバウンドチャンネルでリッスンする必要がない)ソースの例を指摘し、@InboundChannelAdapterを使用しないでください。別の方法で。ありがとう!あなたはポーラーを使用しない場合

@EnableBinding(Source.class) 
public class JobForwarder { 

    @ServiceActivator(outputChannel = Source.OUTPUT) 
    @SendTo(Source.OUTPUT) 
    public String forwardJob(String message) { 
     log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT)); 
     return message; 
    } 
} 

答えて

0

だから、何がforwardJob()メソッドが呼び出されますか?

メソッドを呼び出して、結果を出力チャンネルに送ることはできません。

現在の設定では、着信メッセージを含むサービス(およびそのチャネルにメッセージを送信するもの)にinputChannelが必要です。輸送に縛られる必要はありません。単純なMessageChannel@Beanにすることができます。

または、@Publisherを使用して、メソッド呼び出しの結果(呼び出し元に返される)とdocs hereを公開することができます。

@Publisher(channel = Source.OUTPUT) 
0

ありがとう。問題に戻るにはしばらく時間がかかりました。 @Publisherのドキュメントを読んでみました。それは私が必要としていたものであると思われましたが、適切な豆を正しく初期化して適切に配線することができませんでした。

forwardJob()メソッドは、入力の非同期処理の後に呼び出されます。

最終的に私はちょうどspring-kafkaライブラリを使用して実装しました。これははるかに明示的であり、やりやすくなっています。カフカを唯一のチャンネルバインディングとして縛りつけて、私たちはそのライブラリーに固執すると思います。

しかし、最終的には、Spring-Cloud-Streamライブラリが非常に簡単に機能します。ポーラーのない単一のソースのコードがありました。

@Component 
@EnableBinding(Source.class) 
public class JobForwarder { 

    private Source source; 

    @Autowired 
    public ScheduledJobForwarder(Source source) { 
     this.source = source; 
    } 

    public void forwardScheduledJob(String message) { 
     log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT)); 
     source.output().send(MessageBuilder.withPayload(message).build()); 
    } 
} 
3

あなたの本来の要件は、以下の手順で達成できます。

  1. カスタムバインドされたインターフェースを作成し、あなたのバインドされたチャンネル

    @Component 
    @EnableBinding(CustomSource.class) 
    public class CustomOutputEventSource { 
    
        @Autowired 
        private CustomSource customSource; 
    
        public void sendMessage(String message) { 
         customSource.output().send(MessageBuilder.withPayload(message).build()); 
        } 
    } 
    
  2. テストを注入

    public interface CustomSource { 
        String OUTPUT = "customoutput"; 
    
        @Output(CustomSource.OUTPUT) 
        MessageChannel output(); 
    } 
    
  3. (あなたにもデフォルト@EnableBinding(Source.class)を使用することができます)、それ

    @RunWith(SpringRunner.class) 
    @SpringBootTest 
    public class CustomOutputEventSourceTest { 
    
        @Autowired 
        CustomOutputEventSource output; 
    
        @Test 
        public void sendMessage() { 
         output.sendMessage("Test message from JUnit test"); 
        } 
    } 
    
関連する問題