2012-08-16 18 views
14

私は複数のスレッドから同じファイルに行を追加するアプリケーションを書いています。python - 複数のスレッドから同じファイルに追加する

私はいくつかの行が改行なしで追加されるという問題があります。

これにはどのような解決策がありますか?

class PathThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      path = self.queue.get() 
      self.printfiles(path) 
      self.queue.task_done() 


pathqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads 
for i in range(0, 5): 
    t = PathThread(pathqueue) 
    t.setDaemon(True) 
    t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
+3

ポストいくつかのコードを、それが役立つだろう。 –

+2

新しい行を追加します。 – Kuf

+1

* impossibru *のようなサウンド。 – plaes

答えて

22

解決策は、1つのスレッドのみでファイルに書き込むことです。

import Queue # or queue in Python 3 
import threading 

class PrintThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      result = self.queue.get() 
      self.printfiles(result) 
      self.queue.task_done() 

class ProcessThread(threading.Thread): 
    def __init__(self, in_queue, out_queue): 
     threading.Thread.__init__(self) 
     self.in_queue = in_queue 
     self.out_queue = out_queue 

    def run(self): 
     while True: 
      path = self.in_queue.get() 
      result = self.process(path) 
      self.out_queue.put(result) 
      self.in_queue.task_done() 

    def process(self, path): 
     # Do the processing job here 

pathqueue = Queue.Queue() 
resultqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads to process 
for i in range(0, 5): 
    t = ProcessThread(pathqueue, resultqueue) 
    t.setDaemon(True) 
    t.start() 

# spawn threads to print 
t = PrintThread(resultqueue) 
t.setDaemon(True) 
t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
resultqueue.join() 
+0

、行 - 結果= self.process(パス) ? あなたはハイブプロセス()メソッドをそこに持っていません.. – user1251654

+0

あなたはあなたが望むことをするためにプロセスメソッドを定義すると仮定します。私はこれを明確にするためにコードを修正するだけです。 – Dikei

+0

右、私の悪い。ありがとう。これは多くの助けになります。 – user1251654

0

多分、どこに改行してはいけないのでしょうか? 共有リソースに一度に複数のスレッドがアクセスしないようにしてください。そうしないと、予期しない結果が発生する可能性があります。 (これは、スレッドを使用している間に 'アトミック操作'を使用して呼び出されます) 少し直感のためにこのページを見てください。
Thread-Synchronization

1

あなたが行の途中で同じ行または新しい行にごちゃ混ぜにテキストを見たことがないという事実は、あなたが実際にファイルに追加シンクロする必要がいけないの手がかりです。問題は、printを使用して単一のファイルハンドルに書き込むことです。私はprintが実際には1つの呼び出しでファイルハンドルに2つの操作を行っていると思われ、それらの操作はスレッド間で競合しています。基本的にprintは何かやっている:

file_handle.write('whatever_text_you_pass_it') 
file_handle.write(os.linesep) 

を、別のスレッドが同じファイルハンドルに同時にこれをやっているので、時には1つのスレッドが最初の書き込みになりますし、他のスレッドは、その最初の書き込みになりますと、あなた2つのキャリッジリターンが連続して得られます。またはこれらの本当に任意の順列。

これを回避する最も簡単な方法は、printの使用をやめ、そのままwriteを使用することです。

output.write(f + os.linesep) 

これはまだ私にとって危険だと思われます。私は、同じファイルハンドルオブジェクトを使用し、その内部バッファのために競合するすべてのスレッドで何が期待できるのかわかりません。個人的には問題はすべての問題を解決し、すべてのスレッドが独自のファイルハンドルを取得するようにします。ライトバッファフラッシュのデフォルトはラインバッファであるため、ファイルがフラッシュされるときにはos.linesepで終了するため、これが機能することにも注意してください。それを強制的にラインバッファを使用するようにの3番目の引数として1を送信します。あなたはこのようにそれをテストすることができます

#!/usr/bin/env python 
import os 
import sys 
import threading 

def hello(file_name, message, count): 
    with open(file_name, 'a', 1) as f: 
    for i in range(0, count): 
     f.write(message + os.linesep) 

if __name__ == '__main__': 
    #start a file 
    with open('some.txt', 'w') as f: 
    f.write('this is the beginning' + os.linesep) 
    #make 10 threads write a million lines to the same file at the same time 
    threads = [] 
    for i in range(0, 10): 
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000))) 
    threads[-1].start() 
    for t in threads: 
    t.join() 
    #check what the heck the file had 
    uniq_lines = set() 
    with open('some.txt', 'r') as f: 
    for l in f: 
     uniq_lines.add(l) 
    for u in uniq_lines: 
    sys.stdout.write(u) 

出力は次のようになります。

hey im thread 6 
hey im thread 7 
hey im thread 9 
hey im thread 8 
hey im thread 3 
this is the beginning 
hey im thread 5 
hey im thread 4 
hey im thread 1 
hey im thread 0 
hey im thread 2 
関連する問題