In the context of multiple producers and consumers, the same times of producing and consuming occur alternately. For example, four times of push, then four times of pop, then four times of push again, etc. It is evident that the max length minuses the min length of the queue is value of the same times.
1 Producer.java
package thread; import java.util.Random; import entity.Queue; public class Producer extends Thread { private Queue queue; public Producer(int id, Queue queue) { this.queue = queue; this.setName("producer-" + id); } @Override public void run() { while (true) { Random r = new Random(); int value = r.nextInt(10) + 100; queue.push(value); try { Thread.sleep(r.nextInt(3)*1000); }catch(Exception e){} } } }
2 Consumer.java
package thread; import java.util.Random; import entity.Queue; public class Consumer extends Thread { private Queue queue; public Consumer(int id, Queue queue) { this.queue = queue; this.setName("consumer-" + id); } @Override public void run() { while (true) { queue.pop(); Random r = new Random(); try { Thread.sleep(r.nextInt(3)*1000); }catch(Exception e){} } } }
3 Queue.java
package entity; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Queue { private int maxLen; private int minLen; private boolean pushOp = true; private Listlist = new ArrayList (); private Lock lock = new ReentrantLock(); private Condition proCondition = lock.newCondition(); private Condition conCondition = lock.newCondition(); public Queue(int maxLen, int minLen) { this.maxLen = maxLen; this.minLen = minLen; } public void push(Integer value) { try { lock.lock(); while (list.size() == maxLen) { proCondition.await(); } if (pushOp) { list.add(value); System.out.println(Thread.currentThread().getName() + " push=" + value + ", List=" + list.toString()); if (list.size() == maxLen) { pushOp = false; conCondition.signalAll(); } } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public Integer pop() { Integer value = null; try { lock.lock(); while (list.size() == minLen) { conCondition.await(); } if (!pushOp) { value = list.get(0); list.remove(0); System.out.println(Thread.currentThread().getName() + " pop=" + value + ", List=" + list.toString()); if (list.size()==minLen) { pushOp = true; proCondition.signalAll(); } } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } return value; } }
4 Main.java
package app; import entity.Queue; import thread.Consumer; import thread.Producer; public class Main { public static void main(String[] args) throws InterruptedException{ Queue myQueue = new Queue(5, 1); // set the max & min length of the queue for (int i=0;i<10;i++) { // Fewer threads, such as 3, is ok too. new Producer(i+1, myQueue).start(); new Consumer(i+1, myQueue).start(); } } }
5 snapshot of running