0
私はスパークの初心者です。私はコードを作り、それをマルチノードで走らせました。なぜいくつかのノードがsparkでデータを割り当てるように割り当てられていないのですか?
私は1つのマスターノードと4つのワーカーノードを持っています。私は自分のコードを複数回走らせていましたが、驚いたことに、時にはそれらのいくつかがうまく動作せず、マスタが指定したデータを持つように割り当てられていたため、
詳細な設定が行われていないため、この現象は私にはうかがえます。 より速く、より速い結果を得るために、すべてのワーカーノードを同時に処理したいと思っています。どのように私の要件を達成するには?
私のコードとコマンドが添付されました。それは非常に簡単なので、私は詳細な説明をスキップしました。ありがとう。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* Created by dst on 2/1/17.
*/
public class Test {
public static void main(String[] args) throws Exception {
String inputFile = args[0];
String outputFile = args[1];
SparkConf conf = new SparkConf().setAppName("Data Transformation")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(inputFile);
JavaRDD<String> newLine = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> ret = new ArrayList<String>();
List<String> ls = Arrays.asList(s.split("\t"));
String values = ls.get(ls.size()-1);
List<String> value = Arrays.asList(values.split("\\|"));
for(int i=0;i<value.size();++i){
String ns = ls.get(0)+"\t"+ls.get(1)+"\t"+ls.get(2)+"\t"+ls.get(3)+"\t"+ls.get(4)+"\t"+ls.get(5);
ns = ns + "\t" + value.get(i);
ret.add(ns);
}
return ret.iterator();
}
});
newLine.saveAsTextFile(outputFile);
}
}
スパーク提出。 documentationを参照
spark-submit \
--class Test \
--master spark://spark.dso.xxxx \
--executor-memory 10G \
/home/jumbo/user/sclee/dt/jars/dt_01_notcache-1.0-SNAPSHOT.jar \
/user/sclee/data/ /user/sclee/output