2016-07-01 1 views
1

奇妙なことに私はストリーミング中の一連の辞書のk、vペアの数を確認するためにチェックしたいが、私はできないようだこの。例えばストリーミングされた辞書でスパークストリーミングと操作を実行

lines = ssc.socketTextStream("127.0.0.1", 5006) 
json_format = lines.flatMap(lambda recieved: json.loads(recieved)) 
dict_format = json_format.flatMap(lambda x : len(x)).reduce(lambda a, b: a+b) 

私は次のエラーを取得する:

File "/home/xx/spark-1.6.1/python/pyspark/rdd.py", line 1776, in combineLocally 
    merger.mergeValues(iterator) 
    File "/home/xx/spark-1.6.1/python/pyspark/shuffle.py", line 236, in mergeValues 
    for k, v in iterator: 
TypeError: 'int' object is not iterable 

私たちは辞書のシリーズを持っていると仮定することができます - そこに障害がjson.loads()ではありませんが、私はこの単純な長さを取るように見えることはできません。

答えて

0

Sparkは、flatMapに提供された関数が、ソースRDD/DStreamから処理する各要素のトラバーサル/反復可能な結果(リストなど)を返すことを期待しています。 TypeError: 'int' object is not iterableエラーは、おそらく、SparkがflatMapに渡すラムダの1つから返された反復可能でない値を反復しようとしているために発生します。 len(...)は常にintを返しますので、

flatMapコール(json_format.flatMap)は、確かに問題であり、それはほとんどの犯人はここにあります。 int(つまり、長さ)に1対1の変換を行うことが目的であるように見えるので、代わりにflatMapmapに置き換えることでその問題を修正できるはずです。

最初にflatMapが有効かどうかは入力によって異なります。ソースファイルの各行がJSON配列に解析される文字列であることが確実な場合は、予期したとおりに動作するはずです。ただし、ファイル内の任意の行のJSON構文解析で配列以外のが生成された場合、構文解析関数は反復不可能な結果を​​flatMapに送信し、表示されているものと似たエラーでジョブが失敗します現在:

>>> type(json.loads('{"asdf": "qwerty"}')) 
<class 'dict'> 
>>> type(json.loads('[{"asdf": "qwerty"}, [1,2,3]]')) 
<class 'list'> 
>>> type(json.loads('3')) 
<class 'int'> 
関連する問題