2016-05-30 8 views
0

私のスパークアプリケーションは、私のKafkaトピックに1文字列だけを送信すると2つのメールを送信します。ここでは、コードの興味一部である:この1変換後のファイルと、この唯一のリターン1ライン上に保存しているので、メールが、2通の電子メールを送信した場合SparkStreaming同じコマンドを2回実行するか同じメールを2回送信する

JavaDStream<String> lines = kafkaStream.map ([returns the 2nd value of the tuple]; 
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
    [... some stuff ...] 

    JavaRDD<String[]> flagAddedRDD = associatedToPersonRDD.map(new Function<String[],String[]>(){ 

       @Override 
       public String[] call(String[] arg0) throws Exception { 
        String[] s = new String[arg0.length+1]; 
        System.arraycopy(arg0, 0, s, 0, arg0.length); 
        int a = FilePrinter.getAge(arg0[CSVExampleDevice.LENGTH+People.BIRTH_DATE]); 
        int p = Integer.parseInt(arg0[CSVExampleDevice.PULSE]); 
        if(
         ((p<=45 || p>=185)&&(a<=12 || a>=70)) 
          || 
         (p>=190 || p<=40)){ 
         s[arg0.length]="1"; 
         Mailer.sendMail(mailTo, arg0); 
         } 
        else 
         s[arg0.length]="0"; 
        return s; 
       } 

      });` 

私が理解することはできません。

FilePrinter.saveAssociatedAsCSV(associatedSavePath, unifiedAssociatedStringRDD.collect()); //first action 

       JavaRDD<String[]> enrichedWithWeatherRDD = flagAddedRDD.map(new Function<String[],String[]>(){ [some more stuff] }); 

       JavaRDD<String> unifiedEnrichedStringRDD = enrichedWithWeatherRDD.map(unifyArrayIntoString); 

       FilePrinter.saveEnrichedAsCSV(enrichedSavePath, unifiedEnrichedStringRDD.collect()); //second action 

したがって全体変換が再び呼び出され、メーラー部が上記である:mailer.sendMail:私は変換の終わりに2つのアクションと呼ばれるので

public static void sendMail(String whoTo, String[] whoIsDying){ 
    Properties props = new Properties(); 
    props.put("mail.smtp.host", "mail.***.com"); //edited 
    props.put("mail.smtp.port", "25"); 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    Session session = Session.getInstance(props); 

    try { 

     Message message = new MimeMessage(session); 
     message.setFrom(new InternetAddress("***my-email***")); //edited 
     for (String string : whoTo.split(",")) 
      message.addRecipient(Message.RecipientType.TO, 
        new InternetAddress(string)); 
     message.setSubject(whoIsDying[PersonClass.TIMESTAMP]); 
     message.setText("trial"); 
     System.out.println("INFO: sent mail"); 
     Transport.send(message);  
    } catch (MessagingException e) { 
     throw new RuntimeException(e); 
    } 
} 
+0

メール送信ロジックが存在する場所を明示することはできますか?それは変容していますか?マップ、または出力操作、例えば、 foreachRDD? –

+0

おそらくcallメソッドを持つ関数がmapメソッドによって複数回呼び出されている可能性があります。確かにデバッガやprintln呼び出しでこれをトレースできますか? –

+0

@TarasMatyashovskyy関数はマップ変換で呼び出されますが、2回渡される理由はわかりません。 map関数はforeachRDDの中にあります。すべての行をメールで送る必要はないからです。 これは、取得したすべてが2回処理されていることを意味しますか? – Vale

答えて

0

これが起こる理由はこれらの両方の行動。

関連する問題