2015-10-13 17 views
18

Apache SparkとPostgreSQLとのJDBC接続があり、データベースにデータを挿入したいのですが、 appendモードを使用する場合は、それぞれDataFrame.Rowに対してidを指定する必要があります。 Sparkが主キーを作成する方法はありますか?Apache Sparkの主キー

+0

特別な要件はありますか?データ型、連続値、その他何か? – zero323

+0

いいえ、ちょうど良い良いユニークな整数 – Nhor

答えて

30

スカラ:あなたが必要とするすべてが一意の番号である場合

あなたはzipWithUniqueIdを使用して、データフレームを再作成することができます。まず、いくつかの輸入とダミーデータ:他の用途のための

import sqlContext.implicits._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType, StructField, LongType} 

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar") 

エキススキーマ:

val schema = df.schema 

は、idフィールド追加:

val dfWithPK = sqlContext.createDataFrame(
    rows, StructType(StructField("id", LongType, false) +: schema.fields)) 

同じ:

val rows = df.rdd.zipWithUniqueId.map{ 
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 

をデータフレームを作成します。 のものPythonの

from pyspark.sql import Row 
from pyspark.sql.types import StructField, StructType, LongType 

row = Row("foo", "bar") 
row_with_index = Row(*["id"] + df.columns) 

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() 

def make_row(columns): 
    def _make_row(row, uid): 
     row_dict = row.asDict() 
     return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) 
    return _make_row 

f = make_row(df.columns) 

df_with_pk = (df.rdd 
    .zipWithUniqueId() 
    .map(lambda x: f(*x)) 
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields))) 

あなたは連続した番号を好む場合は、あなたがzipWithIndexzipWithUniqueIdを置き換えることができますが、それはもう少し高価です。 DataFrame APIと直接

(ユニバーサルスカラ座やPython、Javaの、ほとんど同じ構文を持つR)

以前、私はのようにうまく動作するはずmonotonicallyIncreasingId機能を見逃しています連続番号を必要としない限り長く:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 

df.withColumn("id", monotonicallyIncreasingId).show() 
// +---+----+-----------+ 
// |foo| bar|   id| 
// +---+----+-----------+ 
// | a|-1.0|17179869184| 
// | b|-2.0|42949672960| 
// | c|-3.0|60129542144| 
// +---+----+-----------+ 

有用な間はmonotonicallyIncreasingIdは非決定論的です。 IDは実行ごとに異なるだけでなく、後続の操作にフィルタが含まれている場合に行を識別するために追加のトリックを使用することはできません。

:残念ながら

from pyspark.sql.window import Window 
from pyspark.sql.functions import rowNumber 

w = Window().orderBy() 
df.withColumn("id", rowNumber().over(w)).show() 

rowNumberウィンドウ関数を使用することも可能である

は、ウィンドウをWARN:ウィンドウの操作に定義されたパーティション!すべてのデータを単一のパーティションに移動すると、パフォーマンスが大幅に低下する可能性があります。

自然な方法でデータを分割し、一意性を確保することが特に現時点では有効でない場合を除きます。

+0

これはRでのみ動作しますか?私はあなたが上記のスカラを使用したことを知っていますが、私はこの 'zipWithUniqueId'について見つけることができるのはSparkRの文書にのみあります – Nhor

+0

実際にはScalaです。 Pythonソリューションが必要ですか?プレーンSQL? – zero323

+0

いいえ、私はあなたのコードを理解することができます、私はちょうど 'zipWithUniqueId'についてのpysparkドキュメントに何かがあるかどうか尋ねていましたが、結局私がそれを見つけたので、あなたの解決策のためにたくさんの! – Nhor

7
from pyspark.sql.functions import monotonically_increasing_id 

df.withColumn("id", monotonically_increasing_id()).show() 

df.withColumnの第2引数がmonotonically_increasing_idであることに注意してください()monotonically_increasing_idありません。

3

zipWithIndex()が望ましい動作である場合、すなわち連続した整数を要求する場合に、以下の解決策が比較的簡単であることが分かりました。

この場合、pysparkを使用して辞書の理解に頼って、元の行オブジェクトを一意索引を含む新しいスキーマに適合する新しい辞書にマップします。

# read the initial dataframe without index 
dfNoIndex = sqlContext.read.parquet(dataframePath) 
# Need to zip together with a unique integer 

# First create a new schema with uuid field appended 
newSchema = StructType([StructField("uuid", IntegerType(), False)] 
         + dfNoIndex.schema.fields) 
# zip with the index, map it to a dictionary which includes new field 
df = dfNoIndex.rdd.zipWithIndex()\ 
         .map(lambda (row, id): {k:v 
               for k, v 
               in row.asDict().items() + [("uuid", id)]})\ 
         .toDF(newSchema)