2017-01-14 2 views
3

(RDDは、数百万行を持っている)、私はデータフレームDATAFRAME私は1.6.2</p> <p>私は.CSVデータを持っている、それは約8万行が含まれているApacheのスパークを使用してい

に変換したいのRDDを変換できません。

しかし、私は、私は

はRDDをマッピングしたいこと(列)のデータを取得するためにマッピングを行うために最初RDDにそれを変換する必要がありますが正常に動作しますが、それはデータフレームにRDDを変換することになると、スパークは、エラーがスローされます

Traceback (most recent call last): 
    File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module> 
    result_iso = input_iso.map(extract_iso).toDF() 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema 
    File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first 
    File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take 
    File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob 
    File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__ 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco 
    File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 

これらは私のコードです:私はかどうかを知りません

def extract_iso(line): 
    fields = line.split(',') 
    return [fields[-2], fields[1]] 

input_iso = sc.textFile("data.csv") 
result_iso = input_iso.map(extract_iso).toDF() 

data.csvは800万人以上の行を持っていますが、それが唯一の< 500行を持ってまで、私は行をsubstractとき、プログラムが正常に動作しますSparkに行制限などがありますが、何らかの方法でRDDを変換できますか?

また、RDDをマップするようにDataFrameをマップする方法はありますか?

追加情報:

データが乱雑で、各行の合計の列は、私が最初にそれをマップする必要が理由です、別の 1からoftenly異なっています。 しかし、私が必要とするデータは常に同じインデックス[1]と[-2](2番目の列と2番目の最後の列)であり、列間の合計列は行ごとに異なります

ありがとうございます。

+1

などのspark-csvパッケージを試しましたか? https://github.com/databricks/spark-csv –

+0

@RajatMishraはい私はそれを試みましたが、問題は、データが厄介であり、各行の合計列がしばしば1つずつ異なっているためです。データのDataFrameを構築する前にまずマップを作成してください –

+1

'sc.wholeTextFile'を使って試してみてください。 http://stackoverflow.com/questions/41195924/error-while-reading-very-large-files-with-spark-csv-package –

答えて

4

おそらく、Sparkが新しく作成したデータフレームのスキーマを特定しようとしている可能性があります。 RDDをDFにマッピングする第2の方法を試してください。スキーマを指定して

>>> from pyspark.sql.types import * 
>>> schema = StructType([StructField('a', StringType()),StructField('b', StringType())]) 
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema) 
+0

オハイオ州の神IT ITORKS !!!!非常にありがとう、はい、あなたは正しいです、問題は私が再び非常に非常に非常にありがとう、スキーマを特定していないです:) –

関連する問題