2016-06-11 6 views
3

私はSparkを使用してPostgresデータベースにテキストファイルをエクスポートしようとしています。私は個々のテキストファイルをエクスポートするために以下のコードを使用しています。私は同じフォルダ内に200近いテキストファイルを持ち、すべてのテキストファイルは同じ構造をしています。残念ながら、年の値は私の入力ファイルの一部ではないので、私はそれをコーディングしています。Spark - Automationを使用してPostgreSQLにテキストファイルをエクスポートする

私は一度にこれらのファイルをすべてアップロードしたいと思いますが、それを行う方法はわかりません。

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt") 
splits = lines.map(lambda l: l.split(",")) 
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1],count=int(b[2]),year=int(1870))) 

schemaBabies = sqlContext.createDataFrame(raw_data) 
schemaBabies.registerTempTable("raw_data") 

df = sqlContext.sql("select * from raw_data") 

pgurl="jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX" 
properties={"user":"XXXX","password":"XXXX","driver":"org.postgresql.Driver","mode":"append"} 

df.write.jdbc(url = pgurl ,table = "EDW.raw_data",properties=properties) 

答えて

2

は、あなたのデータは次のようになりますと仮定しましょう:

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("name", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("count", LongType(), True) 
]) 

df = (sqlContext.read.format("com.databricks.spark.csv") 
    .schema(schema) 
    .load(out)) 

エキス年:

import csv 
import tempfile 
import os 

out = tempfile.mkdtemp() 
data = [ 
    ("1870", [("Jane Doe", "F", 3)]), 
    ("1890", [("John Doe", "M", 1)]), 
] 

for year, rows in data: 
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw: 
     csv.writer(fw).writerows(rows) 

スタートPySparkセッションまたはスクリプトが指定されたスキーマで正しいspark-csv--packagesへの引数と負荷データを渡す提出ファイル名から次のように記述します。

from pyspark.sql.functions import input_file_name, regexp_extract 

df_with_year = (df.withColumn(
    "year", 
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int"))) 

df_with_year.show() 
## +--------+------+-----+----+ 
## | name|gender|count|year| 
## +--------+------+-----+----+ 
## |John Doe|  M| 1|1890| 
## |Jane Doe|  F| 3|1870| 
## +--------+------+-----+----+ 

df_with_year.write.jdbc(...) 

重要:Spark < 2.0では、このアプローチはPythonとJVMの間でデータを渡さないことに依存します。 は、とPython UDFまたはDataFrame.rdd.mapを使用できません。

+1

あなたの入力に基づいて私のコードにいくつかの変更を加えました。私はすべての200個のテキストファイルをデータベースに読み込むことができました。本当にあなたの助けを感謝します。 – ytasfeb15

関連する問題