2017-01-06 16 views
1

sparkに複数のMySQLテーブルを結合しようとしています。これらのテーブルの中には、列名が重複しているものがあります(各テーブルには、そのテーブルに固有のidフィールドがあります)。sparkのカラム名が重複しているテーブルを結合する

私が実行しようとすると:

val myDF = session.read.jdbc("t1 inner join t2 on t1.t2_id = t2.id, queryTable, prop) 
myDF.show 

を両方のテーブルが(別の意味での)idフィールドを持っているので、私はjava.sql.SQLIntegrityConstraintViolationException: Column 'id' in field list is ambiguousを得る

私がやってみました:

val t1DF = spark.read.jdbc(dbstring, "t1", "id").alias("a") 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").alias("b") 
val joinedDF = t1DF.join(t2DF, Seq("a.t2_id", "b.id")) 
    .selectExpr("ent.id as entity_id", "lnk.pagerank") 

私が得ましたエラーorg.apache.spark.sql.AnalysisException: using columns ['t1.t2_id,'t2.id] can not be resolved given input columns: [..]エイリアスがエイリアスの処理方法を知らないように見えます。

動作するようです唯一のオプションは、サブクエリを使用している:私は、任意のフィルタを行う事が非許容可能遅くなって、あらゆることができる前に

spark.read.jdbc(dbstring, "(select t1.id as t1_id, t1.t2_id from 
t1 inner join t2 on t1.t2_id = t2.id) t", "t2_id") 

ものの、その場合にはサブクエリが実行を終了する必要があります。クエリ分割は役に立たない。

スパークは、idのid#528id#570の間で曖昧さを取り除くための内部的な方法があるようですが、select文でそれらを参照する方法はわかりません。

答えて

0

私は同じ問題がありました。私がこれを解決するために見つけた唯一の方法は、列名に接尾辞を付けることでした。次のようになります。

val t1DF = spark.read.jdbc(dbstring, "t1", "id").select(col("id").alias("id_t1")) 
val t2DF = spark.read.jdbc(dbstring, "t2", "id").select(col("id").alias("id_t2")) 

val joinedDF = t1DF.join(t2DF, t1DF("id_t1") === t2DF("id_t2")) 
関連する問題