
spark: using lookup inside a map

I am trying to use lookup inside a map in Spark (with PySpark), as shown below, and I am getting an error.

Is this something that Spark simply does not allow?

>>> rdd1 = sc.parallelize([(1,'a'),(2,'b'),(3,'c'),(4,'d')]).sortByKey() 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 

The reason I want to do it this way is that in the real problem I am working on, rdd1 is huge, so solutions like converting it to a dictionary with collectAsMap are not practical.
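(For contrast with the failing map above, a minimal sketch of the dictionary approach being ruled out here: collect rdd1 into the driver and broadcast it, which only works when rdd1 fits in memory.)

# Only viable when rdd1 fits in driver memory -- exactly what the
# question rules out; shown here only for contrast.
lookup_map = sc.broadcast(rdd1.collectAsMap())   # {1: 'a', 2: 'b', ...}
rdd = rdd2.map(lambda x: (x, lookup_map.value.get(x)))
rdd.collect()   # [(2, 'b'), (4, 'd')]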

Joining the two RDDs is also very slow, since both rdd1 and rdd2 are very large.

Thanks.

The error:

16/03/28 05:02:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/03/28 05:02:28 INFO DAGScheduler: Stage 1 (sortByKey at <stdin>:1) finished in 0.148 s 
16/03/28 05:02:28 INFO DAGScheduler: Job 1 finished: sortByKey at <stdin>:1, took 0.189587 s 
>>> rdd2 = sc.parallelize([2,4]) 
>>> rdd = rdd2.map(lambda x: (x, rdd1.lookup(x))) 
>>> rdd.collect() 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 676, in collect 
    bytesInJava = self._jrdd.collect().iterator() 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd 
    pickled_command = ser.dumps(command) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/serializers.py", line 402, in dumps 
    return cloudpickle.dumps(obj, 2) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps 
    cp.dump(obj) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump 
    return pickle.Pickler.dump(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function 
    self.save_function_tuple(obj, [themodule]) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple 
    save((code, closure, base_globals)) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple 
    save(element) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/usr/lib64/python2.6/pickle.py", line 600, in save_list 
    self._batch_appends(iter(obj)) 
    File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends 
    save(tmp[0]) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function 
    self.save_function_tuple(obj, modList) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple 
    save(f_globals) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce 
    save(state) 
    File "/usr/lib64/python2.6/pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict 
    pickle.Pickler.save_dict(self, obj) 
    File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 
    File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems 
    save(v) 
    File "/usr/lib64/python2.6/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 


>>> 

Answers

Answer 1:

Is this something that Spark just does not allow doing?

Yes, exactly. Spark does not support nested actions or transformations, so you cannot call lookup on one RDD from inside a map over another. Since join and local data structures (collectAsMap) are already ruled out here, the only remaining option is to use an external system, such as a database, for the lookups.
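To illustrate the external-lookup option, here is a minimal sketch using mapPartitions with one connection per partition. The sqlite3 store, the '/path/to/lookup.db' path, and the kv(key, value) table are illustrative assumptions, not part of the original answer; on a real cluster you would use a networked key-value store or database reachable from every executor.

import sqlite3

def lookup_partition(keys):
    # Open one connection per partition rather than per element.
    # The database file and schema are hypothetical, and a local file
    # would have to be present on every worker node.
    conn = sqlite3.connect('/path/to/lookup.db')
    cur = conn.cursor()
    for k in keys:
        cur.execute('SELECT value FROM kv WHERE key = ?', (k,))
        row = cur.fetchone()
        yield (k, row[0] if row else None)
    conn.close()

rdd = rdd2.mapPartitions(lookup_partition)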

Answer 2:

An RDD cannot be used inside an operation on another RDD. Say we have two RDDs, rdd1 and rdd2. You can do this:

rdd1.map(......)

but you cannot do this:

rdd1.map(.....rdd2....)

So when you need this kind of composite operation, try a join/union of the two RDDs instead, as in the sketch below.
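A concrete sketch of the join approach for the example data in the question, keying rdd2 first so the two RDDs can be joined:

# Turn rdd2 into a keyed RDD so it can be joined against rdd1.
keyed = rdd2.map(lambda x: (x, None))
# join yields (key, (left_value, right_value)); keep only rdd1's value.
rdd = keyed.join(rdd1).map(lambda kv: (kv[0], kv[1][1]))
rdd.collect()   # [(2, 'b'), (4, 'd')] (order may vary)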
