2016-11-28 12 views
0

並行処理の少ないプロセスをシミュレートするbashスクリプトを作成します。
MAX_CONCURRENT_TASKSには、一度に実行できるタスクの数が表示されますが、そのうちの1つはランダムに2〜6秒で「動作」します。 Bash並列処理シミュレーションスクリプト

は、これを行うために、より「創造」の方法は、単純な while..doループと、現在のプロセス数が MAX_CONCURRENT_TASKSと等しい場合 waitを使用するよりも、あり、そして2〜6秒のための sleep
誰もがこのために信号を使用する方法を知っていますか?多分何か?

@ EDIT:私は仕事をするスクリプトを書いていますが、私はそれを行うツールを知っています。

+1

ようこそスタックオーバーフロー!良い質問をするのを助けるために私たちの[SO Question Checklist](http://meta.stackoverflow.com/questions/260648/stack-overflow-question-checklist)をよく読んで、良い答えを得てください。 –

+0

これに 'parallel'、' sem'、 'xargs -P'を使うことができます。 –

+0

あなたはGNU 'parallel'と' xargs'の '-P'オプションを知っていますか?あなたは本当にあなた自身のことを書いていますか?がんばろう! – shellter

答えて

2

GNU Parallelを使用してください。

for i in `seq 1 1000`; do 
    echo "someFile$i" 
done | parallel -j10 'md5sum {}' 

いくつかの簡単な例

0

はい、私はあなたがそれを好きと思うこれを試してみてください。 THREADLIMITとMAXTHREADDURをパラメータに調整してみてください。

このプログラムは実際にプロセス生成マネージャと呼ばれるべきですが、これはBASHがアンパサンドを使用してforkするとき、fork()またはclone()システムコールを使用してメモリを共有するpthread_create()のようなものではなく、別のメモリ空間です。 BASHが後者をサポートしていれば、各「実行順序」はまったく同じように動作し、より効率的なメモリフットプリントを得ながら従来のスレッドと呼ぶことができます。機能的には同じですが、GLOBAL変数は各ワーカークローンで利用できないため、プロセス間通信ファイルと基本的なフロックセマフォをクリティカルセクションを管理するために使用することはできません。もちろんBASHからのフォークは基本的な答えですが、人々はそれを知っているように感じていますが、実際にスポークされているものを管理し、それを忘れて忘れるように思っているような気がします。これは、1つのリソースにすべてアクセスするフォークされたプロセスのインスタンスを最大200個管理する方法を示しています。私はあなたがそれを楽しむことを願って、私はそれを書くことを楽しんだ

#!/bin/bash 

ME=$(basename $0) 
IPC="/tmp/$ME.ipc" #interprocess communication file (global thread accounting stats) 
DBG=/tmp/$ME.log 
echo 0 > $IPC   #initalize counter 
F1=thread 
SPAWNED=0  
COMPLETE=0 
SPAWN=10000  #number of jobs to process 
SPEEDFACTOR=1   #dynamically compensates for execution time 
THREADLIMIT=200  #maximum concurrent threads 
TPS=1     #threads per second delay 
THREADCOUNT=0   #number of running threads 
SCALE="scale=5"   #controls bc's precision 
START=$(date +%s)  #whence we began 
MAXTHREADDUR=30   #maximum thread life span - demo mode 

LOWER=$[$THREADLIMIT*100*90/10000] #90% worker utilization threshold 
UPPER=$[$THREADLIMIT*100*95/10000] #95% worker utilization threshold 
DELTA=10        #initial percent speed change 

threadspeed()  #dynamically adjust spawn rate based on worker utilization 
{ 
    #vaguely assumes thread execution average will be consistent 
    THREADCOUNT=$(threadcount) 
    if [ $THREADCOUNT -ge $LOWER ] && [ $THREADCOUNT -le $UPPER ] ;then 
     echo SPEED HOLD >> $DBG 
     return 
    elif [ $THREADCOUNT -lt $LOWER ] ;then 
     #if maxthread is free speed up 
     SPEEDFACTOR=$(echo "$SCALE;$SPEEDFACTOR*(1-($DELTA/100))"|bc) 
     echo SPEED UP $DELTA%>> $DBG 
    elif [ $THREADCOUNT -gt $UPPER ];then 
     #if maxthread is active then slow down 
     SPEEDFACTOR=$(echo "$SCALE;$SPEEDFACTOR*(1+($DELTA/100))"|bc) 
     DELTA=1       #begin fine grain control 
     echo SLOW DOWN $DELTA%>> $DBG 
    fi 

    echo SPEEDFACTOR $SPEEDFACTOR >> $DBG 

    #average thread duration (total elapsed time/number of threads completed) 
    #if threads completed is zero (less than 100), default to maxdelay/2 maxthreads 

    COMPLETE=$(cat $IPC) 

    if [ -z $COMPLETE ];then 
     echo BAD IPC READ ============================================== >> $DBG 
     return 
    fi 

    #echo Threads COMPLETE $COMPLETE >> $DBG 
    if [ $COMPLETE -lt 100 ];then 
     AVGTHREAD=$(echo "$SCALE;$MAXTHREADDUR/2"|bc) 
    else 
     ELAPSED=$[$(date +%s)-$START] 
     #echo Elapsed Time $ELAPSED >> $DBG 
     AVGTHREAD=$(echo "$SCALE;$ELAPSED/$COMPLETE*$THREADLIMIT"|bc) 
    fi 
    echo AVGTHREAD Duration is $AVGTHREAD >> $DBG 

    #calculate timing to achieve spawning each workers fast enough 
    # to utilize threadlimit - average time it takes to complete one thread/max number of threads 
    TPS=$(echo "$SCALE;($AVGTHREAD/$THREADLIMIT)*$SPEEDFACTOR"|bc) 
    #TPS=$(echo "$SCALE;$AVGTHREAD/$THREADLIMIT"|bc) # maintains pretty good 
    #echo TPS $TPS >> $DBG 

} 
function plot() 
{ 
    echo -en \\033[${2}\;${1}H 

    if [ -n "$3" ];then 
     if [[ $4 = "good" ]];then 
      echo -en "\\033[1;32m" 
     elif [[ $4 = "warn" ]];then 
      echo -en "\\033[1;33m" 
     elif [[ $4 = "fail" ]];then 
      echo -en "\\033[1;31m" 
     elif [[ $4 = "crit" ]];then 
      echo -en "\\033[1;31;4m" 
     fi 
    fi 
     echo -n "$3" 
     echo -en "\\033[0;39m" 
} 

trackthread() #displays thread status 
{ 
    WORKERID=$1 
    THREADID=$2 
    ACTION=$3 #setactive | setfree | update 
    AGE=$4 

    TS=$(date +%s) 

    COL=$[(($WORKERID-1)/50)*40] 
    ROW=$[(($WORKERID-1)%50)+1] 

    case $ACTION in 
     "setactive") 
     touch /tmp/$ME.$F1$WORKERID #redundant - see main loop 
     #echo created file $ME.$F1$WORKERID >> $DBG 
     plot $COL $ROW "Worker$WORKERID: ACTIVE-TID:$THREADID INIT " good 
     ;; 
     "update") 
     plot $COL $ROW "Worker$WORKERID: ACTIVE-TID:$THREADID AGE:$AGE" warn 
     ;; 
     "setfree") 
     plot $COL $ROW "Worker$WORKERID: FREE       " fail 
     rm /tmp/$ME.$F1$WORKERID 
     ;; 
     *) 

     ;; 
    esac 
} 

getfreeworkerid() 
{ 
    for i in $(seq 1 $[$THREADLIMIT+1]) 
    do 
     if [ ! -e /tmp/$ME.$F1$i ];then 
     #echo "getfreeworkerid returned $i" >> $DBG 
     break 
     fi 
    done 
    if [ $i -eq $[$THREADLIMIT+1] ];then 
     #echo "no free threads" >> $DBG 
     echo 0 
     #exit 
    else 
     echo $i 
    fi 
} 

updateIPC() 
{ 
    COMPLETE=$(cat $IPC)  #read IPC 
    COMPLETE=$[$COMPLETE+1]  #increment IPC 
    echo $COMPLETE > $IPC  #write back to IPC 
} 


worker() 
{ 
    WORKERID=$1 
    THREADID=$2 
    #echo "new worker WORKERID:$WORKERID THREADID:$THREADID" >> $DBG 

    #accessing common terminal requires critical blocking section 
    (flock -x -w 10 201 
     trackthread $WORKERID $THREADID setactive 
    )201>/tmp/$ME.lock 

    let "RND = $RANDOM % $MAXTHREADDUR +1" 

    for s in $(seq 1 $RND)  #simulate random lifespan 
    do 
     sleep 1; 
     (flock -x -w 10 201 
     trackthread $WORKERID $THREADID update $s 
    )201>/tmp/$ME.lock 
    done 

    (flock -x -w 10 201 
     trackthread $WORKERID $THREADID setfree 
    )201>/tmp/$ME.lock 

    (flock -x -w 10 201 
     updateIPC 
    )201>/tmp/$ME.lock 
} 

threadcount() 
{ 
    TC=$(ls /tmp/$ME.$F1* 2> /dev/null | wc -l) 
    #echo threadcount is $TC >> $DBG 
    THREADCOUNT=$TC 
    echo $TC 
} 

status() 
{ 
    #summary status line 
    COMPLETE=$(cat $IPC) 
    plot 1 $[$THREADLIMIT+2] "WORKERS $(threadcount)/$THREADLIMIT SPAWNED $SPAWNED/$SPAWN COMPLETE $COMPLETE/$SPAWN SF=$SPEEDFACTOR TIMING=$TPS" 
    echo -en '\033[K'     #clear to end of line 
} 

function main() 
{ 
    while [ $SPAWNED -lt $SPAWN ] 
    do 
     while [ $(threadcount) -lt $THREADLIMIT ] && [ $SPAWNED -lt $SPAWN ] 
     do 
     WID=$(getfreeworkerid) 
     worker $WID $SPAWNED & 
     touch /tmp/$ME.$F1$WID #if this loops faster than file creation in the worker thread it steps on itself, thread tracking is best in main loop 
     SPAWNED=$[$SPAWNED+1] 
     (flock -x -w 10 201 
      status 
     )201>/tmp/$ME.lock 
     sleep $TPS 
     if ((! $[$SPAWNED%100]));then 
      #rethink thread timing every 100 threads 
      threadspeed 
     fi 
     done 
     sleep $TPS 
    done 

    while [ "$(threadcount)" -gt 0 ] 
    do 
     (flock -x -w 10 201 
     status 
    )201>/tmp/$ME.lock 
     sleep 1; 
    done 

    status 
} 

clear 
threadspeed 
main 
wait 
status 
echo