2017-01-23 11 views
0

私はmy-transformerにいくつかのヘッダを追加してい:Spring Integrationはカスタムヘッダーを転送しませんか?

public Message<?> transform(final Message<?> message) {   
    List<Item> items = doStuff(message); 

    final MessageBuilder<?> messageBuilder = MessageBuilder 
      .withPayload(message.getPayload()) 
      .copyHeadersIfAbsent(message.getHeaders()); 


    for (final Item item : items) { 
     messageBuilder.setHeader(item.getHeaderName(), item.getValue()); 
    } 

    return messageBuilder.build(); 
} 

そして私は私のヘッダが出力チャネル上に存在することを確認する統合テストを書いた:私はのようなストリームを作成し

public static class HeaderTest extends TransformerTest { 
    @Test 
    public void test() throws Exception { 
     channels.input().send(new GenericMessage<>(TransformerTest.EXAMPLE_PAYLOAD)); 
     final Message<?> out = this.collector.forChannel(this.channels.output()).poll(10, TimeUnit.SECONDS); 

     assertThat(out, HeaderMatcher.hasHeader("header-test", notNullValue())); 
    } 
} 

をしかし、 :

http --port=1234 | my-transformer | log --expression=toString() 

及びIはで次のメッセージを受信し、同じEXAMPLE_PAYLOADを送信s log:GenericMessage [payload=..., headers={kafka_offset=0, id=f0a0727c-9351-274c-58b3-edee9ccbf6ce, kafka_receivedPartitionId=0, contentType=text/plain;charset=UTF-8, kafka_receivedTopic=myTopic.my-transformer, timestamp=1485171448947}]

メッセージヘッダーに私のheader-testが含まれていないのはなぜですか? application.propertiesspring.cloud.stream.bindings.output.producer.headerMode=rawセットで

public class MyTransformer implements Transformer { 

    private final EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); 

    @Override 
    public Message<?> transform(final Message<?> message) { 
     List<Item> items = doStuff(message); 

     final MessageBuilder<byte[]> messageBuilder = MessageBuilder 
       .withPayload(((String) message.getPayload()).getBytes()) 
       .copyHeadersIfAbsent(message.getHeaders()); 

     final int itemsSize = items.size(); 
     final String[] headerNames = new String[itemsSize]; 
     for (int i = 0; i < itemsSize; i++) { 
      final Item item = items.get(i); 
      messageBuilder.setHeader(item.getHeaderName(), item.getValue()); 
      headerNames[i] = item.getHeaderName(); 
     } 

     final Message<byte[]> msg = messageBuilder.build(); 

     final byte[] rawMessageWithEmbeddedHeaders; 
     try { 
      rawMessageWithEmbeddedHeaders = converter.embedHeaders(new MessageValues(msg), headerNames); 
     } catch (final Exception e) { 
      throw new HeaderEmbeddingException(String.format("Cannot embed headers from '%s' into message: %s", items, msg), e); 
     } 

     return new GenericMessage<>(rawMessageWithEmbeddedHeaders); 
    } 
} 

、その後、受信側でメッセージペイロードを変換:

- - EDIT私が正しく理解している場合

だから私はのような何かを行うことになっていますか?または、どういうわけか受信側が自動的にメッセージペイロードを変換するようにすることはできますか?

答えて

1

あなたはSpring XDかSpring Cloud DataFlowのどちらを使用しているのかは言いませんが、それぞれの場合の解決策は同じです。

kafkaにはヘッダーのネイティブサポートがないため、メッセージペイロードに埋め込む必要があります。不要なヘッダーを転送したくないため、Spring XDの場合はservers.yml、またはSpring Cloud Streamアプリケーションの場合はapplication.yml(または.properties)のヘッダー名を設定して、転送するヘッダーをオプトインする必要があります。

EDIT

は残念ながら、パターンはサポートされていません。 1つの選択肢はEmbeddedHeadersMessageConverterを自分で使い、kafka modeをraw(トランスの出力先)に設定することです。 Rawモードはバインダーがヘッダーを埋め込まないことを意味します。

そのようにして、次のアプリ(モードrawなし)は、ヘッダーをトランスフォーマのバインダーでエンコードされているかのようにデコードできるはずです。 Javadocs here

255個のヘッダーに限定されています。

+0

私は春のCDFを使用しています。問題は、私が前もって私のヘッダー名を知らないことです。私の変圧器のすべてのヘッダーをオプトインする方法はありますか? – aturkovic

+0

残念ながら、 '*'のようなパターンはサポートされていません。私は私の答えに回避策を追加しました。 –

+0

私の編集を見ていただけますか?ありがとうございました。 – aturkovic

関連する問題