0

バッチで並列処理を実現するためにfork-joinパターンを使用しています。並列処理のためのフォーク結合パターンmule esb throw NullPointer例外

私は、次の質問を参照取るている:私は私の入力フォルダに非常に多くのファイルを持っていますが、私は、並列処理を実現する必要があるため Mule File Inbound Flow : Control Number of threads

を。したがって、このパターンを使用することを考えました。ここに私の設定フローがどのように見えますか?

とすぐ制御は、次の例外がスローされた要求 - 応答範囲に入るよう
  <quartz:connector name="Quartz1" validateConnections="true" doc:name="Quartz"> 
        <receiver-threading-profile maxThreadsActive="1"/> 
      </quartz:connector> 
      <flow name="Mainflow" processingStrategy="synchronous"> 
      <quartz:inbound-endpoint jobName="EventGeneration" repeatInterval="1000" connector-ref="Quartz1" responseTimeout="10000" doc:name="Quartz"> 
        <quartz:event-generator-job/> 
       </quartz:inbound-endpoint> 

       <mulerequester:request-collection config-ref="Mule_Requester" resource="file:///FileLocation?connector=FileMRTransformer" count="3" doc:name="Mule Requester"/> 
       <expression-filter expression="#[payload.size() != 0]" doc:name="Expression"/> 
     <request-reply doc:name="Request-Reply" timeout="300000"> 
        <processor-chain doc:name="Processor Chain"> 
         <collection-splitter doc:name="Collection Splitter"/> 
         <vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput" /> 
        </processor-chain> 
        <vm:inbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput"> 
         <message-properties-transformer> 
          <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3" /> 
         </message-properties-transformer> 
         <collection-aggregator /> 
        </vm:inbound-endpoint> 
    </request-reply> 
</flow> 

<batch:job name="BatchDemo" max-failed-records="-1"> 
     <batch:input> 
      <vm:inbound-endpoint exchange-pattern="one-way" path="Batchinput" connector-ref="VM" doc:name="VM"/> 
.... 
required processing..... 
. 
. 
<batch:on-complete> 
<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchoutput"/> 
</batch:on-complete> 

ERROR 2016-06-23 10:32:56,190 [scheduler-multithreadint019.1.2_productindexing_hybris_fh_Worker-1] org.mule.exception.CatchMessagingExceptionStrategy: 
******************************************************************************** 
Message    : null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList 
Type     : org.mule.api.MessagingException 
Code     : MULE_ERROR--2 
Payload    : [[[email protected], [[email protected], [[email protected]] 
JavaDoc    : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html 
******************************************************************************** 
Exception stack is: 
1. null (java.lang.NullPointerException) 
    java.util.concurrent.ConcurrentHashMap:-1 (null) 
2. null (java.lang.NullPointerException). Message payload is of type: CopyOnWriteArrayList (org.mule.api.MessagingException) 
    org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html) 
******************************************************************************** 
Root Exception stack trace: 
java.lang.NullPointerException 
    at java.util.concurrent.ConcurrentHashMap.hash(Unknown Source) 
    at java.util.concurrent.ConcurrentHashMap.put(Unknown Source) 
    at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:85) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107) 

代替設定が要求 - 応答プロセッサに要求部に試み:

<vm:outbound-endpoint exchange-pattern="one-way" doc:name="VM" connector-ref="VM" path="Batchinput"> 
       <collection-splitter /> 
      </vm:outbound-endpoint> 

しかし、同じ例外が生じました。

MuleRequesterこの例外が発生しています。私はいくつかのJavaスニペットを使用してFileオブジェクトのコレクションを返します私はこの例外が発生せず、コントロールがバッチフローに期待どおりに入力しています。しかし、私はバッチの私の入力フェーズに変圧器(DataWeave)を持っている、と私の変圧器は、このファイルオブジェクトを解析することができません(たとえばjava.io.Fileのまたはjava.io.FileInputStream) 。したがって、MuleRequesterを使用して、ストリーミングを有効にすることができます。

このMulerequesterを使用すると何がうまくいかなかったのですか?

答えて

0

誰かが興味があれば、その代替手段が見つかりました。

pollコンポーネントを使用して、ファイルオブジェクトではなくfileNamesコレクションをJavaで返します。同じrequestreplyパターンを使用しました。しかし、今回は入力フェーズのファイルからペイロードを取得しています。MuleRequesterを使用しています。このファイル名にMuleRequesterを入力パスとして指定します。

これはうまく動作しています。