2016-09-02 1 views
0

私は、Spark 1.3.1を使用していると私はカサンドラにデータをフィルタリングするための小さなプログラムを書かれているカサンドラスパークコネクタとフィルタリングデータ

val sc = new SparkContext(conf) 
val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 
println(rdd2.count()) 
sc.stop() 

このプログラムは、非常に長い時間

などの印刷メッセージを実行します
16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46) 
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350) 

私はプログラムを終了し、

val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 

に私のコードを変更する場合、それはまだ非常にloのために実行されますプログラムは常にメモリに全体カサンドラテーブルをロードしよう(またはそれを完全にスキャンしてみてください)だけにして、フィルタを適用するよう

6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8) 
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350) 

のようなメッセージでNG時は、だから、そうです。それは私には非常に非効率的です。

sparkがcassandraテーブル全体をRDDにロードしてからフィルタを適用しようとしないように、このコードをより良い方法で書くにはどうすればよいですか?コード

val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD 

答えて

1

あなたの最初の部分は、だから、注意してください。 RDDは不変なので、フィルタを適用するときは、関数を適用したRDDではなく、返されたRDDを使用する必要があります。カサンドラからの読み込みの詳細についてはefficency


val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD 
println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything 

あなたの日付列を使用すると、カサンドラに述語を押し下げる.where機能を使用することができますクラスタリング・キーである場合。それ以外に、データサーバー側をプルーニングすることはあまりありません。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where

+0

日付列がクラスタリングキーであるかどうかを確認するにはどうすればよいですか?私が発行できるコマンドがありますか? –

+0

私はあなたの提案を試みたが、フィルタの結果をrdd2に割り当ててからそれを数えた。まだ '' 0.0(TID 4)のタスク4.0がローカルホスト(11/1350)の112031ミリ秒で完了したと言っています ' –

+1

クラスタリングキーは、カサンドラのディスク上の情報を整理するコンセプトです。あなたのカサンドラスキーマの中核部分です。コードを投稿していないので、なぜ時間がかかるのか答えられません。それは多くのタスクを実行している必要があります。しかし、プッシュダウンなしで完全なテーブルスキャンよりも遅くなるケースはほとんどありません。 – RussS

関連する問題