Threads eines Pools aufeinander warten lassen
-
Ich habe einen Threadpool, der mehrere Worker startet, die parallel ihre Operationen ausführen. Zu Testzwecken möchte ich nun erreichen, dass immer nur einer der Worker gleichzeitig aktiv ist, während die anderen warten.
Wenn der aktive Worker eine Operation beendet hat, soll er den nächsten benachrichtigen, dass dieser nun eine Operation durchführen darf.Das Benachrichtigen habe ich mit einer statischen Integervariable gelöst, in der der aktive Worker jeweils die Nummer des nächsten speichert. Das Problem: Ich schaffe es nicht, die Worker zu pausieren, da wait() zu einer IllegalMonitorStateException führt. Hat jemand eine Idee, wie man das machen könnte?
-
von wo aus rufst du denn wait auf? Lass den Thread den du pausieren möchtest selbst wait aufrufen das sollte funktionieren.
-
Hmm ich steh da ein bisschen auf dem Schlauch... um das ganze zu vereinfachen, hab ich schnell ein Primzahlberechnungsprogramm geschrieben, dessen Threadsystem exakt dem meines anderen Programmes entspricht.
[edit: inzwischen funktioniert es, wie es soll. Nur ist die Struktur des Programmes arg kompliziert geworden. Hat vielleicht jemand eine Idee, wie man das vereinfachen könnte? hier der Programmtext:]
// PrimeSimulator.java package prime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class PrimeSimulator extends Thread { private ExecutorService service; public static final int THREADS = 4, START = 500000001, STOP = 500000200; public boolean[] done; public boolean[] stepDone; private boolean cDone; public PrimeSimulator() { service = Executors.newCachedThreadPool(); done = new boolean[PrimeSimulator.THREADS]; stepDone = new boolean[PrimeSimulator.THREADS]; } public static void main (String args[]) { PrimeSimulator sim = new PrimeSimulator(); System.out.println("start"); sim.cDone = false; for (int ti = 0; ti < PrimeSimulator.THREADS; ++ ti) { sim.done[ti] = false; sim.stepDone[ti] = false; } sim.stepDone[PrimeSimulator.THREADS - 1] = true; // so that first one can start synchronized (sim) { for (int ti = 0; ti < PrimeSimulator.THREADS; ++ ti) sim.service.execute(new CalcRunnable(ti, sim)); while (!sim.cDone) { try { sim.wait(); sim.cDone = true; for (int ti = 0; ti < PrimeSimulator.THREADS; ++ ti) if (!sim.done[ti]) sim.cDone = false; } catch (InterruptedException e) { e.printStackTrace(); } } } // end sync System.out.println("stop"); System.exit(0); } } // CalcRunnable.java package prime; class CalcRunnable implements Runnable { private PrimeSimulator sim; private int number; public CalcRunnable(int number, PrimeSimulator sim) { this.number = number; this.sim = sim; } public void run() { System.out.println(Thread.currentThread().getName() + " running..."); try { for (int i = PrimeSimulator.START + number * 2; i < PrimeSimulator.STOP; i += PrimeSimulator.THREADS * 2) { try { int lastNumber = number - 1; // number of previously active thread if (lastNumber < 0) lastNumber = PrimeSimulator.THREADS - 1; synchronized (sim) { while (!sim.stepDone[lastNumber]) { // waiting until previous thread finished step try { sim.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } // has waited until previous thread finished step and starts now if (isPrime(i)) System.out.println(Thread.currentThread().getName() + " -> " + i + " is prime"); sim.stepDone[number] = true; sim.stepDone[lastNumber] = false; } finally { synchronized (sim) { // only one step finished, changing to next thread sim.notifyAll(); } } } // end for sim.done[number] = true; } // end try finally { synchronized (sim) { System.out.println(Thread.currentThread().getName() + " finished completely"); sim.notifyAll(); } } } // end run public static boolean isPrime(long number){ int bPrime = 0; for (long i = 1; i < Math.sqrt(number); ++ i) if (number % i == 0) ++ bPrime; return (bPrime <= 1); } }
Die Primzahlen werden wie gewünscht in aufsteigender Reihenfolge berechnet.
-
Ist es egal, welcher Thread als nächster dran kommt? Ich geh jetzt mal davon aus, dass dies nicht der Fall ist. Falls doch, würde ich einfach eine binäre Semaphore benutzen. In dem Fall, dass die Reihenfolge von Bedeutung ist, würde ich das so implementieren:
import java.util.concurrent.Semaphore; public class Warehouse { public static Semaphore[] sem; public static void init() { // Semaphoren initialisieren sem = new Semaphore[4]; sem[0] = new Semaphore(1); for(int i = 1; i < sem.length; i++) { sem[i] = new Semaphore(0); } } } public class Runner extends Warehouse implements Runnable { private int id; public Runner(int id) { this.id = id; } public void run() { for(;;) { // auf Vorgaenger warten sem[(id + 1) % sem.length].acquireUninterruptibly(); // tue irgendwas System.out.println("Thread " + id + " ist dran ..."); // Semaphore fuer Nachfolger freigeben sem[id].release(); } } } public class ConcurrentTest { public static void main(String[] args) { Warehouse.init(); Thread T0 = new Thread(new Runner(0)); Thread T1 = new Thread(new Runner(1)); Thread T2 = new Thread(new Runner(2)); Thread T3 = new Thread(new Runner(3)); // Threads starten T0.start(); T1.start(); T2.start(); T3.start(); } }
p.s. Ich bin ein Freund von Semaphoren, weil mir das Monitorkonzept von Java in vielen Fällen zu schwerfällig ist. Die Funktionsweise lässt sich natürlich auch mit Monitoren implementieren.
-
Danke, sieht interessant aus
Aber was genau macht diese Zeile?
sem[(id + 1) % sem.length].acquireUninterruptibly();
id + 1 ist ja nicht der Vorgänger, sondern der Nachfolger... sagt der Thread dem Nachfolger so, dass er warten soll?
-
Du hast Recht, dass es sich nicht um den Nachfolger, sondern um den Vorgänger handelt. Du Modulo-Funktion in Java ist leider so gestrickt, dass mir eine Nachfolger-Variante nicht spontan eingefallen ist. An sich sind alle Semaphoren so gesetzt, dass sie nicht freigegeben sind und jeder Thread erstmal auf die Freigabe der Semaphore warten muss. Nur eine einzige Semaphore ist freigegeben und kann von einem Thread passiert werden. Dieser gibt die Semaphore des Vorgängers frei und so weiter und so fort.