Cześć
Próbuje zaimplementować prostą relacje gdzie Producer tworzy wartość int a Consumer je pobiera i wszystko ma działać współbieżnie.
Program niby wykonuje się poprawnie natomiast nie wiem jak go zakończyć. Gdy producer wyprodukuje już wszystkie wartość i kończy swoją metode run() consumer nadal oczekuje na nowe wartości. Ktoś byłby miły pomóc to poprawnie zaimplementować i ew. wskazać jakie błędy popęłniłem?
class Buffor {
private int[] buff;
private int readIdx = 0;
private int writeIdx = 0;
private int totalCap = 0;
{
buff = new int[5];
Arrays.fill(buff,-1);
}
public void push(int val) {
try {
synchronized(this) {
while(totalCap == buff.length) {
System.out.println("przepelnienie - czekanie");
this.wait();
}
buff[writeIdx % buff.length] = val;
writeIdx++;
totalCap++;
this.notify();
}
}
catch(InterruptedException e) {
System.out.println("przerwanie [push]: " + e);
}
}
public int pop() {
try {
synchronized(this) { //proba pozyskania blokady monitora obiektu
while(totalCap == 0) {
System.out.println("pusto - czekanie");
this.wait();
}
int val = buff[readIdx % buff.length];
buff[readIdx % buff.length] = -1;
readIdx++;
totalCap--;
this.notify();
return val;
}
}
catch(InterruptedException e) {
System.out.println("przerwanie [push]: " + e);
}
return -666;
}
public synchronized void show() {
System.out.println(Arrays.toString(buff));
}
}
class Producer implements Runnable {
int[] products;
private Buffor buff;
private final int numOfProd = 10;
private int produceCounter = 0;
public Producer(Buffor buff) {
this.buff = buff;
SecureRandom rand = new SecureRandom();
products = (int[])IntStream.iterate(0, x->x+1).limit(numOfProd).toArray();
System.out.println("products: " + Arrays.toString(products));
}
@Override public void run() {
for(int i=0; i<numOfProd; ++i) {
buff.push(products[i]);
produceCounter++;
buff.show();
}
System.out.println("Koniec producer!!!!!!!");
}
}
class Consumer implements Runnable {
private Buffor buff;
public Consumer(Buffor buff) {
this.buff = buff;
}
@Override public void run() {
while(!Thread.interrupted()) {
int val = buff.pop();
System.out.println("consume val:" + val);
buff.show();
}
}
}
public class ProducentConsumentNotifyWait {
public static void main(String[] args) throws Exception {
Buffor buff = new Buffor();
Producer prod = new Producer(buff);
Consumer con = new Consumer(buff);
ExecutorService exec = Executors.newFixedThreadPool(2);
Future<?> f1 = exec.submit(prod);
Future<?> f2 = exec.submit(con);
if(f1.isDone()) {
System.out.println("Robota zrobiona");
f2.cancel(true);
}
}
}