2016-10-31 4 views
1

同じ長さではない2つのデータフレームがあります。この条件を使用してPySpark DataFrameの行を返す

D  | Time_Start   | Z 
------ | ----------------------|----- 
Foo | 2016-10-01 00:12:15 | 7 
Cookie | 2016-10-03 00:45:15 | 99 

私の目標のようなこの

A  | Time_Stop    | B 
------ | ----------------------|----- 
Green | 2016-10-01 00:10:15 | 77 
Yellow | 2016-10-03 00:11:15 | 80 
Blue | 2016-10-04 00:12:15 | 6 

二ルックスのような最初のルックスは、一定の制限時間内にある最初のデータフレームからの行のみを返すことがある(5分以内に言うことができます)出力フレームは次のようになります

A  | Time_Stop    | B 
------ | ----------------------|----- 
Green | 2016-10-01 00:10:15 | 77 

私はこれを理解することに問題があります。これまでに私はこれをやろうとしました

from pyspark.sql import functions as F 
timeFmt = "yyyy-MM-dd' 'HH:mm:ss" 
result = df.where(F.unix_timestamp(df1.Time_Start, format = timeFmt) - F.unix_timestamp(df.Time_Stop, format = timeFmt) <= 300) 

これは動作しません。私が探している結果を達成するにはどうすればいいですか?

編集:両方のDataFramesの時間欄が文字列形式であることを忘れてしまいました。

編集2:私は以下を試してエラーを受け取りました。

from pyspark.sql.functions import expo 
df2 = df2.withColumn("Time_Start", df2["Time_Start"].cast("timestamp")) 
df = df.withColumn("Time_Stop", df['Time_Stop'].cast('timestamp')) 
condition = df.Time_Stop + expr("INTERVAL 10 MINUTES") <= df2.Time_Start 
df.filter(condition).show() 

AnalysisException: u'resolved attribute(s) starttime#2251 missing from pickup_time#1964,dropoff_latitude#2090,tip#2180,dropoff_longitude#2072,pickup_latitude#2036,pickup_longitude#2018,payment_type#2108,dropoff_time#2268,mta_tax#2162,trip_distance#2000,fare_amount#2126,toll#2198,rate_code#2054,total#2216,row#1946,surcharge#2144 in operator !Filter (cast(dropoff_time#2268 + interval 10 minutes as timestamp) <= starttime#2251);' 

編集3:私は、しかし私は、クラスタ上で実行するために、これを転送するときに私のコードはうまく翻訳するとは思わない、私のローカルマシンを使用して、この経由で動作することができました。ここで私のコードは、誰かがそれをより速く走らせる方法を指摘するかもしれないか、ちょうどきれいに見える。私はまだこの質問を開いたままにしています。 (正確にcollect()ように動作します)とループは、大きなデータセット(それがすべてでスパークcapacilitiesを使用していない)上の非常に非効率になりますlist()toLocalIterator()を使用して

df = list(df.toLocalIterator()) 
df1 = list(df1.toLocalIterator()) 
rand = [] 
for i in df: 
    for j in df1: 
     elapsed_time = (i['Time_Start'] - j['Time_Stop']).total_seconds() 
     time_limit = 600 
     if (abs(elapsed_time) <= time_limit): 
      rand.append(j) 
rand = list(set(rand)) 

答えて

1

この場合、デカルト結合が最良の解決策であるようです。 Time_StopのDFはfirstDFTime_Startの電話番号はsecondDFとなります。どちらも日付がタイムスタンプにキャストされています。 次にお試しください:

from pyspark.sql import functions as F 
interval = F.unix_timestamp(secondDF.Time_Start) - F.unix_timestamp(firstDF.Time_Stop) 
firstDF.join(secondDF).where(F.abs(interval) < 300).select('A', 'Time_Stop', 'B') 
+0

ありがとうございます。私が正しい道を歩いていたように見えます。フォローアップの質問と同じように。これにより、firstDFの各行とsecondDFのすべての行が比較されます。彼らは実際には実際にはかなり混乱している私の例で注文することが起こる。 –

+0

はい、デカルト結合はfirstDfのすべての行とsecondDfのすべての行とを一致させますが、最初の順序付けはまったく問題ありません。 – Mariusz

関連する問題