2015-09-08 8 views
7

スパークストリーミングを使用してCassandraから読み取ると問題が発生します。スパークストリーミングを使用してCassandraから読む

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

上記のリンクは、私はカサンドラからデータを選択する

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3) 

を使用しますが、スパークストリーミングが一度ただ一つのクエリを持っているようだが、私はそれを使用してクエリを実行し続けたいと間隔10秒。

私のコードは以下のとおりです。

ありがとうございます!

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import com.datastax.spark.connector.streaming._ 
import org.apache.spark.rdd._ 
import scala.collection.mutable.Queue 


object SimpleApp { 
def main(args: Array[String]){ 
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1") 

    val ssc = new StreamingContext(conf, Seconds(10)) 

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

    //rdd.collect().foreach(println) 

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]() 


    val dstream = ssc.queueStream(rddQueue) 

    dstream.print() 

    ssc.start() 
    rdd.collect().foreach(println) 
    rddQueue += rdd 
    ssc.awaitTermination() 
} 

}

+0

はあなたが達成したいん何を記述することができますか?各区間の全テーブルを読んでください。ストリーミングデータはどこから来ていますか? – maasg

+0

@maasg時間に関連するレコードを照会するために、各間隔(10秒など)にテーブルを読みたいと思います。つまり、私はカサンドラをスパークストリーミングの源にしたいと思っています。つまり、私はDStreamの作成時にブロックされます。いくつかのヒントと例を挙げますか?どうもありがとう! –

答えて

6

あなたは、入力としてCassandraRDDとConstantInputDStreamを作成することができます。 ConstantInputDStreamは各ストリーミング間隔で同じRDDを提供し、そのRDDでアクションを実行することでRDD系列のマテリアライゼーションがトリガーされ、毎回Cassandraでクエリが実行されます。

問合せ時間が長くなり、ストリーミング処理が不安定にならないように、問合せ対象のデータが無制限に増加していないことを確認してください。

このような何かが(出発点として、あなたのコードを使用して)トリックを行う必要があります。

import org.apache.spark.streaming.dstream.ConstantInputDStream 

val ssc = new StreamingContext(conf, Seconds(10)) 

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu") 

val dstream = new ConstantInputDStream(ssc, cassandraRDD) 

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output 
    println(rdd.collect.mkString("\n")) 
} 
ssc.start() 
ssc.awaitTermination() 
+3

最後のRDDが処理されてからテーブルに保存された**新しいデータ**のみを読みたい場合はどうすればいいですか?それは可能ですか? –

+2

古いデータが再度フェッチされないようにする方法はありますか?それは無限ループにとどまります。 –

+0

@yurishkuro AFAIKは現在できません。 – maasg

0

私は同じ問題を持っていたとInputDStreamクラスのサブクラスを作成することによって、解決策を見つけました。 start()compute()メソッドを定義する必要があります。

start()は、調製に使用することができる。メインロジックはcompute()にあります。それはOption[RDD[T]]を返すでしょう。 クラスをフレキシブルにするには、InputStreamQueryの特性が定義されています。 Cassandraのテーブルkeyspace.test について

trait InputStreamQuery[T] { 
    // where clause condition for partition key 
    def partitionCond : (String, Any) 
    // function to return next partition key 
    def nextValue(v:Any) : Option[Any] 
    // where clause condition for clustering key 
    def whereCond : (String, (T) => Any) 
    // batch size 
    def batchSize : Int 
} 

date鍵分割してテーブルを再編成test_by_dateを作成します。 testテーブルの

CREATE TABLE IF NOT exists keyspace.test 
(id timeuuid, date text, value text, primary key (id)) 

CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS 
SELECT * 
FROM keyspace.test 
WHERE id IS NOT NULL 
PRIMARY KEY (date, id) 
WITH CLUSTERING ORDER BY (id ASC); 

一つの可能​​な実装は、次のようにCassandraInputStreamクラスで使用することができる

class class Test(id:UUID, date:String, value:String) 

trait InputStreamQueryTest extends InputStreamQuery[Test] { 
    val dateFormat = "uuuu-MM-dd" 

    // set batch size as 10 records 
    override def batchSize: Int = 10 

    // partitioning key conditions, query string and initial value 
    override def partitionCond: (String, Any) = ("date = ?", "2017-10-01") 
    // clustering key condition, query string and function to get clustering key from the instance 
    override def whereCond: (String, Test => Any) = (" id > ?", m => m.id) 
    // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01' 
    override def nextValue(v: Any): Option[Any] = { 

    import java.time.format.DateTimeFormatter 

    val formatter = DateTimeFormatter.ofPattern(dateFormat) 
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1) 
    if (nextDate.isAfter(LocalDate.now())) None 
    else Some(nextDate.format(formatter)) 
    } 
} 

なければなりません。すべてのクラスを組み合わせる

class CassandraInputStream[T: ClassTag] 
(_ssc: StreamingContext, keyspace:String, table:String) 
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] { 

var lastElm:Option[T] = None 
var partitionKey : Any = _ 

override def start(): Unit = { 

    // find a partition key which stores some records 
    def findStartValue(cql : String, value:Any): Any = { 
    val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1) 

    if (rdd.cassandraCount() > 0) value 
    else { 
     nextValue(value).map(findStartValue(cql, _)).getOrElse(value) 
    } 
    } 
    // get query string and initial value from partitionCond method 
    val (cql, value) = partitionCond 
    partitionKey = findStartValue(cql, value) 
} 

override def stop(): Unit = {} 

override def compute(validTime: Time): Option[RDD[T]] = { 
    val (cql, _) = partitionCond 
    val (wh, whKey) = whereCond 

    def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = { 
    // query with partitioning condition 
    val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey) 

    val rdd = lastElm.map{ x => 
     query.where(wh, whKey(x)).withAscOrder.limit(batchSize) 
    }.getOrElse(query.withAscOrder.limit(batchSize)) 

    if (rdd.cassandraCount() > 0) { 
     // store the last element of this RDD 
     lastElm = Some(rdd.collect.last) 
     Some(rdd) 
    } 
    else { 
     // find the next partition key which stores data 
     nextValue(patKey).flatMap{ k => 
     partitionKey = k 
     fetchNext(k)} 
    } 
    } 

    fetchNext(partitionKey) 
} 
} 

val conf = new SparkConf().setAppName(appName).setMaster(master) 
val ssc = new StreamingContext(conf, Seconds(10)) 

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest 

dstream.map(println).saveToCassandra(...) 

ssc.start() 
ssc.awaitTermination() 
関連する問題