2017-01-25 6 views
0

に比べて非常に遅いです私は、Pythonに簡単な集計を行いScalaのコードを移植:Pyspark集約は、Scalaの

from time import time 
from utils import notHeader, parse, pprint 
from pyspark import SparkContext 

start = time() 
src = "linkage" 
sc = SparkContext("spark://aiur.local:7077", "linkage - Python") 
rawRdd = sc.textFile(src) 
noheader = rawRdd.filter(notHeader) 
parsed = noheader.map(parse) 
grouped = parsed.groupBy(lambda md: md.matched) 
res = grouped.mapValues(lambda vals: len(vals)).collect() 
for x in res: pprint(x) 
diff = time() - start 
mins, secs = diff/60, diff % 60 
print "Analysis took {} mins and {} secs".format(int(mins), int(secs)) 
sc.stop() 

utils.py:

from collections import namedtuple 

def isHeader(line): 
    return line.find("id_1") >= 0 

def notHeader(line): 
    return not isHeader(line) 

def pprint(s): 
    print s 

MatchedData = namedtuple("MatchedData", "id_1 id_2 scores matched") 

def parse(line): 
    pieces = line.split(",") 
    return MatchedData(pieces[0], pieces[1], pieces[2:11], pieces[11]) 

そしてScalaのバージョン:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

object SparkTest { 
    def main(args: Array[String]): Unit = { 
     val start: Long = System.currentTimeMillis/1000 
     val filePath = "linkage" 
     val conf = new SparkConf() 
      .setAppName("linkage - Scala") 
      .setMaster("spark://aiur.local:7077") 
     val sc = new SparkContext(conf) 
     val rawblocks = sc.textFile(filePath) 
     val noheader = rawblocks.filter(x => !isHeader(x)) 
     val parsed = noheader.map(line => parse(line)) 
     val grouped = parsed.groupBy(md => md.matched) 
     grouped.mapValues(x => x.size).collect().foreach(println) 
     val diff = System.currentTimeMillis/1000 - start 
     val (mins, secs) = (diff/60, diff % 60) 
     val pf = printf("Analysis took %d mins and %d secs", mins, secs) 
     println(pf) 
     sc.stop() 
    } 

    def isHeader(line: String): Boolean = { 
     line.contains("id_1") 
    } 

    def toDouble(s: String): Double = { 
     if ("?".equals(s)) Double.NaN else s.toDouble 
    } 

    case class MatchData(id1: Int, id2: Int, 
     scores: Array[Double], matched: Boolean) 

    def parse(line: String) = { 
     val pieces = line.split(",") 
     val id1 = pieces(0).toInt 
     val id2 = pieces(1).toInt 
     val scores = pieces.slice(2, 11).map(toDouble) 
     val matched = pieces(11).toBoolean 
     MatchData(id1, id2, scores, matched) 
    } 
} 

Scalaのバージョンは26秒で完了しますが、Pythonのバージョンは6分かかりました。ログはそれぞれのcollect()呼び出しの完了に非常に大きな違いを示します。

のPython:

17/01/25 16:22:10 INFO DAGScheduler: ResultStage 1 (collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12) finished in 234.860 s 
17/01/25 16:22:10 INFO DAGScheduler: Job 0 finished: collect at /Users/esamson/Hackspace/Spark/run_py/dcg.py:12, took 346.675760 s 

スカラ:

17/01/25 16:26:23 INFO DAGScheduler: ResultStage 1 (collect at Spark.scala:17) finished in 9.619 s 
17/01/25 16:26:23 INFO DAGScheduler: Job 0 finished: collect at Spark.scala:17, took 22.022075 s 

は 'GROUPBY' 意義の唯一のコールのようです。ですから、Pythonコードのパフォーマンスを向上させる方法はありますか?

+3

データフレームAPIとCSV読み取りフォーマットを使用しない理由は何ですか?パフォーマンスは、それほど良くない場合は、スケーラレベルに向上する可能性があります。 – maasg

+0

@maasg間違いなくそれを試みますか – kerrigangster

答えて

1

RDDを使用しているため、RDDを変換するとき(groupby、mapなど)には関数を渡す必要があります。これらの関数をスカラーで渡すと、関数は単純に実行されます。 Pythonで同じことをする場合、sparkはこれらの関数を直列化し、各executorでPython VMを開き、実行する必要があるときにはscalaデータをPythonに変換し、Python VMに渡して渡します結果を変換します。

これらの変換はすべて多くの作業を必要とするため、pysparkでのRDDの作業は通常、scalaよりはるかに遅いです。

これを回避する方法は、舞台裏でスカラ関数を使用する(pyspark.sql.functionsで)すでに作成された関数を使用できるようにするデータフレームロジックを使用することです。これは、(スパーク2.0用)次のようになります。もちろん

from pyspark import SparkSession 
from pyspark.sql.functions import size 

src = "linkage" 
spark = SparkSession.builder.master(""spark://aiur.local:7077"").appName(""linkage - Python"").getOrCreate() 
df = spark.read.option("header", "true").csv(src) 
res = df.groupby("md").agg(size(df.vals)).collect() 
... 

これは、マッチしたとみなし、ヴァルスはカラム名です。