0
csvファイルをRDD形式にインポートしようとしています。私が.first()
コマンドを使用してrddの最初の行を取得すると、以下のようにエラーが発生します。first()は既存のRDDで動作しません
.map
機能は、.first()
と.count()
のようなコマンドが動作しないパイプライン型RDDにRDDを機能させるようです。これに取り組む他の方法はありますか?
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["PassengerId","Survived","Pclass","Name","Sex","Age","SibSp","Parch","Ticket","Fare","Cabin","Embarked"])
return reader.next()
input = sc.textFile("C:\Users\rohit.guglani\Documents/train.csv",4).map(loadRecord)
type(input)
pyspark.rdd.PipelinedRDD
input.first()
は、このエラーを与える:Windows上で作業するとき
Py4JJavaError Traceback (most recent call last)
<ipython-input-9-d93d15081c08> in <module>()
----> 1 input.first()
C:\spark-1.6.1\python\pyspark\rdd.pyc in first(self)
1313 ValueError: RDD is empty
1314 """
-> 1315 rs = self.take(1)
1316 if rs:
1317 return rs[0]
C:\spark-1.6.1\python\pyspark\rdd.pyc in take(self, num)
1265 """
1266 items = []
-> 1267 totalParts = self.getNumPartitions()
1268 partsScanned = 0
1269
C:\spark-1.6.1\python\pyspark\rdd.pyc in getNumPartitions(self)
2361
2362 def getNumPartitions(self):
-> 2363 return self._prev_jrdd.partitions().size()
2364
2365 @property
C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
C:\spark-1.6.1\python\pyspark\sql\utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
C:\spark-1.6.1\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o50.partitions.
ohit.guglani/Documents/train.csv
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Unknown Source)
を使用しているときには注意しなければなりません。 textFileに渡すパス文字列をチェックして、バックスラッシュ文字を含まないか、文字列の先頭に 'r'を付けて、Pythonがバックスラッシュ文字を特殊文字と解釈しないようにします。 –
ありがとうフィリップ。バックスラッシュに問題がありました。パイプライン化されたRDDでは.first()を実行できませんが、.top(n)はパイプライン化されたRDDで動作します。 –