2017-12-05 3 views
0

私たちはライブマシンのデータをjsonとして取得しており、RabbitMQからこのデータを取得しています。以下のデータは、その後、下記の「X」分の期間のためにウィンドウ表示され Apache Spark - ワーカーノードでのデータのグループ化と実行

JSONのサンプル、

{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:35","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}} 
 
{"DeviceId":"MAC-1001","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:36","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}} 
 
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:37","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}} 
 
{"DeviceId":"MAC-1002","DeviceType":"Sim-1","TimeStamp":"05-12-2017 10:25:38","data":{"Rate":10,"speed":2493,"Mode":1,"EMode":2,"Run":1}}
たちは

  1. グループ達成するために何をしたいですdataがdeviceIdである場合、これは行われますが、データセットを取得できるかどうかわからない

  2. 上記のグループ化されたコードをワーカー・ノード内で実行するようにforeachPartitionを使用して各デバイスの集計ロジック用に実行します。

私の思考プロセスが間違っている場合は、私に修正してください。

以前のコードでは、データを収集し、RDDをループし、DataSetに変換し、Spark SqlContext APIを使用してDataSetに集約ロジックを適用しました。

負荷テストを行ったときに、処理の90%がマスターノードで実行されていましたが、しばらくしてCPU使用率が100%になり、プロセスが爆発しました。

これで、ワーカーノードで最大限のロジックを実行するためにプロセス全体を再設計しようとしています。

以下

することは、これまでのロジック

public static void main(String[] args) { 
 
\t \t 
 
\t \t try { 
 
\t \t \t 
 
\t \t \t mconf = new SparkConf(); 
 
\t \t \t mconf.setAppName("OnPrem"); 
 
\t \t \t mconf.setMaster("local[*]"); 
 
\t \t \t 
 
\t \t \t JavaSparkContext sc = new JavaSparkContext(mconf); 
 
\t \t \t 
 
\t \t \t jssc = new JavaStreamingContext(sc, Durations.seconds(60)); 
 

 
\t \t \t SparkSession spksess = SparkSession.builder().appName("Onprem").getOrCreate(); 
 
\t \t \t //spksess.sparkContext().setLogLevel("ERROR"); 
 
\t \t \t 
 
\t \t \t Map<String, String> rabbitMqConParams = new HashMap<String, String>(); 
 
\t \t \t rabbitMqConParams.put("hosts", "localhost"); 
 
\t \t \t rabbitMqConParams.put("userName", "guest"); 
 
\t \t \t rabbitMqConParams.put("password", "guest"); 
 
\t \t \t rabbitMqConParams.put("vHost", "/"); 
 
\t \t \t rabbitMqConParams.put("durable", "true"); 
 
\t \t \t 
 
\t \t \t List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>(); 
 
\t \t \t distributedKeys.add(new JavaRabbitMQDistributedKey(QUEUE_NAME, new ExchangeAndRouting(EXCHANGE_NAME, "fanout", ""), rabbitMqConParams)); 
 
\t \t \t 
 
\t \t \t Function<Delivery, String> messageHandler = new Function<Delivery, String>() { 
 

 
\t \t \t \t public String call(Delivery message) { 
 
\t \t \t \t \t return new String(message.getBody()); 
 
\t \t \t \t } 
 
\t \t \t }; 
 
\t \t \t 
 
\t \t \t JavaInputDStream<String> messages = RabbitMQUtils.createJavaDistributedStream(jssc, String.class, distributedKeys, rabbitMqConParams, messageHandler); 
 
\t \t \t 
 
\t \t \t JavaDStream<String> machineDataRDD = messages.window(Durations.minutes(2),Durations.seconds(60)); //every 60 seconds one RDD is Created 
 
\t \t \t machineDataRDD.print(); 
 
\t \t \t 
 
\t \t \t JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s)); 
 
\t \t \t 
 
\t \t \t JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey(); \t 
 
\t \t \t 
 
\t \t \t groupedData.foreachRDD(new VoidFunction<JavaPairRDD<String,Iterable<String>>>(){ 
 

 
\t \t \t \t @Override 
 
\t \t \t \t public void call(JavaPairRDD<String, Iterable<String>> data) throws Exception { 
 
\t \t \t \t \t 
 
\t \t \t \t \t data.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Iterable<String>>>>(){ 
 

 
\t \t \t \t \t \t @Override 
 
\t \t \t \t \t \t public void call(Iterator<Tuple2<String, Iterable<String>>> data) throws Exception { 
 
\t \t \t \t \t \t 
 
\t \t \t \t \t \t \t while(data.hasNext()){ 
 
\t \t \t \t \t \t \t \t LOGGER.error("Machine Data == >>"+data.next()); 
 
\t \t \t \t \t \t \t } 
 
\t \t \t \t \t \t } 
 
\t \t \t \t \t \t 
 
\t \t \t \t \t }); 
 
\t \t \t \t \t 
 
\t \t \t \t } 
 
\t \t \t 
 
\t \t \t }); 
 
\t \t \t jssc.start(); 
 
\t \t \t jssc.awaitTermination(); 
 
\t \t \t 
 
\t \t } 
 
\t \t catch (Exception e) 
 
\t \t { 
 
\t \t \t e.printStackTrace(); 
 
\t \t }

を集約するため、実際のワーカーノードで動作しますが、我々はDataSetを取得するためにまだあることを以下のグループ化のコードは私たちに、文字列の反復処理可能を与えるコードでありますデバイスのために、理想的に我々は

JavaPairDStream<String, String> pairedData = machineDataRDD.mapToPair(s -> new Tuple2<String, String>(getMap(s).get("DeviceId").toString(), s)); 
 
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();
DataSetを取得したいと思い

私にとって重要なことは、実行中のコードがワーカーノードにプッシュされるようにforeachPartitionを使用するループです。

答えて

0

より多くのコードサンプルとガイドラインsqlcontextを調べた後、sparksessionはシリアル化されず、ワーカーノードでも使用できないため、foreachpartitionループを使用してデータセットを構築しないという戦略を変更します。

関連する問題