2016-10-20 10 views

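While upgrading Kafka from 0.9.0 to 0.10.0, I ran into a problem configuring separate producers for separate topics. How do I configure a different producer for each topic with spring-integration-kafka 2.1.0.RELEASE and Kafka 0.10.0 when publishing to two topics?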

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:publish-subscribe-channel id="inputToKafka" />

    <!-- Producer Config -->

    <int-kafka:outbound-channel-adapter
        id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate" topic="trigger-fcm-notification"
        auto-startup="true" channel="inputToKafka">
        <int-kafka:request-handler-advice-chain>
            <bean
                class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
        </int-kafka:request-handler-advice-chain>
    </int-kafka:outbound-channel-adapter>

    <int-kafka:outbound-channel-adapter
        id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate" topic="sync-microsoft-account"
        auto-startup="true" channel="inputToKafka">
        <int-kafka:request-handler-advice-chain>
            <bean
                class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
        </int-kafka:request-handler-advice-chain>
    </int-kafka:outbound-channel-adapter>

    <bean id="fcmNotificationTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="retries" value="0" />
                        <entry key="batch.size" value="16384" />
                        <entry key="linger.ms" value="0" />
                        <entry key="buffer.memory" value="33554432" />
                        <entry key="key.serializer"
                            value="org.apache.kafka.common.serialization.StringSerializer" />
                        <entry key="value.serializer"
                            value="common.serializer.FcmNotificationVoSerializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

    <bean id="microsoftAccountSyncTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="retries" value="0" />
                        <entry key="batch.size" value="16384" />
                        <entry key="buffer.memory" value="33554432" />
                        <entry key="linger.ms" value="0" />
                        <entry key="key.serializer"
                            value="org.apache.kafka.common.serialization.StringSerializer" />
                        <entry key="value.serializer"
                            value="common.serializer.MicrosoftAccountSyncRequestVoSerializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

    <int-kafka:message-driven-channel-adapter
        id="kafka-message-channel-adapter-FCM" listener-container="fcmContainer"
        auto-startup="true" phase="100" send-timeout="5000"
        channel="ip-chanel-trigger-fcm-notification" mode="record"
        message-converter="messageConverter" />

    <int-kafka:message-driven-channel-adapter
        id="kafka-message-channel-adapter-SMA" listener-container="microsoftAccountSyncContainer"
        auto-startup="true" phase="100" send-timeout="5000"
        channel="ip-chanel-sync-microsoft-account" mode="record"
        message-converter="messageConverter" />

    <bean id="messageConverter"
        class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

    <!-- Consumer Config -->
    <int:service-activator input-channel="ip-chanel-trigger-fcm-notification"
        ref="fcmNotificationConsumer">
    </int:service-activator>

    <int:service-activator input-channel="ip-chanel-sync-microsoft-account"
        ref="syncMicrosoftAccountConsumer">
    </int:service-activator>

    <bean id="fcmContainer"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="enable.auto.commit" value="true" />
                        <entry key="auto.commit.interval.ms" value="100" />
                        <entry key="session.timeout.ms" value="15000" />
                        <entry key="group.id" value="trigger-fcm-notification" />
                        <entry key="key.deserializer"
                            value="org.apache.kafka.common.serialization.StringDeserializer" />
                        <entry key="value.deserializer"
                            value="common.deserializer.FcmNotificationVoDeserializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="trigger-fcm-notification" />
            </bean>
        </constructor-arg>
    </bean>

    <bean id="microsoftAccountSyncContainer"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="enable.auto.commit" value="true" />
                        <entry key="auto.commit.interval.ms" value="100" />
                        <entry key="session.timeout.ms" value="15000" />
                        <entry key="group.id" value="sync-microsoft-account" />
                        <entry key="key.deserializer"
                            value="org.apache.kafka.common.serialization.StringDeserializer" />
                        <entry key="value.deserializer"
                            value="common.deserializer.MicrosoftAccountSyncRequestVoDeserializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="sync-microsoft-account" />
            </bean>
        </constructor-arg>
    </bean>

</beans>

With the XML configuration above, I get the following error:

(java.lang.String,java.lang.String,java.lang.String,java.util.Locale,org.springframework.ui.Model,java.security.Principal)]: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
2016-10-20 18:12:53,849 [http-nio-8080-exec-4] DEBUG org.springframework.web.servlet.DispatcherServlet - Could not complete request
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#1]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class common.vo.GcmNotificationVo to class common.serializer.MicrosoftAccountSyncRequestVoSerializer specified in value.serializer
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.execute(AbstractRequestHandlerAdvice.java:75)
    at org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice.doInvoke(RequestHandlerCircuitBreakerAdvice.java:62)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy52.handleMessage(Unknown Source)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at service.impl.AdminOperationsServiceImpl.publishToQueue(AdminOperationsServiceImpl.java:1191)
    at service.impl.AdminOperationsServiceImpl.update(AdminOperationsServiceImpl.java:1366)
    at service.TenantDocumentsController.update(TenantDocumentsController.java:277)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)

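I have defined two separate serializer and deserializer classes, and the stack trace is shown above. But how does one producer end up referring to the other class internally? Did I miss some configuration?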

Answer


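Since you are sending to Kafka, the Deserializer is not relevant here. According to the stack trace, you invoke a REST service that sends a GcmNotificationVo object to inputToKafka.

Because inputToKafka is a publish-subscribe channel, the same message is also delivered to the second subscriber (masOutboundChannelAdapter), which cannot serialize that object for Kafka with common.serializer.MicrosoftAccountSyncRequestVoSerializer.

Did you perhaps intend to use masOutboundChannelAdapter for a different operation? If so, give it its own, separate channel.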
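The suggestion above could look roughly like the fragment below: each outbound adapter subscribes to its own channel, so a GcmNotificationVo sent to inputToKafka never reaches the template configured with MicrosoftAccountSyncRequestVoSerializer. This is only a sketch. The channel name inputToKafkaSync is a hypothetical name not present in the original configuration, the advice chains are omitted for brevity, and the fragment is assumed to sit inside the same `<beans>` element with the same namespace declarations.

```xml
<!-- Existing channel, now used only for FCM notification payloads -->
<int:publish-subscribe-channel id="inputToKafka" />

<!-- Hypothetical new channel, used only for Microsoft account sync payloads -->
<int:channel id="inputToKafkaSync" />

<!-- Unchanged: still subscribes to inputToKafka -->
<int-kafka:outbound-channel-adapter
    id="fcmOutboundChannelAdapter" kafka-template="fcmNotificationTemplate"
    topic="trigger-fcm-notification" auto-startup="true" channel="inputToKafka" />

<!-- Changed: subscribes to the new channel instead of inputToKafka -->
<int-kafka:outbound-channel-adapter
    id="masOutboundChannelAdapter" kafka-template="microsoftAccountSyncTemplate"
    topic="sync-microsoft-account" auto-startup="true" channel="inputToKafkaSync" />
```

With a layout like this, the code that publishes sync requests would need to send them to inputToKafkaSync rather than inputToKafka.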

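Yes, I will use masOutboundChannelAdapter for a different operation. – Bhat

The "deserializer" mentioned in my code refers to the value.deserializer entry in the configuration. – Bhat

Got your point. I added a separate channel and it works as expected. Thanks again. Please mark this question as closed. – Bhat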
