-2

spark-CassandraとScalaの新機能です。私は既存のRDDを持っています。言わせてください:ScalaスパークフィルターRDD using Cassandra

((url_hash、url、created_timestamp))。

url_hashに基づいてこのRDDをフィルタリングしたいと考えています。 Cassandraテーブルにurl_hashが存在する場合は、RDDからフィルタリングして、新しいURLだけを処理することができます。

カサンドラ表は次のようになります。

url_hash| url | created_timestamp | updated_timestamp 

任意のポインタは素晴らしいものです。

case class UrlInfoT(url_sha256: String, full_url: String, created_ts: Date) 
    def timestamp = new java.utils.Date() 
    val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
    val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
    val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
    newUrlsRDD = rdd1.subtractByKey(rdd3) 

私は取得していますカサンドラ・エラー

java.lang.NullPointerException: Unexpected null value of column full_url in  keyspace.url_info.If you want to receive null values from Cassandra, please wrap the column type into Option or use JavaBeanColumnMapper 

NULL値がカサンドラテーブル

+1

を楽しみにしていますかCassandraのテーブルを別のRDD、 'map'に変換して、' url_hash'をキーとして使用し、 'subtractByKey'を使用しますか? –

+0

ポインタありがとう。私は何を試して質問を更新しました。今私はNULLポインタの例外を取得しています – Abhishek

答えて

1

おかげで原型的なポールではありません。

は、私はこのようなものを試してみました!

私は誰かがこれが有用であることを願っています。 CaseクラスにOptionを追加しなければならなかった。

あなたは何を試してみました?より良いソリューション

case class UrlInfoT(url_sha256: String, full_url: Option[String], created_ts: Option[Date]) 

def timestamp = new java.utils.Date() 
val rdd1 = rdd.map(row => (calcSHA256(row(1)), (row(1), timestamp))) 
val rdd2 = sc.cassandraTable[UrlInfoT]("keyspace", "url_info").select("url_sha256", "full_url", "created_ts") 
val rdd3 = rdd2.map(row => (row.url_sha256,(row.full_url, row.created_ts))) 
newUrlsRDD = rdd1.subtractByKey(rdd3) 
関連する問題