0
私のスパークアプリケーションは、私のKafkaトピックに1文字列だけを送信すると2つのメールを送信します。ここでは、コードの興味一部である:この1変換後のファイルと、この唯一のリターン1ライン上に保存しているので、メールが、2通の電子メールを送信した場合SparkStreaming同じコマンドを2回実行するか同じメールを2回送信する
JavaDStream<String> lines = kafkaStream.map ([returns the 2nd value of the tuple];
lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
[... some stuff ...]
JavaRDD<String[]> flagAddedRDD = associatedToPersonRDD.map(new Function<String[],String[]>(){
@Override
public String[] call(String[] arg0) throws Exception {
String[] s = new String[arg0.length+1];
System.arraycopy(arg0, 0, s, 0, arg0.length);
int a = FilePrinter.getAge(arg0[CSVExampleDevice.LENGTH+People.BIRTH_DATE]);
int p = Integer.parseInt(arg0[CSVExampleDevice.PULSE]);
if(
((p<=45 || p>=185)&&(a<=12 || a>=70))
||
(p>=190 || p<=40)){
s[arg0.length]="1";
Mailer.sendMail(mailTo, arg0);
}
else
s[arg0.length]="0";
return s;
}
});`
私が理解することはできません。
FilePrinter.saveAssociatedAsCSV(associatedSavePath, unifiedAssociatedStringRDD.collect()); //first action
JavaRDD<String[]> enrichedWithWeatherRDD = flagAddedRDD.map(new Function<String[],String[]>(){ [some more stuff] });
JavaRDD<String> unifiedEnrichedStringRDD = enrichedWithWeatherRDD.map(unifyArrayIntoString);
FilePrinter.saveEnrichedAsCSV(enrichedSavePath, unifiedEnrichedStringRDD.collect()); //second action
したがって全体変換が再び呼び出され、メーラー部が上記である:mailer.sendMail:私は変換の終わりに2つのアクションと呼ばれるので
public static void sendMail(String whoTo, String[] whoIsDying){
Properties props = new Properties();
props.put("mail.smtp.host", "mail.***.com"); //edited
props.put("mail.smtp.port", "25");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Session session = Session.getInstance(props);
try {
Message message = new MimeMessage(session);
message.setFrom(new InternetAddress("***my-email***")); //edited
for (String string : whoTo.split(","))
message.addRecipient(Message.RecipientType.TO,
new InternetAddress(string));
message.setSubject(whoIsDying[PersonClass.TIMESTAMP]);
message.setText("trial");
System.out.println("INFO: sent mail");
Transport.send(message);
} catch (MessagingException e) {
throw new RuntimeException(e);
}
}
メール送信ロジックが存在する場所を明示することはできますか?それは変容していますか?マップ、または出力操作、例えば、 foreachRDD? –
おそらくcallメソッドを持つ関数がmapメソッドによって複数回呼び出されている可能性があります。確かにデバッガやprintln呼び出しでこれをトレースできますか? –
@TarasMatyashovskyy関数はマップ変換で呼び出されますが、2回渡される理由はわかりません。 map関数はforeachRDDの中にあります。すべての行をメールで送る必要はないからです。 これは、取得したすべてが2回処理されていることを意味しますか? – Vale