2016-08-04 6 views
3

questionからインスピレーションを得て、RKE(Parquetファイルから読み込まれたもの)、スキーマ(photo_id、data)をペアにして、タブで区切って保存するコードをいくつか書きました。ただ、詳細ベース64エンコードそれとして、このような:今そう分散型タブ区切りCSVを読む

def do_pipeline(itr): 
    ... 
    item_id = x.photo_id 

def toTabCSVLine(data): 
    return '\t'.join(str(d) for d in data) 

serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1]))) 

def format(data): 
    return toTabCSVLine(serialize_vec_b64pkl(data)) 

dataset = sqlContext.read.parquet('mydir') 
lines = dataset.map(format) 
lines.saveAsTextFile('outdir') 

、関心のポイント:そのデータセットを読んで、例えばその非直列化されたデータを印刷する方法は?

私はPython 2.6.6を使用しています。

だけですべてを行うことができることを検証するために、私はこのコードを書いたところ私の試みは、ここにある

テストのためにOKである、 collect()を呼び出しますが、現実世界で
deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1]))) 

base64_dataset = sc.textFile('outdir') 
collected_base64_dataset = base64_dataset.collect() 
print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t'))) 

私が試したzero323さん:シナリオは...


編集を苦労しています提案:

PythonRDD[2] at RDD at PythonRDD.scala:43 
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
UnpicklingError: NEWOBJ class argument has NULL tp_new 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
/homes/gsamaras/code/read_and_print.py in <module>() 
    17  print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl)) 
    18 
---> 19  foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect() 
    20  print(foo) 

/home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
+1

なぜ、 'base64_dataset.map(str.split).map(deserialize_vec_b64pkl)'はありませんか? – zero323

+0

@ zero323私は 'str.split'を使うことができないことを知りませんでした。私はまだこれに新しいので、私と一緒に裸にしてください。誰かが説明すれば、後でやり遂げることができると確信しています。あなたが提案しているものはRDDに帰着するはずです。すべてが機能することを確かめるために、どのように最初の要素を見ることができますか?あなたが言ったことを 'collect() 'しようとしましたが、エラーが発生しました(' Py4JJavaError:z:org.apache.spark.api.python.PythonRDD.collectAndServeを呼び出す際にエラーが発生しました。おそらく、その結果のRDDのデータレイアウトを理解すれば助けになるかもしれません。 – gsamaras

+0

@ zero323私はPython 2を使用しています。それをカバーするには十分でしょう。そこから、必要ならPython 3に行くことができます。 – gsamaras

答えて

2

簡単な例を試してみましょう:

foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect() 

私はこれに沸くこのエラーを、得ました。便宜上、私は便利なtoolzライブラリを使用しますが、ここでは実際には必要ありません。

import sys 
import base64 

if sys.version_info < (3,): 
    import cPickle as pickle 
else: 
    import pickle 


from toolz.functoolz import compose 

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})]) 

今、あなたのコードは現在、正確に移植可能ではありません。 Python 2ではbase64.b64encodestrを返し、Python 3ではbytesを返します。だから、パイプラインにデコードを追加することができます

  • のPython 2

    type(base64.b64encode(pickle.dumps({"foo": "bar"}))) 
    ## str 
    
  • にはPython 3に

    type(base64.b64encode(pickle.dumps({"foo": "bar"}))) 
    ## bytes 
    

を::

ことを示すことができます
# Equivalent to 
# def pickle_and_b64(x): 
#  return base64.b64encode(pickle.dumps(x)).decode("ascii") 

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"), 
    base64.b64encode, 
    pickle.dumps 
) 

これはデータの特定の形状を想定していないことに注意してください。そのため、我々は唯一のキーをシリアル化するためにmapValuesを使用します:

serialized = rdd.mapValues(pickle_and_b64) 
serialized.first() 
## 1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu') 

今、私たちは、単純な形式でそれを追跡し、保存することができます:

from tempfile import mkdtemp 
import os 

outdir = os.path.join(mkdtemp(), "foo") 

serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir) 

我々はプロセスを逆ファイル読み取るには:

# Equivalent to 
# def b64_and_unpickle(x): 
#  return pickle.loads(base64.b64decode(x)) 

b64_and_unpickle = compose(
    pickle.loads, 
    base64.b64decode 
) 

decoded = (sc.textFile(outdir) 
    .map(lambda x: x.split("\t")) # In Python 3 we could simply use str.split 
    .mapValues(b64_and_unpickle)) 

decoded.first() 
## (u'1', {'foo': 'bar'}) 
+0

また、Python 2.xの場合、a) 'str.split'が動作しないことがあります。代わりに完全な関数を使用してください。b)エラーメッセージを出力するときに、 'pickle'を少し冗長にテストするためにb)を使います。 – zero323

+0

2.6?!!しばらくの間これを見たことがありません:)私はそれをテストするために使用することができますenthronementがありません。最新のリリースでSparkが2.6のサポートを落としたことは言うまでもなく、ブランチは数年前にその寿命末期に達しました。 toolzに関して - 利便性以外の特別な理由はありません。私は甘やかされ、ネスティング関数呼び出しが面倒であることがわかります。私は完全な機能を追加しました。 – zero323

+1

ああ、私は機能を書いていたはずです、私の愚かなことは申し訳ありません!すべて良い今、私は私のコードをデバッグします、ありがとう! – gsamaras

関連する問題