2016-09-17 5 views
0

TensorBox ReInspectの実装(https://github.com/Russell91/TensorBox)を1 psと2人の作業者に配布しようとしています。 sv.managed_sessionにトレーニングコードを追加しました。CancelledError:分散テンソルを実行している間RunManyGraphs

def train(H, test_images, server): 
''' 
Setup computation graph, run 2 prefetch data threads, and then run the main loop 
''' 

if not os.path.exists(H['save_dir']): os.makedirs(H['save_dir']) 

ckpt_file = H['save_dir'] + '/save.ckpt' 
with open(H['save_dir'] + '/hypes.json', 'w') as f: 
    json.dump(H, f, indent=4) 

x_in = tf.placeholder(tf.float32) 
confs_in = tf.placeholder(tf.float32) 
boxes_in = tf.placeholder(tf.float32) 
q = {} 
enqueue_op = {} 
for phase in ['train', 'test']: 
    dtypes = [tf.float32, tf.float32, tf.float32] 
    grid_size = H['grid_width'] * H['grid_height'] 
    shapes = (
     [H['image_height'], H['image_width'], 3], 
     [grid_size, H['rnn_len'], H['num_classes']], 
     [grid_size, H['rnn_len'], 4], 
     ) 
    q[phase] = tf.FIFOQueue(capacity=30, dtypes=dtypes, shapes=shapes) 
    enqueue_op[phase] = q[phase].enqueue((x_in, confs_in, boxes_in)) 

def make_feed(d): 
    return {x_in: d['image'], confs_in: d['confs'], boxes_in: d['boxes'], 
      learning_rate: H['solver']['learning_rate']} 

def thread_loop(sess, enqueue_op, phase, gen): 
    for d in gen: 
     sess.run(enqueue_op[phase], feed_dict=make_feed(d)) 

(config, loss, accuracy, summary_op, train_op, 
smooth_op, global_step, learning_rate, encoder_net) = build(H, q) 

saver = tf.train.Saver(max_to_keep=None) 
writer = tf.train.SummaryWriter(
    logdir=H['save_dir'], 
    flush_secs=10 
) 

init_op = tf.initialize_all_variables() 


#Assigning the first worker as supervisor 
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), 
         #logdir="/tmp/train_logs", 
         init_op=init_op, 
         summary_op=summary_op, 
         saver=saver, 
         global_step=global_step, 
         save_model_secs=600) 

#Starting training in managed session distributed across the cluster 
# with tf.Session(config=config) as sess: 
with sv.managed_session(server.target) as sess: 
    tf.train.start_queue_runners(sess=sess) 
    for phase in ['train', 'test']: 
     # enqueue once manually to avoid thread start delay 
     gen = train_utils.load_data_gen(H, phase, jitter=H['solver']['use_jitter']) 
     d = gen.next() 
     sess.run(enqueue_op[phase], feed_dict=make_feed(d)) 
     t = tf.train.threading.Thread(target=thread_loop, 
             args=(sess, enqueue_op, phase, gen)) 
     t.daemon = True 
     t.start() 

    tf.set_random_seed(H['solver']['rnd_seed']) 
    # sess.run(tf.initialize_all_variables()) 
    writer.add_graph(sess.graph) 
    weights_str = H['solver']['weights'] 
    if len(weights_str) > 0: 
     print('Restoring from: %s' % weights_str) 
     saver.restore(sess, weights_str) 

    # train model for N iterations 
    start = time.time() 
    max_iter = H['solver'].get('max_iter', FLAGS.iter) 
    for i in xrange(max_iter): 
     display_iter = H['logging']['display_iter'] 
     adjusted_lr = (H['solver']['learning_rate'] * 
         0.5 ** max(0, (i/H['solver']['learning_rate_step']) - 2)) 
     lr_feed = {learning_rate: adjusted_lr} 

     if i % display_iter != 0: 
      # train network 
      batch_loss_train, _ = sess.run([loss['train'], train_op], feed_dict=lr_feed) 
     else: 
      # test network every N iterations; log additional info 
      if i > 0: 
       dt = (time.time() - start)/(H['batch_size'] * display_iter) 
      start = time.time() 
      (train_loss, test_accuracy, summary_str, 
       _, _) = sess.run([loss['train'], accuracy['test'], 
            summary_op, train_op, smooth_op, 
           ], feed_dict=lr_feed) 
      writer.add_summary(summary_str, global_step=global_step.eval(session=sess)) 
      print_str = string.join([ 
       'Step: %d', 
       'lr: %f', 
       'Train Loss: %.2f', 
       'Test Accuracy: %.1f%%', 
       'Time/image (ms): %.1f' 
      ], ', ') 
      print(print_str % 
        (i, adjusted_lr, train_loss, 
        test_accuracy * 100, dt * 1000 if i > 0 else 0)) 

     if global_step.eval(session=sess) % H['logging']['save_iter'] == 0 or global_step.eval(session=sess) == max_iter - 1: 
      saver.save(sess, ckpt_file, global_step=global_step) 
sv.stop() 

トレーニング開始されますが、最後の反復を印刷する前に、私は上司に次のエラーを取得する(労働者:1):

W tensorflow/core/kernels/queue_base.cc:294] _0_fifo_queue: Skipping cancelled enqueue attempt with queue not closed 
W tensorflow/core/kernels/queue_base.cc:294] _1_fifo_queue_1: Skipping cancelled enqueue attempt with queue not closed 
Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 763, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "distributed-train.py", line 461, in thread_loop 
    sess.run(enqueue_op[phase], feed_dict=make_feed(d)) 
    File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 717, in run 
    run_metadata_ptr) 
    File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 915, in _run 
    feed_dict_string, options, run_metadata) 
    File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 965, in _do_run 
    target_list, options, run_metadata) 
    File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 985, in _do_call 
    raise type(e)(node_def, op, message) 
CancelledError: RunManyGraphs 

*** Error in `python': corrupted double-linked list: 0x00007f9a702b8eb0 *** 
Aborted (core dumped) 

はどのようにこの問題を解決することができますか?

答えて

2

CancelledErrorは比較的害がありません。メインスレッドがwith sv.managed_session() as sess:ブロックを終了し、セッションを終了し、2つのプリフェッチスレッドによって行われた要求を含む、すべての保留中の要求をキャンセルすると思われます。

このエラーが発生しないように、プリフェッチに使用するスレッドを管理するには、tf.train.Coordinator and tf.train.QueueRunnerクラスを使用することをお勧めします。これにより、トレーニング終了時に確実にスレッドをシャットダウンすることができます。 (特に、あなたのアプリケーションには理想的な実験用のFeedingQueueRunnerがあります)。

コアダンプの原因はあまり明確ではなく、セッションクローズまたは分散セッションコードにバグがある可能性があります。このバグについては、(入力データなどに頼らずに)バグを再現する最小限の例を作り、GitHub issueを提出してください。

関連する問題