Apache SparkとPostgreSQLとのJDBC接続があり、データベースにデータを挿入したいのですが、 append
モードを使用する場合は、それぞれDataFrame.Row
に対してid
を指定する必要があります。 Sparkが主キーを作成する方法はありますか?Apache Sparkの主キー
答えて
スカラ:あなたが必要とするすべてが一意の番号である場合
あなたはzipWithUniqueId
を使用して、データフレームを再作成することができます。まず、いくつかの輸入とダミーデータ:他の用途のための
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
エキススキーマ:
val schema = df.schema
は、idフィールド追加:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
同じ:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
をデータフレームを作成します。 のものPythonの:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
あなたは連続した番号を好む場合は、あなたがzipWithIndex
とzipWithUniqueId
を置き換えることができますが、それはもう少し高価です。 DataFrame
APIと直接
:
(ユニバーサルスカラ座やPython、Javaの、ほとんど同じ構文を持つR)
以前、私はのようにうまく動作するはずmonotonicallyIncreasingId
機能を見逃しています連続番号を必要としない限り長く:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
有用な間はmonotonicallyIncreasingId
は非決定論的です。 IDは実行ごとに異なるだけでなく、後続の操作にフィルタが含まれている場合に行を識別するために追加のトリックを使用することはできません。
注:残念ながら
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
:
rowNumber
ウィンドウ関数を使用することも可能である
は、ウィンドウをWARN:ウィンドウの操作に定義されたパーティション!すべてのデータを単一のパーティションに移動すると、パフォーマンスが大幅に低下する可能性があります。
自然な方法でデータを分割し、一意性を確保することが特に現時点では有効でない場合を除きます。
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id()).show()
df.withColumnの第2引数がmonotonically_increasing_idであることに注意してください()monotonically_increasing_idありません。
zipWithIndex()が望ましい動作である場合、すなわち連続した整数を要求する場合に、以下の解決策が比較的簡単であることが分かりました。
この場合、pysparkを使用して辞書の理解に頼って、元の行オブジェクトを一意索引を含む新しいスキーマに適合する新しい辞書にマップします。
# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer
# First create a new schema with uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
+ dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
df = dfNoIndex.rdd.zipWithIndex()\
.map(lambda (row, id): {k:v
for k, v
in row.asDict().items() + [("uuid", id)]})\
.toDF(newSchema)
- 1. Apache SparkとApache Ignite
- 2. Bluemix Apache Spark Metrics
- 3. Apache Spark RDDワークフロー
- 4. 主キー
- 5. RDDとApache Sparkのパーティション
- 6. Apache Spark RDDのScalazタイプクラス
- 7. Apache SparkのJava要件
- 8. Apache sparkのアクセス制御
- 9. 主キーは
- 10. インデックスと主キー
- 11. 主キー - SQL2005
- 12. 複合主キー
- 13. SQL Serverの:主キー
- 14. フェニックスの主キーが
- 15. 6データベースの主キー
- 16. 主キーと外部キー?
- 17. 変更主キー、外部キー
- 18. 主キーと外部キー
- 19. 再起動主キー
- 20. それは主キー
- 21. JPA主キー違反
- 22. コードファースト - 主キー制約
- 23. SQL:主キーにするか、主キーにしない?
- 24. Cassandra主キーと代替キーの定義
- 25. Grails:主キーとしての外部キー?
- 26. 2つのテーブルの主キー
- 27. ビュー内のエンティティフレームワークの主キー
- 28. テーブルの主キーの検出
- 29. SQLiteの:無名の主キー
- 30. ビューの主キーの設定
特別な要件はありますか?データ型、連続値、その他何か? – zero323
いいえ、ちょうど良い良いユニークな整数 – Nhor