私は数千万の行のデータを持っています。スパークストリーミングを使用して、1週間または1日以内にこれらのすべてを分析することは可能ですか?データ量の点でストリーミングを開始するには限界がありますか?私はストリームがおそらくそれ以上処理できないので、上限と何かを私のデータベースに入れるべきか分からない。私はまた、データを分離するためにウィンドウ操作を使用する異なる時間ウィンドウ1,3,6時間などを持っています。一度データ量に関してストリーミングを開始するにはどのような制限がありますか?
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,300)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc","channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
#axes topic integration the article and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']}))
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint()
#join
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60)))
statistics.transform(joinstream).pprint()
ssc.start() # Start the computation ssc.awaitTermination()
ssc.awaitTermination()
ここには複数の質問がありますが、明確に区別して答えるのに役立ちます。また、問題を説明するのに十分な最小サンプルに含まれるコードを最小限に抑えると便利です – etov