私は、Spark 1.3.1を使用していると私はカサンドラにデータをフィルタリングするための小さなプログラムを書かれているカサンドラスパークコネクタとフィルタリングデータ
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
このプログラムは、非常に長い時間
などの印刷メッセージを実行します16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
私はプログラムを終了し、
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
に私のコードを変更する場合、それはまだ非常にloのために実行されますプログラムは常にメモリに全体カサンドラテーブルをロードしよう(またはそれを完全にスキャンしてみてください)だけにして、フィルタを適用するよう
6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
のようなメッセージでNG時は、だから、そうです。それは私には非常に非効率的です。
sparkがcassandraテーブル全体をRDDにロードしてからフィルタを適用しようとしないように、このコードをより良い方法で書くにはどうすればよいですか?コード
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusDays(30)
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD
の
日付列がクラスタリングキーであるかどうかを確認するにはどうすればよいですか?私が発行できるコマンドがありますか? –
私はあなたの提案を試みたが、フィルタの結果をrdd2に割り当ててからそれを数えた。まだ '' 0.0(TID 4)のタスク4.0がローカルホスト(11/1350)の112031ミリ秒で完了したと言っています ' –
クラスタリングキーは、カサンドラのディスク上の情報を整理するコンセプトです。あなたのカサンドラスキーマの中核部分です。コードを投稿していないので、なぜ時間がかかるのか答えられません。それは多くのタスクを実行している必要があります。しかし、プッシュダウンなしで完全なテーブルスキャンよりも遅くなるケースはほとんどありません。 – RussS