2016-08-04 29 views
0

私は機能を持つファイルを処理していますし、このように私のスレッドを開始します:Perlのスレッドとセマフォ

for my $file (@files){ 
    $threads[$k] = threads->create('function', $file); 
    $k++; 
} 

私は並列プロセスの数を制限したいと思います。それはどうですか?私はたくさんのセマフォ/キューの例を見てきましたが、私の場合には単純なものは何も見つかりませんでした。

スレッド番号を単に制限することはできますか?

+0

[この回答](http://stackoverflow.com/a/12371377/1331451)は、スレッドではなくフォークに関するもので、関連しています。スレッドはPerlの最も強力な機能の1つではありません。別のテクニックを使うほうが良いかもしれません。 – simbabque

+0

したがって、Forks()の例は、これらの関数がThreads()では実装されていないため、まったく役に立ちません。 – Alex

+0

フォークの提案は、脅威をまったく使用しないようにすることを目的としていましたが、代わりにフォークに頼っています。 – simbabque

答えて

6

並列性を制限する最も単純な方法、およびスレッドを展開するためのより効果的な方法の1つは、「ワーカースレッド」モデルを操作することです。

具体的には、ループ内にあり、キューを読み取り、そのスレッド上で動作するスレッドがあります。このようなものになるだろう

#!/usr/bin/perl 

use strict; 
use warnings; 

use threads; 

use Thread::Queue; 

my $nthreads = 5; 

my $process_q = Thread::Queue->new(); 
my $failed_q = Thread::Queue->new(); 

#this is a subroutine, but that runs 'as a thread'. 
#when it starts, it inherits the program state 'as is'. E.g. 
#the variable declarations above all apply - but changes to 
#values within the program are 'thread local' unless the 
#variable is defined as 'shared'. 
#Behind the scenes - Thread::Queue are 'shared' arrays. 

sub worker { 
    #NB - this will sit a loop indefinitely, until you close the queue. 
    #using $process_q -> end 
    #we do this once we've queued all the things we want to process 
    #and the sub completes and exits neatly. 
    #however if you _don't_ end it, this will sit waiting forever. 
    while (my $server = $process_q->dequeue()) { 
     chomp($server); 
     print threads->self()->tid() . ": pinging $server\n"; 
     my $result = `/bin/ping -c 1 $server`; 
     if ($?) { $failed_q->enqueue($server) } 
     print $result; 
    } 
} 

#insert tasks into thread queue. 
open(my $input_fh, "<", "server_list") or die $!; 
$process_q->enqueue(<$input_fh>); 
close($input_fh); 

#we 'end' process_q - when we do, no more items may be inserted, 
#and 'dequeue' returns 'undefined' when the queue is emptied. 
#this means our worker threads (in their 'while' loop) will then exit. 
$process_q->end(); 

#start some threads 
for (1 .. $nthreads) { 
    threads->create(\&worker); 
} 

#Wait for threads to all finish processing. 
foreach my $thr (threads->list()) { 
    $thr->join(); 
} 

#collate results. ('synchronise' operation) 
while (my $server = $failed_q->dequeue_nb()) { 
    print "$server failed to ping\n"; 
} 

セマフォは、限られたリソースへのアクセスを調停について、およびプロセスの一部を「警備」のため実際にあります。あなたは含めるしたいのであれば

は - と言う - あなたのコードでssh操作が、同時に20の以上の接続を持っている必要はありませんでした、あなたがしたい:

my $ssh_limit = Thread::Semaphore -> new (20); 

そして、あなたのスレッドで:

$ssh_limit -> down; 
#do ssh thing 
$ssh_limit -> up; 

各スレッドは、使用可能なリソースがなくなるまでブロックします。

しかし、これは「スレッド全体」を制御する効果的な方法ではありません。まず最初に正しい番号を入力し、キューを使用してデータを入力します。

関連する問題