2016-05-16 5 views
1

こんにちは私はまったく新しいスパークで、最近いくつかのラップトップでスパークスタンドアロンクラスターをセットアップしました。スタンドアロンスパーククラスタ(pySpark)でFTP上のファイルを使用するにはどうすればよいですか?

私は「PySparkは、ローカルファイルシステム、HDFS、カサンドラを含め、Hadoopのでサポートされている任意のストレージソースから配信データセットを作成することができスパークガイド による new.txt名前と私のローカルFTPサーバ上のファイルを共有しています

$ MASTER=spark://IP:PORT ./bin/pyspark 

して、例を実行:私は次の操作を実行して、スタンドアロンクラスタモードでpyspark対話型シェルを開いた (http://spark.apache.org/docs/latest/programming-guide.html#external-datasets

HBaseの、アマゾンS3、等」コマンドを使用してください。

>>> ff= sc.textFile("ftp://192.168.125.124/new.txt") 
>>> ans = ff.map(lambda s: len(s)).reduce(lambda a, b: a + b) 


最初の行には、罰金実行し、RDDが作成されます。私はこのエラーを取得しています二行目の後に:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/home/shrey/spark-1.6.1/python/pyspark/rdd.py", line 797, in reduce 
    vals = self.mapPartitions(func).collect() 
    File "/home/shrey/spark-1.6.1/python/pyspark/rdd.py", line 771, in collect 
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    File "/home/shrey/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ 
    File "/home/shrey/spark-1.6.1/python/pyspark/sql/utils.py", line 45, in deco 
    return f(*a, **kw) 
    File "/home/shrey/spark-1.6.1/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: java.io.IOException: Login failed on server - 192.168.125.124, port - 21 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.connect(FTPFileSystem.java:133) 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:390) 
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701) 
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:58) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 


それはそのいくつかの問題がftpに結果を返す戻ったように思えるログインがポート21で192.168.125.124に失敗したと言いますか?私は結果がどのように返されるのか理解できません。私はスレーブコンピュータのブラウザから自分のftpサーバーにアクセスし、login.My vsftpd.confに次のアクセス権が設定されています。

anonymous_enable=YES 
local_enable=YES 
write_enable=YES 
anon_upload_enable=YES 
anon_mkdir_write_enable=YES 
dirmessage_enable=YES 
xferlog_enable=YES 
listen=YES 
no_anon_password=YES 
anon_root=/srv/ftp 


私はマップを破ると、マップパーツが細かい動作しますが、私は減らす上で同じエラーを取得し

>>> ff= sc.textFile("ftp://192.168.125.124/new.txt") 
>>> df = ff.map(lambda s: len(s)) 
>>> df.reduce(lambda a, b: a + b) 


のように二つの異なる文に一部を減らす 。 クラスタ上で外部データセットを必要としない他の通常のジョブを正常に実行しました。このよう

>>> data=[f for x in xrange(10000)] 
>>> distData=sc.parallelize(data); 
>>>distData.reduce(lambda a, b: a + b) 


として私は今、どのように私はこの問題とどのような私が間違っているのを解決することができ聞かせてください。私はhdfsを使うことができましたが、ftpを使って何が問題なのかを知りたいのです。

答えて

1

thisを見てください:

あなたのURLがフォームftp://username:[email protected]/fileでなければなりません。匿名ログインの場合、ユーザとしてanonymousを使用し、パスワードは何でもかまいません。空であってはいけません。 mapは変換があるとreduceがアクションであるため、

ff= sc.textFile("ftp://anonymous:[email protected]/new.txt") 

そして、あなたは一部を削減するだけで、エラーを見ている理由です。 RDDは、アクションが呼び出されたときにのみ実現されます。

+0

は、私は:)私のftpの問題が解決感謝を考えることができます。しかし、減速はまだ実行されませんでした。私は "サポートされていないシーク"と言って新しいエラーが発生しました (http://pastebin.com/5NpJk8Dg) – rajat

+0

よく、FTPInputStreamはシークをサポートしていません。 https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/fs/ftp/FTPInputStream.javaこちらの回避策がありますhttp://stackoverflow.com/questions/ 36271305/read-file-on-remote-machine-in-apache-spark-using-ftp –

+0

彼は最初にaddFileを使用してファイルをダウンロードし、SparkFiles.getを使用してファイルの場所を取得します。しかし、その場所は私のローカルシステム上のファイルの場所です。私が先に進んで実行したとき、私はFileNotFoundExceptionを取得しました。ファイルは確実にスレーブシステムの他の場所に格納されます – rajat

0

二つの問題だったようだ:

  • FTP URLがFTP

きれいに上記答えた最初の問題によりサポートされていない機能を求める

  • 形成されたかを。

    シークの問題については、sc.wholeTextFiles("ftp://anonymous:[email protected]/new.txt")を見てみると、テキストファイルを単一の文字列として取り出すことができます。

    また、あなたがsc.wholeTextFiles("ftp://anonymous:[email protected]/new.txt").values

    希望とラインのRDDを作成することができ、この

  • 関連する問題