2016-02-29 14 views
5

私は数千万の行のデータを持っています。スパークストリーミングを使用して、1週間または1日以内にこれらのすべてを分析することは可能ですか?データ量の点でストリーミングを開始するには限界がありますか?私はストリームがおそらくそれ以上処理できないので、上限と何かを私のデータベースに入れるべきか分からない。私はまた、データを分離するためにウィンドウ操作を使用する異なる時間ウィンドウ1,3,6時間などを持っています。一度データ量に関してストリーミングを開始するにはどのような制限がありますか?

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc,300) 
sqlContext = SQLContext(sc) 
channels = sc.cassandraTable("abc","channels") 
topic = 'abc.crawled_articles' 
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"} 

category = 'abc.crawled_article' 
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams) 
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x)) 


article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) 
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x)) 

#axes topic integration the article and the axes 
axes_topic = 'abc.crawled_axes' 
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams) 
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']})) 
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint() 

#join 
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60))) 
statistics.transform(joinstream).pprint() 

ssc.start() # Start the computation ssc.awaitTermination() 
ssc.awaitTermination() 
+1

ここには複数の質問がありますが、明確に区別して答えるのに役立ちます。また、問題を説明するのに十分な最小サンプルに含まれるコードを最小限に抑えると便利です – etov

答えて

1

ワン:

以下の私のコードを見つけてください

  • それは[一定時間]内、[行のいくつかの大規模な数]を分析することは可能ですか?

一般的に、はい - スパークは、原則的に、あなたが大規模なクラスタを起動することができるはずと私たちは時間を話したりしていると仮定すると(比較的短時間で大量のデータをクランチので、あなたは、多くのマシン間でスケールアウトすることができますオーバーヘッドのために問題になる可能性があります)。

具体的には、数千万件のレコードで質問に示されているような処理を行うことは、合理的な時間(つまり、非常に大きなクラスターを使用しない)で実現可能です。

  • データ量に関してスパークストリーミングの制限はありますか?

私は分かりませんが、あなたはそれに慣れていません。非常に大規模な展開の例があります。 ebay(「毎日平均30TBを超える何百もの測定基準」)。また、FAQを参照してください。これには、8000台のマシンのクラスタとデータのPBの処理が含まれています。

  • 結果はいつ[何らかの種類のストレージ]に書き込まれるべきですか?

スパークストリーミングのbasic modelによれば、データはマイクロバッチで処理されます。データが実際にストリームである(すなわち、明確なエンディングを有さない)場合、最も単純なアプローチは、各RDDの処理結果(すなわち、マイクロバッチ)を格納することであろう。

データがストリームではない場合(例:いくつかの静的ファイルを時々処理しているので、ストリーム部分を放棄することを検討してください(たとえば、バークプロセッサとしてSparkだけを使用するなど)。

質問には数時間のウィンドウサイズが記載されているので、バッチオプションを検討することをお勧めします。

  • 異なるデータウィンドウで同じデータをどのように処理できますか?

あなたはスパーク・ストリーミングを使用している場合、あなたは(例えばmapWithStateを使用して)複数の状態を維持することができ - 各時間ウィンドウのための1つを。

もう1つのアイデア(コードは簡単で、操作に関しては複雑です) - それぞれが独自のウィンドウを持つ複数のクラスタを同じストリームから読み込むことができます。

バッチ処理の場合、同じ操作を複数の異なる時間枠で複数回実行することができます。複数のウィンドウサイズを持つreduceByWindow

関連する問題