2017-02-13 4 views
0

これは、一日と言うために通ってくるしているどのように多くのエラーメッセージ/警告メッセージの数を保つためにPyspark - スパークセッションのうち転送制御(SC)

Pyspark filter operation on Dstream

のフォローアップの質問です、時間 - どのように仕事をデザインするのか。

私が試してみました:

from __future__ import print_function 

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 


    def counts(): 
      counter += 1 
      print(counter.value) 

    if __name__ == "__main__": 

      if len(sys.argv) != 3: 
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
      exit(-1) 


      sc = SparkContext(appName="PythonStreamingNetworkWordCount") 
      ssc = StreamingContext(sc, 5) 
      counter = sc.accumulator(0) 

      lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
      errors = lines.filter(lambda l: "error" in l.lower()) 
      errors.foreachRDD(lambda e : e.foreach(counts)) 
      errors.pprint() 

      ssc.start() 
      ssc.awaitTermination() 

これはしかし、印刷で開始する、複数の問題があります動作しません(出力はstdoutにない、私はそれについて読んだことが、私が使用することができる最高はここにありますロギング)。その関数の出力をテキストファイルに保存し、代わりにそのファイルをテールできますか?

私はプログラムがちょうど出てくる理由は、エラーがないことを確認していない/

どのように1は状態を保持ん(火花1.6.2)に、さらに見てどこかダンプ?

foreachRDD(Dstream): 
    if RDD.contains("keyword1 | keyword2 | keyword3"): 
    dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter 
    print dictionary //or send this dictionary to else where 

:私は何をしようとしていますが、サーバーおよび重症度によりログを集約することで、別のユースケースは、特定のキーワードのために私が試してみたいもののため

擬似コードを見ることによって処理されたどのように多くのトランザクション数えることです送信または印刷辞書の最後の部分は、スパークストリーミングコンテキストの切り替えを必要とする - 誰かがその概念を説明できますか?

答えて

0

プリントは、私がスパークドキュメントのdesign patterns sectionを読んで推薦

を動作しません。もしあれば、それはprint文は労働者ではなく、ドライバ上で実行されることは注目に値するが、これは(あなたのコールprint作業を取得します

def _process(iter): 
    for item in iter: 
     print item 

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
errors = lines.filter(lambda l: "error" in l.lower()) 
errors.foreachRDD(lambda e : e.foreachPartition(_process)) 

ので:私は大体何をしたいことは、このようなものであることを考えますクラスタ上でこのコードを実行すると、ワーカーログにのみ表示されます)。

しかし、それはあなたの第二の問題を解決することはできません。

どのように1は、状態を保存しますか?

updateStateByKeyrelated exampleをご覧ください。

+0

何かの理由で、何らかの理由でプログラムのどこかで "print"を使うとエラーが出なくなります。 – GreenThumb

関連する問題