2017-12-26 7 views
0

私は、マスターノードを使用してクラスター内のスレーブノードと通信するコンピューターのクラスターを持っています。コアへの接続を終了せずに実行中のジョブを終了するにはどうすればよいですか? (現在execnetを使用しています)

私が直面している主な問題は、実行中の特定のジョブを強制終了してから、別のジョブが終了した同じコアで新しいジョブをキューに入れることができることです。(すべてのコア任意の所与の時間におけるスレーブノードの数)。

execnetを使って実行中のジョブを終了させる方法がないので、bashスクリプトを使って手動でジョブを終了させることができたら、sudo kill 12345としましょう。12345はジョブのPIDですジョブはexecnetでサポートされていない別のものですが、これは別のトピックです)、ジョブを終了してから終了した同じコアで別のものを再キューします。ジョブは正しく終了しますが、すべてのジョブが完了するまで、そのチャネル(コア、マスターノードは各コアに個別に通信します)への接続を閉じてから、そのコアを使用しなくなります。 実行中のジョブを終了させる方法はありますか?コアへの接続を強制終了しないでください。ここで

はジョブ

import execnet, os, sys 
import re 
import socket 
import numpy as np 
import pickle, cPickle 
from copy import deepcopy 
import time 
import job 


def main(): 
    print 'execnet source files are located at:\n {}/\n'.format(
      os.path.join(os.path.dirname(execnet.__file__)) 
     ) 

# Generate a group of gateways. 
work_dir = '/home/mpiuser/pn2/' 
f = 'cluster_core_info.txt' 
n_start, n_end = 250000, 250008 

ci = get_cluster_info(f) 
group, g_labels = make_gateway_group(ci, work_dir) 


mch = group.remote_exec(job) 

args = range(n_start, n_end+1) # List of parameters to compute factorial. 
manage_jobs(group, mch, queue, g_labels, args) 

# Close the group of gateways. 
group.terminate() 

def get_cluster_info(f): 
    nodes, ncores = [], [] 
    with open(f, 'r') as fid: 
     while True: 
      line = fid.readline() 
      if not line: 
       fid.close() 
       break 
      line = line.strip('\n').split() 
      nodes.append(line[0]) 
      ncores.append(int(line[1])) 
    return dict(zip(nodes, ncores)) 

def make_gateway_group(cluster_info, work_dir): 
    ''' Generate gateways on all cores in remote nodes. ''' 
    print 'Gateways generated:\n' 
    group = execnet.Group() 
    g_labels = [] 
    nodes = list(cluster_info.keys()) 
    for node in nodes: 
     for i in range(cluster_info[node]): 
      group.makegateway(
       "ssh={0}//id={0}_{1}//chdir={2}".format(
       node, i, work_dir 
       )) 
      sys.stdout.write(' ') 
      sys.stdout.flush() 
      print list(group)[-1] 
      # Generate a string 'node-id_core-id'. 
      g_labels.append('{}_{}'.format(re.findall(r'\d+',node)[0], i)) 
    print '' 
    return group, g_labels 

def get_mch_id(g_labels, string): 
    ids = [x for x in re.findall(r'\d+', string)] 
    ids = '{}_{}'.format(*ids) 
    return g_labels.index(ids) 

def manage_jobs(group, mch, queue, g_labels, args): 
    args_ref = deepcopy(args) 
    terminated_channels = 0 
    active_jobs, active_args = [], [] 
while True: 
    channel, item = queue.get() 

    if item == 'terminate_channel': 
     terminated_channels += 1 
     print " Gateway closed: {}".format(channel.gateway.id) 
     if terminated_channels == len(mch): 
      print "\nAll jobs done.\n" 
      break 
     continue 

    if item != "ready": 
     mch_id_completed = get_mch_id(g_labels, channel.gateway.id) 
     depopulate_list(active_jobs, mch_id_completed, active_args) 
     print " Gateway {} channel id {} returned:".format(
       channel.gateway.id, mch_id_completed) 
     print " {}".format(item) 

    if not args: 
     print "\nNo more jobs to submit, sending termination request...\n" 
     mch.send_each(None) 
     args = 'terminate_channel' 

    if args and \ 
     args != 'terminate_channel': 
     arg = args.pop(0) 
     idx = args_ref.index(arg) 
     channel.send(arg) # arg is copied by value to the remote side of 
          # channel to be executed. Maybe blocked if the 
          # sender queue is full. 

     # Get the id of current channel used to submit a job, 
     # this id can be used to refer mch[id] to terminate a job later. 
     mch_id_active = get_mch_id(g_labels, channel.gateway.id) 
     print "Job {}: {}! submitted to gateway {}, channel id {}".format(
       idx, arg, channel.gateway.id, mch_id_active) 
     populate_list(active_jobs, mch_id_active, 
         active_args, arg) 


def populate_list(jobs, job_active, args, arg_active): 
    jobs.append(job_active) 
    args.append(arg_active) 

def depopulate_list(jobs, job_completed, args): 
    i = jobs.index(job_completed) 
    jobs.pop(i) 
    args.pop(i) 


if __name__ == '__main__': 
    main() 

を提出するスクリプトで、ここで私のjob.pyスクリプトです:

#!/usr/bin/env python 
import os, sys 
import socket 
import time 
import numpy as np 
import pickle, cPickle 
import random 
import job 


def hostname(): 
    return socket.gethostname() 

def working_dir(): 
    return os.getcwd() 

def listdir(path): 
    return os.listdir(path) 

def fac(arg): 
    return np.math.factorial(arg) 

def dump(arg): 
    path = working_dir() + '/out' 
    if not os.path.exists(path): 
     os.mkdir(path) 
    f_path = path + '/fac_{}.txt'.format(arg) 
    t_0 = time.time() 
    num = fac(arg)         # Main operation 
    t_1 = time.time() 
    cPickle.dump(num, open(f_path, "w"), protocol=2) # Main operation 
    t_2 = time.time() 
    duration_0 = "{:.4f}".format(t_1 - t_0) 
    duration_1 = "{:.4f}".format(t_2 - t_1) 
    #num2 = cPickle.load(open(f_path, "rb")) 
    return '--Calculation: {} s, dumping: {} s'.format(
      duration_0, duration_1) 


if __name__ == '__channelexec__': 
    channel.send("ready") 

    for arg in channel: 
     if arg is None: 
      break 
     elif str(arg).isdigit(): 
      channel.send((
        str(arg)+'!', 
        job.hostname(), 
        job.dump(arg) 
       )) 
     else: 
      print 'Warnning! arg sent should be number | None' 

答えて

0

はい、あなたは正しい軌道に乗っています。 psutilライブラリを使ってプロセスを管理し、pidなどを見つけてください。 それを殺してください。どこでもbashを使用する必要はありません。 Pythonはそれをすべてカバーしています。

また、マスターがそう言ったときにスクリプトを終了するようにプログラムすることもできます。 これは通常そのように行われます。 もしあなたが必要であれば、それを終了する前に別のスクリプトを起動させることさえできます。 また、別のプロセスで実行しているのと同じ場合は、現在の作業を停止し、スクリプトを終了せずに新しいものを開始するだけです。

また、私が提案をすることができます。ファイルを1行ずつ読み込まず、ファイル全体を読み込み、* .splitlines()を使用してください。小さなファイルの場合、チャンクでそれらを読み取るだけでIOを拷問します。あなたは* .strip()も必要としません。また、未使用のインポートも削除する必要があります。

関連する問題