2016-04-18 15 views
-1

私はsparkで遊んでいますが、私はこの実行フローをどのように構築するかについて頭を下げることはできません。擬似コードは次のとおりです。Sparkでこの実行フローをどのように構成する必要がありますか?

Traceback (most recent call last): 
    File "/net/nas/SysGrid_Users/John.Richardson/Code/HistoricVars/sparkTest2.py", line 76, in <module> 
    varResults = distDates.map(varFunc).collect() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 2379, in _jrdd 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 2299, in _prepare_for_python_RDD 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 408, in dump 
    self.save(obj) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 740, in save_tuple 
    save(element) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 199, in save_function 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 725, in save_tuple 
    save(element) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 770, in save_list 
    self._batch_appends(obj) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 797, in _batch_appends 
    save(tmp[0]) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 810, in save_dict 
    self._batch_setitems(obj.items()) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 841, in _batch_setitems 
    save(v) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 520, in save 
    self.save_reduce(obj=obj, *rv) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 542, in save_reduce 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 475, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 810, in save_dict 
    self._batch_setitems(obj.items()) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 836, in _batch_setitems 
    save(v) 
    File "/net/nas/uxhome/condor_ldrt-s/Python/lib/python3.5/pickle.py", line 495, in save 
    rv = reduce(self.proto) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling o44.__getstate__. Trace: 
py4j.Py4JException: Method __getstate__([]) does not exist 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335) 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344) 
     at py4j.Gateway.invoke(Gateway.java:252) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:209) 
     at java.lang.Thread.run(Thread.java:745) 

私は間違った方法これについてつもりだと思われる:

from pyspark import SparkConf, SparkContext, SQLContext 
sc = SparkContext(conf=conf) 
sqlSC = SQLContext(sc) 

df1 = getBigDataSetFromDb() 
ddf1 = sqlSC.createDataFrame(sc.broadcast(df1)) 

df2 = getOtherBigDataSetFromDb() 
ddf2 = sqlSC.createDataFrame(sc.broadcast(df2)) 

datesList = sc.parallelize(aListOfDates) 

def myComplicatedFunc(cobDate): 
    filteredDF1 = ddf1.filter(ddf1['BusinessDate'] == cobDate) 
    filteredDF2 = ddf2.filter(ddf2['BusinessDate'] == cobDate) 
    #some more complicated stuff that uses filteredDF1 & filteredDF2 
    return someValue 

results = datesList.map(myComplicatedFunc) 

はしかし、私が何を得る、このようなものです。私は、ブロードキャスト変数を使うことのポイントは、私がクロージャの中で使うことができると考えていました。しかし、おそらく私は代わりに何らかの参加をしなければならないでしょうか?私はドメインコンテキストの欠如についてのコメントに同意するものの

+0

より具体的にあなたが達成しようとしていることを説明できますか?あなたのコードについてはファンキーに見えるものがたくさんありますが、あなたがしようとしていることが分かっていれば説明するのは簡単でしょう。 – David

+0

私は時間のローリングウィンドウを持って関心の歴史の上に "スライド"する歴史的な値の大きな入力セットを必要とする計算を実行しようとしています。 SQL解析関数の動作と同様です。関心のある日に合うようにクラスタ全体のデータを再シャッフルすることは、実際には非常に遅くなるため、放送が最も良いと思った。サイズに関しては、2つの大きなデータセットは、〜3.5m〜〜11mの行から〜7列のプリミティブです。クロージャー内でフィルター操作をしようとしているため、エラーが発生したようです。 – ThatDataGuy

答えて

0

、私はこれが何をしたいとは思わない:

df2 = getOtherBigDataSetFromDb() 
ddf2 = sqlSC.createDataFrame(sc.broadcast(df2)) 

あなたはdf2の種類が何であるかを言うが、のが前提とさせてはいけません配列であり、実際にはDataFrameではありません(df*という名前が付けられていますが)。それは配列の場合は、どのようなあなたはおそらくしたいことは次のとおりです。

言われていること
df2 = getOtherBigDataSetFromDb() 
ddf2 = sqlSC.createDataFrame(sc.parallelize(df2)) 

getOtherBigDataSetFromDbは、それがうまく、実際のビッグデータセットを意味します。この流れがうまくいく間に、あなたのデータセットが本当に本当に大きければ、あなたはそれを塊で消費したいかもしれません。あなた自身で書くことができますか、おそらくすでにあなたのDBや選択肢から読み込むライブラリがあります。しかし、関係なく、私はあなたがparallelizeを意味していると信じていますbroadcast

+0

私はこれがPythonの欠点であると思います。変数型指定子はありません! :-)とにかく。 df2はパンダのデータフレームです。基本的には、日付ごとに大きなデータセットのサブセットを取得し、それを「複雑な」関数に渡す必要があります。 – ThatDataGuy

関連する問題