2016-07-17 11 views
3

ロジスティック回帰モデルのパラメータを推定するためにpysparkを使用しています。私はsparkを使用して尤度と勾配を計算し、最適化のためのscipyの最小化関数(L-BFGS-B)を使用します。アプリケーションをしばらく実行した後のPysparkソケットタイムアウト例外

私はアプリケーションを実行するために糸クライアントモードを使用します。私のアプリケーションは何の問題もなく走り出すことができます。しかし、しばらくたってから、次のエラーを報告します。私は、スパークログレベルを「ALL」を設定すると

Traceback (most recent call last): 
    File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module> 
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B') 
    File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM 
    options={'disp': False}) 
    File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize 
    callback=callback, **options) 
    File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb 
    f, g = func_and_grad(x) 
    File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad 
    f = fun(x, *args) 
    File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper 
    return function(*(wrapper_args + args)) 
    File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj 
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum() 
    File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold 
    vals = self.mapPartitions(func).collect() 
    File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect 
    return list(_load_from_socket(port, self._jrdd_deserializer)) 
    File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket 
    for item in serializer.load_stream(rf): 
    File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream 
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator 
java.net.SocketTimeoutException: Accept timed out 
    at java.net.PlainSocketImpl.socketAccept(Native Method) 
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) 
    at java.net.ServerSocket.implAccept(ServerSocket.java:545) 
    at java.net.ServerSocket.accept(ServerSocket.java:513) 
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645) 
    yield self._read_with_length(stream) 
    File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length 
    length = read_int(stream) 
    File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int 
    length = stream.read(4) 
    File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read 
    data = self._sock.recv(left) 
socket.timeout: timed out 

は私もpython broken pipeエラーを検出しました。

私はSpark 1.6.2とJava 1.8.0_91を使用しています。何が起きているのか?

--Update--

私はこれは私が私のプログラムで使用される最適化ルーチンに関連しました。

私がやっていたことは、EMアルゴリズム(反復アルゴリズム)を使って最尤法を用いて統計モデルを推定することでした。各反復中に、私は最小化の問題を解決することによってパラメータを更新する必要があります。 Sparkは自分の尤度と勾配を計算し、それをL-BFGS-B法を使用するScipyの最小化ルーチンに渡します。このルーチンの中でSparkの仕事がクラッシュするようなものがあるようです。しかし、この問題のどの部分がルーチンのどの部分を担当しているのか分かりません。

同じサンプルと同じプログラムを使用している間に、私はパーティションの数を変更しました。パーティションの数が少ない場合、私のプログラムは問題なく終了することができます。しかし、パーティションの数が多くなると、プログラムがクラッシュするようになります。

答えて

0

詳細については、エグゼキュータログを確認してください。私は、エグゼキュータが死んだり、クラスタマネージャによって殺されたりすると、(通常はコンテナが設定されているよりも多くのメモリを使用している)同様のエラーが発生しています。

2

私にも同様の問題がありました。私には反復があり、時には実行時間がかかりすぎてタイムアウトになりました。 spark.executor.heartbeatIntervalを増やしても問題は解決したようです。私は再びタイムアウトに遭遇しないように、それを3600sに上げました。それ以来、すべてがうまくいきます。

から:http://spark.apache.org/docs/latest/configuration.html

spark.executor.heartbeatInterval 10s Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.

+2

をそれです大規模なタスクではデフォルト設定がクラッシュすることがあります。スパークは何のために作られたものではありませんか? – sudo

0

私は同様の問題を抱えていたし、私にとって、これはそれを修正:ここで他のオプションを設定する

import pyspark as ps 

conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer") 
conf.set("spark.executor.heartbeatInterval","3600s") 
sc = ps.SparkContext('local[4]', '', conf=conf) # uses 4 cores on your local machine 

より多くの例: https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad

関連する問題