2016-04-06 16 views
1

mapreduceに入れる前に特定のノードを抽出するためにXMLファイルを前処理しようとしています。私は、次のコードを持っている:Python mrjob mapreduce入力ファイルを前処理する方法

from mrjob.compat import jobconf_from_env 
from mrjob.job import MRJob 
from mrjob.util import cmd_line, bash_wrap 

class MRCountLinesByFile(MRJob): 
    def configure_options(self): 
     super(MRCountLinesByFile, self).configure_options() 
     self.add_file_option('--filter') 

    def mapper_cmd(self): 
     cmd = cmd_line([self.options.filter, jobconf_from_env('mapreduce.map.input.file']) 
     return cmd 



if __name__ == '__main__': 
    MRCountLinesByFile.run() 

を、コマンドラインで、私が入力します。

python3 test_job_conf.py --filter ./filter.py -r local < test.txt 

test.txthereのような通常のXMLファイルです。 filter.pyはすべてのタイトル情報を検索するスクリプトです。

しかし、私は次のエラーを取得しています:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625 
Running step 1 of 1... 
Traceback (most recent call last): 
    File "./filter.py", line 8, in <module> 
    with open(filename) as f: 
FileNotFoundError: [Errno 2] No such file or directory: 'None' 
Step 1 of 1 failed: Command '['./filter.py', 'None']' returned non-zero exit status 1 

mapreduce.map.input.file、この場合でNoneをレンダリングするように見えます。 mrjobが現在読んでいるファイルを読むために、mapper_cmd関数に関数を要求するにはどうすればよいですか?

答えて

0

あなたのself.add_file_optionにはあなたのファイルへのパスが必要です。

self.add_file_option('--items', help='Path to u.item') 

私はあなたのシナリオを正しく得ることはできませんが、ここで私の理解です。 configureオプションを使用して、ソース以外の別のファイルのデータを補助ルックアップする場合など、特定のファイルがすべてのマッパーに送信されて処理されるようにします。この補助的な検索ファイルは、self.add_file_option( ' - items'、help = 'Path to u.item')によって利用可能になります。

減速機またはマッパーフェーズ前に何かを前処理するには、reducer_initまたはmapper_initを使用します。これらのinitまたは処理ステップは、たとえば以下のようなステップ関数にも記述する必要があります。

def steps(self): 
     return [ 
      MRStep(mapper=self.mapper_get_name, 
        reducer_init=self.reducer_init, 
        reducer=self.reducer_count_name), 
      MRStep(reducer = self.reducer_find_maxname) 
     ] 

init関数内では、マッパーやレデューサーに送信する前に実際に必要な処理を行います。たとえば、ファイルxyzを開き、私のレデューサーで使用している別のフィールドの最初のフィールドに値をコピーして出力します。

def reducer_init(self): 
     self.movieNames = {}  
     with open("xyz") as f: 
      for line in f: 
       fields = line.split('|') 
       self.myNames[fields[0]] = fields[1] 

関連する問題