ロジスティック回帰モデルのパラメータを推定するためにpysparkを使用しています。私はsparkを使用して尤度と勾配を計算し、最適化のためのscipyの最小化関数(L-BFGS-B)を使用します。アプリケーションをしばらく実行した後のPysparkソケットタイムアウト例外
私はアプリケーションを実行するために糸クライアントモードを使用します。私のアプリケーションは何の問題もなく走り出すことができます。しかし、しばらくたってから、次のエラーを報告します。私は、スパークログレベルを「ALL」を設定すると
Traceback (most recent call last):
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
options={'disp': False})
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
callback=callback, **options)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
f, g = func_and_grad(x)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
f = fun(x, *args)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
return function(*(wrapper_args + args))
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
yield self._read_with_length(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
data = self._sock.recv(left)
socket.timeout: timed out
は私もpython broken pipe
エラーを検出しました。
私はSpark 1.6.2とJava 1.8.0_91を使用しています。何が起きているのか?
--Update--
私はこれは私が私のプログラムで使用される最適化ルーチンに関連しました。
私がやっていたことは、EMアルゴリズム(反復アルゴリズム)を使って最尤法を用いて統計モデルを推定することでした。各反復中に、私は最小化の問題を解決することによってパラメータを更新する必要があります。 Sparkは自分の尤度と勾配を計算し、それをL-BFGS-B法を使用するScipyの最小化ルーチンに渡します。このルーチンの中でSparkの仕事がクラッシュするようなものがあるようです。しかし、この問題のどの部分がルーチンのどの部分を担当しているのか分かりません。
同じサンプルと同じプログラムを使用している間に、私はパーティションの数を変更しました。パーティションの数が少ない場合、私のプログラムは問題なく終了することができます。しかし、パーティションの数が多くなると、プログラムがクラッシュするようになります。
をそれです大規模なタスクではデフォルト設定がクラッシュすることがあります。スパークは何のために作られたものではありませんか? – sudo