2016-10-26 5 views
1

記事の大きなXMLファイルを繰り返し処理し、各記事を新しいファイルに書き込むPythonスクリプトを作成しています。私はまた、ファイル内で各記事が始まる場所のインデックスを保持したいと思います。 (何時間もかかる)ファイルを処理している間にスクリプトが失敗した場合は、file.seek()のようなものを使用して中断した箇所を取り上げることができます。行の先頭の実際のファイル位置を調べるPython

これは私の現在のコードです:

with open(inputPath, 'rb') as inputfile: # Iterate over the dump 
    for line in inputfile: 
    # do stuff... 

私はfile.tell() inconsistencyはここに適用され、私に正しいファイル位置を与えないだろうと信じています。私はまた、終わりの代わりに行の始まりの位置を決定したいと思います。私はファイルを反復処理するよう<page>タグが付けられた行の先頭の位置を取得したいと思い

<pages> 
    <page> 
    This is an article. Article-ly stuff goes here. 
    </page> 
    <page> 
    ... x5,000,000 
    </page> 
</pages> 

XMLファイルは次のようになります。

+0

あなたは、XMLファイルの一部を追加してもらえますか?ありがとう! –

+0

私は使用している形式のサンプルXMLで質問を更新しました。ありがとう! – miller9904

+0

各行を読むときにカウンタを更新するだけではどうですか? –

答えて

0

自分でバイトポインタを制御できます。この例では、dictionaryをバイトポインタとともに使用し、shelfに格納します。その後、xmlからファイルhtmlをエクスポートします。

このスクリプトは、エクスポートされた1000ページごとのステータスをデフォルトで出力します。このポストの最後にあるように、プロジェクトのルートにデフォルトでグラフイメージ( 'xml-split-performance.jpg'が生成されます)が表示されるので、プロセスが正常に動作しているかどうかを確認できます。

私はデフォルトで8人の作業者を使用しています。私は8ページをメモリに保存し、8人の作業者が書き込む8ページを配布します。それが最善の方法だとは思っていませんが、50k htmlでうまくいきました。質問に記載の形式でxmlhtml000 50とS作成する

from multiprocessing.pool import ThreadPool as Pool 
import matplotlib.pyplot as plt 
import itertools, os, sys, datetime, math 
from urlparse import urljoin 
import shelve 
import collections 
from functools import partial 

class xml_split(): 

    # Flags 
    track_line = False 
    last_line = False 
    output_file = False 
    all_pages_extracted = False 

    # Modes available: 
    # - Read output folder and calculate missing files: c (default) 
    # - Start from scratch: s 
    resume_modes = { 
     'complete_file_count': 'c', 
     'start_from_scratch': 's', 
    } 

    # Queue of pages in memory, before writting to file 
    pages_queue = [] 

    def __init__(
      self, 

      # Number of workers 
      pool_size = 8, 

      # Read x pages at a time 
      page_memory = 30, 

      # Path to the input xml file 
      input_xml_file_path = "sample.xml", 

      # File name prefix and extension 
      output_file_prefix = 'page_', 
      output_file_extension = '.html', 

      # Path to the output folder: will be created under the script path 
      page_files_folder = 'pages', 

      # Path to the cache folder: will be created under the script path 
      cache_files_folder = 'cache', 

      # filename of a graph that shows the average performance 
      graph_file_path = 'xml-split-performance.jpg', 

      # update graph each x number of pages extracted 
      update_graph_each = 1000, 

      # print status on stdout each x number of pages extracted 
      show_status_each = 1000, 

      # group files in folders of x pages 
      batch_size = 1000, 

      # tags to track 
      start_string = '<page>', 
      end_string = '</page>', 
      start_doc = '<pages>', 
      end_doc = '</pages>', 

      # A little template to output the status 
      log_template = """ 
      Page:         {page} 
      Time:         {exported} 
      Time since the beginning:    {queue} 
      Reporting each n exports:    {show_status_each} 
      Time since last report:    {process} 
      Average entry export time:    {average} 
      """, 
     ): 

     self.pool_size = pool_size 
     self.pool = Pool(pool_size) 

     self.input_xml_file_path = input_xml_file_path 
     self.input_file = open(input_xml_file_path) 

     self.output_file_prefix = output_file_prefix 
     self.output_file_extension = output_file_extension 
     self.page_files_folder = page_files_folder 
     self.cache_files_folder = cache_files_folder 

     self.page_memory = page_memory 
     self.show_status_each = show_status_each 
     self.update_graph_each = update_graph_each 
     self.batch_size = batch_size 

     self.graph_file_path = graph_file_path 

     self.start_string = start_string 
     self.end_string = end_string 
     self.end_doc = end_doc 
     self.start_doc = start_doc 
     self.log_template = log_template 
     self.chunk_tail = '' 

     # Set project path to the current script path 
     self.project_path = os.getcwd() + os.sep 

     # Folder to place output files 
     self.page_files_path = urljoin(self.project_path, self.page_files_folder) + os.sep 

     # Folder to place cache files 
     self.cache_files_path = urljoin(self.project_path, self.cache_files_folder) + os.sep 

     self.create_folder(self.page_files_path) 
     self.create_folder(self.cache_files_path) 

     # keep track of time, to calculate time spent 
     self.main_start = datetime.datetime.now() 
     self.start = self.main_start 

     # by default, set the resume mode to check output folder 
     self.resume_mode = self.resume_modes['complete_file_count'] 

     # Uncomment this line to ask for user input, on the shell. 
     # self.resume_mode = raw_input("s) Start from scratch c) Resume from missing files:") 

     # Create or open a shelf to keep a cache of line number, page number, and performance stats 
     self.chunk_pointers = shelve.open(self.cache_files_path + 'chunk_pointers.log') 
     self.persistent_performance_tracker = shelve.open(self.cache_files_path + 'persistent_performance_tracker.log') 


     # Init shelf counter 


     # *** Resume from missing files on the output folder 
     # (Resume an interrupted operation by checking the existing files on the output folders) 
     if self.resume_mode == self.resume_modes['complete_file_count']: 
      previously_existent_file_count = 0 
      for output_root, output_dirnames, output_filenames in os.walk(self.page_files_path): 
       for dirname in output_dirnames: 
        for root, dirnames, filenames in os.walk(self.page_files_path + dirname): 
         for filename in filenames: 
          if filename.endswith(self.output_file_extension) and filename.startswith(self.output_file_prefix): 
           previously_existent_file_count += 1 

      resume_from_page = int(math.floor(previously_existent_file_count/self.pool_size) * self.pool_size) 
      if '%s' % (resume_from_page) in self.chunk_pointers: 
       self.page_count = resume_from_page 
       self.byte_count = self.chunk_pointers['%s' % self.page_count] 
      else: 
       self.byte_count = 0 
       self.page_count = 0 

     # *** Do not resume 
     elif resume == self.start_from_scratch: 
      self.byte_count = 0 
      self.page_count = 0 

    # Create folder if doesn't exist 
    def create_folder(self, path): 
     if not os.path.exists(path): 
      os.makedirs(path) 

    # Get 30 pages a time and store them in memory 
    def slice_file(self, start=0, end=30): 
     max_pages = end - start 
     chunk = self.chunk_tail 
     pages_stored = 0 
     while True and max_pages: 
      new_chunk = self.input_file.read(10000) 
      if new_chunk: 

       chunk += new_chunk 
       pages_stored = len(chunk.split(self.end_string)) 

       if pages_stored > max_pages: 
        pages_for_next_slice = max_pages - pages_stored 
        if pages_for_next_slice == 0: 
         pages_for_next_slice = -1 
        self.chunk_tail = ''.join(chunk.split(self.end_string)[pages_for_next_slice:]) 
        return ''.join(chunk.split(self.end_string)[0:max_pages]) 

      else: 
       return ''.join(chunk.split(self.end_string)) 


    def get_folder_name(self): 
     folder_name = int(math.floor(self.page_count/self.batch_size) * self.batch_size) 
     folder_name = '%s%s' % (folder_name, os.sep) 
     return folder_name 

    def save_page(self, path, file_contents): 
     with open(path, 'w') as output_file: 
      output_file.write(file_contents) 

    def process_queue(self): 
     for page in self.pages_queue: 
      self.pool.apply_async(
       self.save_page, 
       args = (page[0], page[1]) 
      ) 

    def save_graph(self): 

     performance_seconds = [] 
     performance_page_count = [] 
     vals = self.persistent_performance_tracker.items() 
     ordered_vals = sorted(vals, key=lambda i: int(i[0])) 

     for val in ordered_vals: 
      performance_seconds += [val[1]] 
      performance_page_count += [val[0]] 


     plt.clf() 
     plt.plot(performance_page_count, performance_seconds) 
     plt.ylabel('Task duration progress') 

     plt.savefig(self.graph_file_path) 

    def handle_status_reports(self): 

     # Update graph 
     if self.page_count % self.update_graph_each == 0: 

      self.end = datetime.datetime.now() 
      average = (self.end - self.start)/self.show_status_each 
      average = average.total_seconds() 


      self.persistent_performance_tracker['%s' % self.page_count] = average 
      self.persistent_performance_tracker.sync() 

      self.save_graph() 

     # Print status to stdout 
     if self.page_count % self.show_status_each == 0: 

      self.end = datetime.datetime.now() 
      average = (self.end - self.start)/self.show_status_each 
      average = average.total_seconds() 

      log = self.log_template.format(
        page= self.page_count, 
        exported = self.end, 
        average = average, 
        show_status_each = self.show_status_each, 
        process = self.end - self.start, 
        queue = self.end - self.main_start 
       ) 


      self.persistent_performance_tracker['%s' % self.page_count] = average 
      self.persistent_performance_tracker.sync() 

      sys.stdout.write(log) 
      sys.stdout.flush() 
      self.start = datetime.datetime.now() 



    # Go through xml file lines and output data to html files 
    def read_xml(self): 

     tag_contents = '' 

     # Seek page where to pick up from 
     self.slice_file(0, self.page_count) 


     # self.slice_file(0, self.page_memory) 

     while self.all_pages_extracted == False: 
      # check if there are still bytes to read 
      try: 
       chunk = self.slice_file(0, self.page_memory) 
      except: 
       break 

      if not chunk: 
       break 

      pages_in_chunk = chunk.split(self.start_string)[1:] 


      for page_i, page_contents in enumerate(pages_in_chunk): 

       # new page start was found, count 
       self.page_count += 1 

       # create batch folder 
       if self.page_count % self.batch_size == 0 or self.page_count == 1: 
        self.create_folder(self.page_files_path + self.get_folder_name()) 


       output_file_name = '{pre}{page_count}{ext}'.format(
        pre = self.output_file_prefix, 
        page_count = self.page_count, 
        ext = self.output_file_extension 
       ) 

       # if it's the last page, set the flag and ignore closing tag 
       if self.end_doc in page_contents: 
        page_contents = page_contents.split(self.end_doc)[0] 
        self.all_pages_extracted = True 

       self.pages_queue += [(
        self.page_files_path + self.get_folder_name() + output_file_name, 
        tag_contents + page_contents 
       )] 

       self.handle_status_reports() 

       if self.page_count % self.pool_size == 0: 
        # keep track of byte pointers where worker pools start 
        self.chunk_pointers['%s' % self.page_count] = self.byte_count 
        self.chunk_pointers.sync() 
        self.process_queue() 
        self.pages_queue = [] 

       self.byte_count += len(page_contents) 


     self.close() 


    def close(self): 
     self.process_queue() 
     self.save_graph() 
     self.chunk_pointers.close() 
     self.persistent_performance_tracker.close() 
     self.pool.close() 
     self.pool.join() 

if __name__ == '__main__': 

    xso = xml_split() 

    xso.read_xml() 

:スクリプトは平均エントリ抽出時間を追跡するためにmathplotlibグラフを生成

を、それ安定しているようです。

graph showing average time for each entry export

プロセスをキャンセルすること自由に感じて、それを起動します。これは(y軸上の秒単位でエントリー輸出期間、と、軸xにエクスポートしたエントリの数)私のPC上で50.000エントリのグラフです。再び。既定の動作では、既存のエクスポートされたファイルをチェックし、そこから再開します。

+0

私はlxmlなどに精通していません。あたかもファイル全体がメモリにロードされているように見えますが、私のファイルは57GBのWikipediaダンプであるため動作しません。今、私は、ファイルごとに行ごとに実行しており、 ''タグが見つかると分割しています。私は単純に位置の代わりに行番号を私の索引に書くのを間違えました。 – miller9904

+0

私の現在のスクリプトはここにあります:https://gist.github.com/miller9904/e51e698154fe388484c6868eec09988b – miller9904

+0

申し訳ありませんが、あなたのコードをまだ調べることができませんでした。私は別のアプローチで答えを更新しました。どう考えているか教えてください? –

1

はここにあなたのリンクの答えに基づいてソリューションです:

offsets = [] 
with open('test.xml', 'rb') as inputfile: # Iterate over the dump 
    # Save 1st line offset 
    current = inputfile.tell() 
    for line in iter(inputfile.readline,''): 
     if line.lstrip().startswith('<page>'): 
      offsets.append(current) 
     # Update to the current offset (line about to be read) 
     current = inputfile.tell() 

# Demo the offsets are lines with <page> 
with open('test.xml', 'rb') as inputfile: # Iterate over the dump 
    for offset in offsets: 
     inputfile.seek(offset) 
     print offset,inputfile.readline() 

出力:私たちはより良い理解を持つことができるように

9 <page> 

82 <page> 
+0

私は57 GB xmlファイルを処理していますが、一度に1行ずつ読み込むのは遅すぎると思います。 – miller9904

+1

@ miller9904、これは質問に記載されておらず、あなたの質問でやっていたことでした。私は質問に答えた。 –

関連する問題