2016-11-21 9 views
0

をPySpark CHECK_NUMBER ==>フィルターは、私がPySparkを使用していると私がチェックする方法を探してい

を考えると... rdd2からこのCHECK_NUMBERに関するすべての情報を取得含まれています

rdd1 = sc.parallelize([(u'_guid_F361IeVTC8Q0kckDRw7iOJCe64ELpRmMKQgESgf-uEE=', 
         u'serviceXXX', 
         u'testAB_02', 
         u'2016-07-03')]) 

最初の要素がIDであり、2番目がサービス名で、3番目がテスト名で、IDであり、4番目の要素が日付であるとします。

rdd2 = sc.parallelize([(u'9b023b8233c242c09b93506942002e0a', 
         u'01', 
         u'2016-11-02'), 

         (u'XXXX52547412558933nnBlmquhdyhM', 
         u'02', 
         u'2016-11-04')]) 

第1要素がIDであり、第2要素がテストIDであり、最後の要素が日付であるとします。

rdd1testAB_02これは私のcheck_numberと一致しません(サービス名はcheck_numberの値で終わらなければなりません)。私のオブジェクトは、すべての行をrdd2から、01をテストIDとして取得する場合です。ここで予想される出力は次のようにする必要があります。

[(u'9b023b8233c242c09b93506942002e0a', 
    u'01', 
    u'2016-11-02') 

これは私のコードです:

def update_typesdecohorte_table(rdd1, rdd2): 

    if rdd1.filter(lambda x : (re.match('.*?' + check_number, x[2]))).isEmpty() is True: 

     new_rdd2 = rdd2.filter(lambda x : x[1] == check_number) 

    else: 

     pass 

    return new_rdd2 

new_rdd2 = update_typesdecohorte_table(rdd1, rdd2) 

WICHができます:

[(u'9b023b8233c242c09b93506942002e0a', u'01', u'2016-11-02')] 

このコードは動作しますが、私は方法が好きではありません..これを行う最も効率的な方法は何ですか?あなたはRDD1に一致する要素を持っていないRDD2からすべてのレコードを取得したい場合は

答えて

1

を使用できcartesian

new_rdd2 = rdd1.cartesian(rdd2) 
    .filter(lambda r: not r[0][2].endswith(r[1][1])) 
    .map(lambda r: r[1]) 

あなたCHECK_NUMBERが固定されている場合は、この値によって、エンドフィルターで:

new_rdd2.filter(lambda r: r[1] == check_number).collect() 

check_numberが固定されていて、両方のRDDが大きければ、ジョイン時にパーティション上でシャッフルする必要があるため、あなたのソリューションよりも速度が遅くなります(コードはシャフリング以外の変換のみ行います)。

+0

Thxマリウス!これは私が望むものであり、その方法は頑強に見えます! – DataAddicted

関連する問題