私はmy-transformer
にいくつかのヘッダを追加してい:Spring Integrationはカスタムヘッダーを転送しませんか?
public Message<?> transform(final Message<?> message) {
List<Item> items = doStuff(message);
final MessageBuilder<?> messageBuilder = MessageBuilder
.withPayload(message.getPayload())
.copyHeadersIfAbsent(message.getHeaders());
for (final Item item : items) {
messageBuilder.setHeader(item.getHeaderName(), item.getValue());
}
return messageBuilder.build();
}
そして私は私のヘッダが出力チャネル上に存在することを確認する統合テストを書いた:私はのようなストリームを作成し
public static class HeaderTest extends TransformerTest {
@Test
public void test() throws Exception {
channels.input().send(new GenericMessage<>(TransformerTest.EXAMPLE_PAYLOAD));
final Message<?> out = this.collector.forChannel(this.channels.output()).poll(10, TimeUnit.SECONDS);
assertThat(out, HeaderMatcher.hasHeader("header-test", notNullValue()));
}
}
をしかし、 :
http --port=1234 | my-transformer | log --expression=toString()
及びIはで次のメッセージを受信し、同じEXAMPLE_PAYLOAD
を送信s log:GenericMessage [payload=..., headers={kafka_offset=0, id=f0a0727c-9351-274c-58b3-edee9ccbf6ce, kafka_receivedPartitionId=0, contentType=text/plain;charset=UTF-8, kafka_receivedTopic=myTopic.my-transformer, timestamp=1485171448947}]
。
メッセージヘッダーに私のheader-test
が含まれていないのはなぜですか? application.properties
でspring.cloud.stream.bindings.output.producer.headerMode=raw
セットで
public class MyTransformer implements Transformer {
private final EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
@Override
public Message<?> transform(final Message<?> message) {
List<Item> items = doStuff(message);
final MessageBuilder<byte[]> messageBuilder = MessageBuilder
.withPayload(((String) message.getPayload()).getBytes())
.copyHeadersIfAbsent(message.getHeaders());
final int itemsSize = items.size();
final String[] headerNames = new String[itemsSize];
for (int i = 0; i < itemsSize; i++) {
final Item item = items.get(i);
messageBuilder.setHeader(item.getHeaderName(), item.getValue());
headerNames[i] = item.getHeaderName();
}
final Message<byte[]> msg = messageBuilder.build();
final byte[] rawMessageWithEmbeddedHeaders;
try {
rawMessageWithEmbeddedHeaders = converter.embedHeaders(new MessageValues(msg), headerNames);
} catch (final Exception e) {
throw new HeaderEmbeddingException(String.format("Cannot embed headers from '%s' into message: %s", items, msg), e);
}
return new GenericMessage<>(rawMessageWithEmbeddedHeaders);
}
}
、その後、受信側でメッセージペイロードを変換:
- - EDIT私が正しく理解している場合
だから私はのような何かを行うことになっていますか?または、どういうわけか受信側が自動的にメッセージペイロードを変換するようにすることはできますか?
私は春のCDFを使用しています。問題は、私が前もって私のヘッダー名を知らないことです。私の変圧器のすべてのヘッダーをオプトインする方法はありますか? – aturkovic
残念ながら、 '*'のようなパターンはサポートされていません。私は私の答えに回避策を追加しました。 –
私の編集を見ていただけますか?ありがとうございました。 – aturkovic