4

私はpysparkにはかなり新しく、csvファイルとして保存される大きなデータセットを処理するためにこのファイルを使用しようとしています。 私はCSVファイルをsparkデータフレームに読み込み、いくつかのカラムを削除して、新しいカラムを追加したいと思います。 どうすればいいですか?PySpark CSVをDataframeに読み込んで操作する方法

このデータをデータフレームに取り込む際に問題があります。これは、削減ステップでエラーTypeError: 'JavaPackage' object is not callableを生成

def make_dataframe(data_portion, schema, sql): 
    fields = data_portion.split(",") 
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema) 

if __name__ == "__main__": 
    sc = SparkContext(appName="Test") 
    sql = SQLContext(sc) 

    ... 

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql)) 
       .reduce(lambda a, b: a.union(b)) 

    big_frame.write \ 
     .format("com.databricks.spark.redshift") \ 
     .option("url", "jdbc:redshift://<...>") \ 
     .option("dbtable", "my_table_copy") \ 
     .option("tempdir", "s3n://path/for/temp/data") \ 
     .mode("append") \ 
     .save() 

    sc.stop() 

:これは私がこれまで持っているもののストリップダウンバージョンです。

これは可能ですか?データフレームを減らすというアイデアは、得られたデータをデータベース(Redshift、spark-redshiftパッケージを使用して)に書き込むことができるようにすることです。

unionAll()map()partial()と一緒に使ってみましたが、動作させることはできません。

私はAmazonのEMRで、spark-redshift_2.10:2.0.0、AmazonのJDBCドライバRedshiftJDBC41-1.1.17.1017.jarを使っています。

+0

入力(CSVフィールド)とは何ですか、出力は何ですか?コードを再設計する必要があるかもしれません。私は、データフレームを作成してそれらを結合する必要性を避けることができます.... – Yaron

+0

@ Yaron csvは、データフレームのスキーマに配置できる単なる数字です。私はこれをRDDとして簡単かつ効率的に保存できることを知っていますが、それを行うと、私はそれが究極の目標である(私が知る限り)赤方偏移データベースに書き込むことができません。 –

+0

私は何をヒントしようとしましたか?あなたは、いくつかのデータフレーム+それらの結合を必要とせずに、1つのスパークデータフレームを使用して解決できると思います。もう一度 - どのアルゴリズムを使用しようとしていますか?期待される成果は? – Yaron

答えて

7

アップデート - 応答のコメントでもご質問:データフレームへのCSVから

読み取りデータを: あなただけCSVスパークデータフレームの中にファイルを読み込もうと思われます。

もしそうなら - 私の答えはここにあります:https://stackoverflow.com/a/37640154/5088142これをカバーしてください。

次のコードを使用して、 "ドロップ(COL)" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

ドロップを使用して列を削除することができ

import pyspark 
sc = pyspark.SparkContext() 
sql = SQLContext(sc) 

df = (sql.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .load("/path/to_csv.csv")) 

// these lines are equivalent in Spark 2.0 
spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv") 

ドロップカラム

火花データフレームにCSVを読んでください(列)

Returns a new DataFrame that drops the specified column. 
Parameters: col – a string name of the column to drop, or a Column to drop. 

>>> df.drop('age').collect() 
[Row(name=u'Alice'), Row(name=u'Bob')] 

>>> df.drop(df.age).collect() 
[Row(name=u'Alice'), Row(name=u'Bob')] 

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() 
[Row(age=5, height=85, name=u'Bob')] 

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() 
[Row(age=5, name=u'Bob', height=85)] 

追加カラムあなたが "withColumn" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colNameに、COL)を使用することができ

Returns a new DataFrame by adding a column or replacing the existing column that has the same name. 
Parameters: 

    colName – string, name of the new column. 
    col – a Column expression for the new column. 

>>> df.withColumn('age2', df.age + 2).collect() 
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)] 

注:スパークを使用することができる他の多くの機能を有している(例えば

+0

ありがとうございます。処理の一環として、データフレームから列を削除する(読み込み中のCSVファイルに含まれている)必要があります。また、IDを含むフレームに新しい列を追加してからデータをredshiftに書き込む必要があります。ここにあるメソッドを使用してデータを読み取ってから、この処理をデータフレームで実行できますか? –

+0

あなたのコメントにあなたの答えにあなたの答えを更新しました – Yaron

+0

@TimB - あなたの質問に答えたら、それを受け入れてください。 – Yaron

関連する問題