2013-10-05 10 views
7

私は単純なMapReduceフローを書き、Google Cloud StorageのファイルからCSVの行を読み込み、その後エンティティを作成しました。しかし、私はそれを複数のシャードで実行することはできませんAppEngineマップの縮尺を縮小するにはどうすればよいですか?

コードはmapreduce.control.start_mapを使用しており、次のようになります。

class LoadEntitiesPipeline(webapp2.RequestHandler): 
     id = control.start_map(map_name, 
          handler_spec="backend.line_processor", 
          reader_spec="mapreduce.input_readers.FileInputReader", 
          queue_name=get_queue_name("q-1"), 
          shard_count=shard_count, 
          mapper_parameters={ 
           'shard_count': shard_count, 
           'batch_size': 50, 
           'processing_rate': 1000000, 
           'files': [gsfile], 
           'format': 'lines'}) 

実際にどのメソッドが実際に必要なのかわからないため、私は両方の場所でshard_countを持っています。 shard_countを8から32のいずれかに設定すると、ステータスページで常に1/1シャードの実行が示されるため、何も変更されません。分けるために、私はすべてのインスタンスを多数持つバックエンドのキューですべてを実行させました。キューパラメータper this wikiを調整しようとしました。結局、それはちょうど連続的に実行されているようです。

アイデア?ありがとう!

アップデート(まだ成功):それはまだ、この新しいコードで

class ImportHandler(webapp2.RequestHandler): 

    def get(self, gsfile): 
     pipeline = LoadEntitiesPipeline2(gsfile) 
     pipeline.start(queue_name=get_queue_name("q-1")) 

     self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id) 


class LoadEntitiesPipeline2(base_handler.PipelineBase): 

    def run(self, gsfile): 
     yield mapreduce_pipeline.MapperPipeline(
      'loadentities2_' + gsfile, 
      'backend.line_processor', 
      'mapreduce.input_readers.FileInputReader', 
      params={'files': [gsfile], 'format': 'lines'}, 
      shards=32 
     ) 

:、私はそうのようなパイプラインに直接呼び出しを使用して電話を作ってみました物事を分離しようとする際に

1つのシャード上でのみ実行されます。 mapreduce.input_readers.FileInputReaderが行ごとに並列化できるかどうか疑問に思っています。それはの迅速な読み取りに基づいてシャーディングすることが可能であるべきでFileInputReaderのように私には見えます

答えて

0

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/input_readers.py

それは「形式」のようになります:「行」を使用して分割する必要があります。self.get_current_fileを()のreadline ()

連続して動作しているときに正しく解釈しているようですか?多分、改行は間違ったエンコーディングなのかもしれません。

+0

ええ、私の改行コードでは何も変わっていませんし、各行を細かく処理することができます。実際には、本当に大きなファイルを小さなファイルに分割すると(それぞれ5000行)。私はshardへのmapreduceコールを得ることができますが、細かい粒度ではなくファイル名でシャーディングされているように見えます。 –

0

経験から、FileInputReaderはファイルごとに最大1つのシャードを行います。 解決方法:大きなファイルを分割します。私はhttps://github.com/johnwlockwood/karl_dataでsplit_fileを使用してファイルをクラウドストレージにアップロードする前に分割します。 大きなファイルが既にそこにある場合、Compute Engineインスタンスを使用してそれらをプルダウンし、シャーディングを実行できます。これは、転送速度が最も速くなるためです。 FYI:karldはcheeseshopにありますので、pip install karld

5

のようになります。FileInputReaderはファイルを介してのみシャードできます。 formatパラメータは、マッパー関数の呼び出し方法を変更するだけです。マッパーに複数のファイルを渡すと、複数のシャード上で実行されます。それ以外の場合は、データを処理するためにシャードを1つだけ使用します。

EDIT#1:より深いMapReduceのライブラリで発掘した後

。 MapReduceは、定義した各ファイルタイプに対して、can_splitメソッドリターンに基づいてファイルを分割するかどうかを決定します。現在、splitメソッドを実装する唯一のフォーマットはZipFormatです。そのため、ファイル形式がzipでない場合、ファイルを分割して複数のシャード上で実行することはありません。

@classmethod 
    def can_split(cls): 
    """Indicates whether this format support splitting within a file boundary. 

    Returns: 
     True if a FileFormat allows its inputs to be splitted into 
    different shards. 
    """ 

https://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/file_formats.py

しかし、あなた自身のファイル形式の分割方法を記述することが可能であるように見えます。ハックして_TextFormatsplitメソッドを追加して、複数のシャードが実行中であるかどうか確認してください。

@classmethod 
def split(cls, desired_size, start_index, opened_file, cache): 
    pass 

EDIT#2:

簡単な回避策は、シリアルFileInputReader実行を残したが、reduceのステージを並列に時間がcosumingタスクを移動することになります。

def line_processor(line): 
    # serial 
    yield (random.randrange(1000), line) 

def reducer(key, values): 
    # parallel 
    entities = [] 
    for v in values: 
     entities.append(CREATE_ENTITY_FROM_VALUE(v)) 
    db.put(entities) 

EDIT#3:

ここでは、FileFormatを変更しようとした場合(まだテストされていない)の例である

from file_formats import _TextFormat, FORMATS 


class _LinesSplitFormat(_TextFormat): 
    """Read file line by line.""" 

    NAME = 'split_lines' 

    def get_next(self): 
    """Inherited.""" 
    index = self.get_index() 
    cache = self.get_cache() 
    offset = sum(cache['infolist'][:index]) 

    self.get_current_file.seek(offset) 
    result = self.get_current_file().readline() 
    if not result: 
     raise EOFError() 
    if 'encoding' in self._kwargs: 
     result = result.encode(self._kwargs['encoding']) 
    return result 

    @classmethod 
    def can_split(cls): 
    """Inherited.""" 
    return True 

    @classmethod 
    def split(cls, desired_size, start_index, opened_file, cache): 
    """Inherited.""" 
    if 'infolist' in cache: 
     infolist = cache['infolist'] 
    else: 
     infolist = [] 
     for i in opened_file: 
     infolist.append(len(i)) 
     cache['infolist'] = infolist 

    index = start_index 
    while desired_size > 0 and index < len(infolist): 
     desired_size -= infolist[index] 
     index += 1 
    return desired_size, index 


FORMATS['split_lines'] = _LinesSplitFormat 

次に、新しいファイル形式を経由して呼び出すことができますmapper_parametersをlinesからsplit_lineに変更します。

class LoadEntitiesPipeline(webapp2.RequestHandler): 
    id = control.start_map(map_name, 
         handler_spec="backend.line_processor", 
         reader_spec="mapreduce.input_readers.FileInputReader", 
         queue_name=get_queue_name("q-1"), 
         shard_count=shard_count, 
         mapper_parameters={ 
          'shard_count': shard_count, 
          'batch_size': 50, 
          'processing_rate': 1000000, 
          'files': [gsfile], 
          'format': 'split_lines'}) 
+0

詳細な回答と推奨される回避策をお寄せいただきありがとうございます。 – Alice

関連する問題