あなたは2つのデータフレームにsocket1
と1とsocket2
と他のものを、あなたのデータフレームを分割し、その後leftanti
が結合を使用することができます:私はここからサンプルデータを借用し、C = 1とdest = 3
amounts src dest
522 1 2
80 1 3
フィルタリングの代わりに(spark >= 2.0
のために働く)。
df = spark.createDataFrame(
sc.parallelize([
[10,1,2,"A","B"],
[11,1,2,"B","C"],
[12,1,2,"C","D"],
[510,1,2,"C","D"],
[550,1,2,"B","C"],
[500,1,2,"A","B"],
[80,1,3,"A","B"]
]),
["amounts","src","dest","socket1","socket2"]
)
そして今、データフレームを分割する:
まず者は、データフレームを作成してみましょう
スパーク> = 2.0
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1, ["src", "dest", "socket"], "leftanti")
スパーク1.6
df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")
そして最後に集約:
import pyspark.sql.functions as psf
res.groupBy("src", "dest").agg(
psf.sum("amounts").alias("amounts")
).show()
+---+----+-------+
|src|dest|amounts|
+---+----+-------+
| 1| 3| 80|
| 1| 2| 522|
+---+----+-------+
あなたはすでに何を試してみましたか?あなたの実験を共有できますか? – Mariusz
はい、1秒は – guimption