2012-04-18 17 views
0

スレッドでJavaでBully Algorithmを実装しようとしています。
ここに私が書いたコードがあります。私のコードでnotifyAll()メソッドが動作していません

package newbully; 

public class NewBully { 

    public static void main(String[] args) { 
     int total_processes = 4; 
     Thread1[] t = new Thread1[total_processes]; 
     for (int i = 0; i < total_processes; i++) { 
      t[i] = new Thread1(new Process(i+1, i+1), total_processes); 
     } 
     try { 
      Election.initialElection(t); 
     } catch (Exception e) { 
      System.out.println("Possibly you are using null references in array"); 
     } 
     for (int i = 0; i < total_processes; i++) { 
      new Thread(t[i]).start(); 
     } 
    } 
} 

package newbully; 

public class Election { 

    private static boolean pingFlag = false; 
    private static boolean electionFlag = false; 
    private static boolean messageFlag = false; 

    public static boolean isMessageFlag() { 
     return messageFlag; 
    } 

    public static void setMessageFlag(boolean messageFlag) { 
     Election.messageFlag = messageFlag; 
    } 

    public static boolean isPingFlag() { 
     return pingFlag; 
    } 

    public static void setPingFlag(boolean pingFlag) { 
     Election.pingFlag = pingFlag; 
    } 

    public static boolean isElectionFlag() { 
     return electionFlag; 
    } 

    public static void setElectionFlag(boolean electionFlag) { 
     Election.electionFlag = electionFlag; 
    } 

    public static void initialElection(Thread1[] t) { 
     Process temp = new Process(-1, -1); 
     for (int i = 0; i < t.length; i++) { 
      if (temp.getPriority() < t[i].getProcess().getPriority()) { 
       temp = t[i].getProcess(); 
      } 
     } 
     t[temp.pid - 1].getProcess().CoOrdinatorFlag = true; 
    } 
} 

package newbully; 

public class Process { 

    int pid; 
    boolean downflag,CoOrdinatorFlag; 

    public boolean isCoOrdinatorFlag() { 
     return CoOrdinatorFlag; 
    } 

    public void setCoOrdinatorFlag(boolean isCoOrdinator) { 
     this.CoOrdinatorFlag = isCoOrdinator; 
    } 
    int priority; 

    public boolean isDownflag() { 
     return downflag; 
    } 

    public void setDownflag(boolean downflag) { 
     this.downflag = downflag; 
    } 

    public int getPid() { 
     return pid; 
    } 

    public void setPid(int pid) { 
     this.pid = pid; 
    } 

    public int getPriority() { 
     return priority; 
    } 

    public void setPriority(int priority) { 
     this.priority = priority; 
    } 

    public Process() { 
    } 

    public Process(int pid, int priority) { 
     this.pid = pid; 
     this.downflag = false; 
     this.priority = priority; 
     this.CoOrdinatorFlag = false; 
    } 
} 

package newbully; 

import java.util.*; 
import java.io.*; 
import java.net.*; 

public class Thread1 implements Runnable { 

    private Process process; 
    private int total_processes; 
    ServerSocket[] sock; 
    Random r; 

    public Process getProcess() { 
     return process; 
    } 

    public void setProcess(Process process) { 
     this.process = process; 
    } 

    public Thread1(Process process, int total_processes) { 
     this.process = process; 
     this.total_processes = total_processes; 
     this.r = new Random(); 
     this.sock = new ServerSocket[total_processes]; 
    } 

    private void recovery() { 
    } 

    synchronized private void pingCoOrdinator() { 
     try { 
      if (Election.isPingFlag()) { 
       wait(); 
      } 
      if (!Election.isElectionFlag()) { 
       Election.setPingFlag(true); 
       System.out.println("Process[" + this.process.getPid() + "]: Are you alive?"); 
       Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345); 
       outgoing.close(); 
       Election.setPingFlag(false); 
       notifyAll(); 
      } 
     } catch (Exception ex) { 
      //Initiate Election 
      System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\nInitiating Election"); 
      Election.setElectionFlag(true); 
      Election.setPingFlag(false); 
      notifyAll(); 
     } 
    } 

    synchronized private void executeJob() { 
     int temp = r.nextInt(20); 
     for (int i = 0; i <= temp; i++) { 
      try { 
       Thread.sleep(100); 
      } catch (InterruptedException e) { 
       System.out.println("Error Executing Thread:" + process.getPid()); 
       System.out.println(e.getMessage()); 
      } 
     } 
    } 

    synchronized private boolean sendMessage() { 
     boolean response = false; 
     int i = 0; 
     try { 
      if (Election.isMessageFlag()) { 
       wait(); 
      } 
      Election.setMessageFlag(true); 

      for (i = this.process.getPid() + 1; i <= this.total_processes; i++) { 
       try { 
        Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i); 
        System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] responded to election message successfully"); 
        electionMessage.close(); 
        response = true; 
       } catch (Exception ex) { 
        System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message"); 
       } 
      } 
      Election.setMessageFlag(false); 
      notifyAll(); 
     } catch (Exception ex1) { 
      System.out.println(ex1.getMessage()); 
     } 

     return response; 
    } 

    synchronized private void serve() { 
     try { 
      //service counter 
      ServerSocket s = new ServerSocket(12345); 
      for (int counter = 0; counter <= 10; counter++) { 
       Socket incoming = s.accept(); 
       System.out.println("Process[" + this.process.getPid() + "]:Yes"); 
       Scanner scan = new Scanner(incoming.getInputStream()); 
       PrintWriter out = new PrintWriter(incoming.getOutputStream(), true); 
       if (scan.hasNextLine()) { 
        if (scan.nextLine().equals("Who is the co-ordinator?")) { 
         System.out.print("Process[" + this.process.getPid() + "]:"); 
         out.println(this.process); 
        } 
       } 
       if (counter == 10) {//after serving 10 requests go down 
        this.process.setCoOrdinatorFlag(false); 
        this.process.setDownflag(true); 
        try { 
         incoming.close(); 
         s.close(); 
         sock[this.process.getPid() - 1].close(); 
         Thread.sleep((this.r.nextInt(10) + 1) * 50000);//going down 
         recovery(); 
        } catch (InterruptedException e) { 
         System.out.println(e.getMessage()); 
        } 
       } 
      } 
     } catch (IOException ex) { 
      System.out.println(ex.getMessage()); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid()); 
     } catch (IOException ex) { 
      System.out.println(ex.getMessage()); 
     } 
     while (true) { 
      if (process.isCoOrdinatorFlag()) { 
       //serve other processes 
       serve(); 
      } else { 
       while (true) { 
        //Execute some task 
        executeJob(); 

        //Ping the co-ordinator 
        pingCoOrdinator(); 

        if (Election.isElectionFlag()) { 
         if (!sendMessage()) {//elect self as co-ordinator 
          System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]"); 
          this.process.setCoOrdinatorFlag(true); 
          Election.setElectionFlag(false); 
          break; 
         } 
        } 
       } 
      } 
     } 
    } 
} 

私が作成した4つのスレッドの中からコードを実行しようとしているときに、スレッドがpre(wait)コールを使用して待機しています。 notifyAll()によって通知されていません。誰もがなぜこれが起こっていることを提案することができますか?

答えて

6

各スレッドは、(それ自身Thread1インスタンスに)自体にwait()を呼び出しています。つまり、同じThread1インスタンスでnotifyAll()を呼び出すと、待機中の単一のThread1のみが通知され、他のすべてのスレッドではありません。

すべてのThread1オブジェクトは、1つの共通オブジェクト上でwait()を呼び出し、同じオブジェクトに対してnotifyAll()を呼び出します。

もちろん、wait()またはnotifyAll()を呼び出すときに共通のオブジェクトで同期する必要があります。もしあなたがそうしなければ、あなたはIllegalMonitorStateExceptionを得ます。

// Object to be used as a lock; pass this to all Thread1 instances 
Object lock = new Object(); 

// Somewhere else in your code 
synchronized (lock) { 
    lock.wait(); 
} 

// Where you want to notify 
synchronized (lock) { 
    lock.notifyAll(); 
} 
+0

ありがとう! 本当に役に立ちました –

2

(またはnotifyAll())とwait()は、同じモニターのブロック​​に書き込む必要があります。例えば

synchronized(myLock) { 
    wait(); 
} 

.................. 

synchronized(myLock) { 
    notifyAll(); 
} 
+0

は、myLockブール変数ですか? –

関連する問題