2016-08-05 22 views
1

私はcsvデータを持ち、read_csvを使ってpnadasデータフレームを作成し、すべてのカラムを文字列として強制します。 次に、pandasデータフレームからsparkデータフレームを作成しようとすると、以下のエラーメッセージが表示されます。pandas dataframe to data frame "タイプエラーをマージできません"

from pyspark import SparkContext 
    from pyspark.sql import SQLContext 
    from pyspark.sql.types import * 
    z=pd.read_csv("mydata.csv", dtype=str) 
    z.info() 
<class 'pandas.core.frame.DataFrame'> 
Int64Index: 74044003 entries, 0 to 74044002 
Data columns (total 12 columns): 
primaryid  object 
event_dt  object 
age    object 
age_cod   object 
age_grp   object 
sex    object 
occr_country object 
drug_seq  object 
drugname  object 
route   object 
outc_cod  object 
pt    object 

q= sqlContext.createDataFrame(z) 

File "<stdin>", line 1, in <module> 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame 
rdd, schema = self._createFromLocal(data, schema) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal 
struct = self._inferSchemaFromList(data) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList 
schema = reduce(_merge_type, map(_infer_schema, data)) 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type 
for f in a.fields] 
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type 
raise TypeError("Can not merge type %s and %s" % (type(a), type(b))) 
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'> 

これは例です。パブリックデータをダウンロードしてパンダデータフレームを作成していますが、スパークはパンダデータフレームからスパークデータフレームを作成しません。

 import pandas as pd 
     from pyspark import SparkContext 
     from pyspark.sql import SQLContext 
     from pyspark.sql.types import * 

     url ="http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip" 

     import requests, zipfile, StringIO 
     r = requests.get(url, stream=True) 
     z = zipfile.ZipFile(StringIO.StringIO(r.content)) 
     z.extractall() 


     z=pd.read_csv("demo2016q1.csv") # creates pandas dataframe 

    Data_Frame = sqlContext.createDataFrame(z) 
+0

a)は、なぜあなただ​​けの並列化データをローカルに読んでください。これは反パターンです。 b)「オブジェクト」としてマークされているすべての列は、Spark DataFramesでサポートされていない異種データを示唆しています。 – zero323

+0

あなたは正しいです、それはローカルで読む正しい方法ではありませんが、他のオプションが失敗したので、私はパンダからのデータフレームが簡単に処理できることを期待しました。あなたが言ったように、コラムは異種です。私が試すことができる回避策はありますか? –

+0

あなたは[mcve]を提供できますか?いくつかのおもちゃのサンプルは、そこに起こっていることを説明します... – zero323

答えて

3

短いストーリーは、スキーマの推論に依存しません。それは一般に高価で扱いにくいものです。特に、データ内のいくつかの列(たとえばevent_dt_num)には値がないため、Pandasは混合型(文字列が欠落していない場合は文字列、欠損値がある場合はNaN)として表現します。

疑問がある場合は、すべてのデータを文字列として読み取り、その後にキャストする方がよいでしょう。コードブックにアクセスできる場合は、問題を回避して全体のコストを削減するために、常にスキーマを提供する必要があります。

最後にドライバからデータを渡すことは、アンチパターンです。あなたが直接csv形式(スパーク2.0.0+)またはspark-csvライブラリを使用してこのデータを読み取ることができなければなりません(1.6スパーク以下):

df = (spark.read.format("csv").options(header="true") 
    .load("/path/tp/demo2016q1.csv")) 

## root 
## |-- primaryid: string (nullable = true) 
## |-- caseid: string (nullable = true) 
## |-- caseversion: string (nullable = true) 
## |-- i_f_code: string (nullable = true) 
## |-- i_f_code_num: string (nullable = true) 
## ... 
## |-- to_mfr: string (nullable = true) 
## |-- occp_cod: string (nullable = true) 
## |-- reporter_country: string (nullable = true) 
## |-- occr_country: string (nullable = true) 
## |-- occp_cod_num: string (nullable = true) 

inferSchema="true"オプションを追加する。この特定のケースにおいても動作するはずですが、それはありますまだそれを避ける方が良い。次のようにも、スキーマを提供することができます。

読者に直接
from pyspark.sql.types import StructType 

schema = StructType.fromJson({'fields': [{'metadata': {}, 
    'name': 'primaryid', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'i_f_code_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 
    'name': 'init_fda_dt_num', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'rept_cod_num', 
    'nullable': True, 
    'type': 'integer'}, 
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'}, 
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'}, 
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'reporter_country', 
    'nullable': True, 
    'type': 'string'}, 
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'}, 
    {'metadata': {}, 
    'name': 'occp_cod_num', 
    'nullable': True, 
    'type': 'integer'}], 
'type': 'struct'}) 

(spark.read.schema(schema).format("csv").options(header="true") 
    .load("/path/to/demo2016q1.csv")) 
+0

すばらしい説明をありがとう。実のところ、私はJupyterにspark-csvライブラリを正常に追加できなかったので、私はPandasに切り替えました。私はHDP 2.4(Spark 1.6)を使用しています。私はJupyterをインストールしました。私はspark-csvとcommons-csvをダウンロードし、Jupiterのノートブックスターターでこれらのjarファイルへのパスを指定しましたが、csvデータを読み込もうとするとライブラリを取得できませんでした。さて、私は火花の殻から試してみました。 Jupyter(ipython)ノートブックでspark-csvライブラリを使用したことがありますか? –

+0

確かに、このメソッドはうまくいくはずですhttp://stackoverflow.com/a/35762809/1560062 – zero323

+0

それは魅力のように動作します!百万をありがとう。私は他の多くのオプションを試して、多くの時間を費やしました。あなたが提供したリンクは、分単位で完了するのに役立ちました。 –

関連する問題