2016-08-22 33 views
4

現在、Apache Beamを使ってPythonでgzipファイルを読むことはできますか?私たちは、ファイルがあるように見える圧縮Pythonのビームのソースコードに気づいpythonでgzipファイルを開くApache Beam

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte 

: 私のパイプラインのコード行でGCSからのgzipファイルを引っ張っている。

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 

しかし、私はこのエラーを取得していますシンクに書き込むときに処理されます。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

より詳細なトレースバック:

Traceback (most recent call last): 
    File "beam-playground.py", line 11, in <module> 
    p.run() 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run 
    return self.runner.run(self) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run 
    super(DirectPipelineRunner, self).run(pipeline) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run 
    pipeline.visit(RunVisitor(self)) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit 
    part.visit(visitor, pipeline, visited) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit 
    visitor.visit_transform(self) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform 
    return m(transform_node) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper 
    func(self, pvalue, *args, **kwargs) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read 
    read_values(reader) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values 
    read_result = [GlobalWindows.windowed_value(e) for e in reader] 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__ 
    yield self.source.coder.decode(line) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode 
    return value.decode('utf-8') 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode 
    return codecs.utf_8_decode(input, errors, True) 
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte 

答えて

2

UPDATE:TextIOのPython SDKでは現在、圧縮ファイルからの読み込みをサポートしています。

今日のPython SDKのTextIOは、実際には圧縮ファイルの読み込みをサポートしていません。

+0

クイックレスポンスありがとう!私はその記事を見て回ります。質問にトレースバック全体を追加しました。 – agsolid

+0

データはUTF-8で圧縮されています。そして、元のテキストをバイトからユニコードにデコードして確認しました。エラーはありませんでした。私が間違っている場合は私を修正しますが、ビームソースコードを見ると、TextFileSourceは圧縮ファイルを処理しないようです。 – agsolid

+1

私はもっと深く見てきました - 私はサポートの深さについて間違っていました。このクラスは実際のサポートに備えてcompression_typeパラメータを受け取ります。 –

3

同様の問題が発生しました。私は解析し、データを取り出そうとしていたカスタムバイナリソースを持っています。問題は、file.io APIがCSVまたはARVOを基にしていることであり、私が試したことに関係なく、私に行を分けようとせずに行を渡すことはありませんでした。あなたが想像することができるように、バイナリファイルはこれをうまく処理しません。

最初はカスタムソースを試してみましたが、実装するには3つのクラスがあり、コアのDataflow/Beamコードを複製していました。最後に、私が必要としていたもの(ここでの深層ソースコードのテスト)を得るために、この素​​敵なモンキーパッチングのコードを書きました。

import apache_beam as beam 
from apache_beam.io.fileio import coders 

def _TextFileReader__iter(self): 
    # The full data file is had here and can be read like normal 
    # You can even limit the character bit here. (I did 9 to grab the file format) 
    data = self._file.read() 
    # Now you can either yield the whole file as a single data entry 
    # and run a ParDo to split it, or you can iterate in here and 
    # yield each row. I chose the latter, but I'm showing an example 
    # of the former. 
    yield data 

# This monkeypatch good! 
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter 

このソースを呼び出し、それがBINARYだ、私は次のようでしたことを確認する:

pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource('gs://MY_BUCKET/sample.bin', 
     coder=coders.BytesCoder() 
    ) 
) 

お知らせcoders.BytesCoders()?それがなければ、バイトをバイナリ以外のものに変換しようとしましたが、これは私の解析エンジンには良くないものでした。 ;)

私はこのことを理解するのに良い一日のチャンクを取った。ただし、このメソッドを使用する場合は、Dataflowのfile.ioクラスを使用してほとんどすべてを実行できます。 ;)

1

同じ問題が発生しました。私は、GCSからバイナリのGZファイルを読み込み、解凍してから別の場所に送り込んで処理しようとしていました。私は2つのステップでそれを解決しました。

まず、適切なPythonライブラリを使用していることを確認してください。私の元の図書館は古いです(私は少なくともv0.4を使用しています):pip install --upgrade google-cloud-dataflow

import apache_beam as beam 
from apache_beam import (coders, io, transforms) 

raw_logs = (p 
      | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
         "gs://my-bucket/logs-*.gz", 
         coder=coders.BytesCoder())) 
      | transforms.Map(lambda x: x) 
      | io.Write("WriteToLocalhost", io.textio.WriteToText(
         "/tmp/flattened-logs", 
         file_name_suffix=".json"))) 
p.run() 

あなたは、ファイルがパイプラインを実行した後/tmp/flattened-logs.jsonと呼ばれている必要があります。次のように

第二に、私は私のパイプラインを構築しました。

関連する問題