Thursday, February 16, 2012

Producer Consumer Design Pattern with Blocking Queue Example in Java

Producer Consumer Design pattern is a classic concurrency or threading pattern which reduces coupling between
Producer and Consumer by separating Identification of work with Execution of Work. In producer consumer design pattern a shared queue is used to control the flow and this separation allows you to code producer and consumer separately. It also addresses the issue of different timing require to produce item or consuming item. by using producer consumer pattern both Producer and Consumer Thread can work with different speed. In this article we will see What is producer consumer problem which is very popular multi-threading interview question, How to solve producer consumer problem using Blocking Queue and Benefits of using Producer Consumer design pattern.

 

Real World Example of Producer Consumer Design Pattern

Producer Consumer design pattern BlockingQueue example JavaProducer consumer pattern is every where in real life and depict coordination and collaboration. Like one person is preparing food (Producer) while other one is serving food (Consumer), both will use shared table for putting food plates and taking food plates. Producer which is the person preparing food will wait if table is full and Consumer (Person who is serving food) will wait if table is empty. table is a shared object here. On Java library Executor framework itself implement Producer Consumer design pattern be separating responsibility of addition and execution of task.
 

Benefit of Producer Consumer Pattern

Its indeed a useful design pattern and used most commonly while writing multi-threaded or concurrent code. here
is few of its benefit:

1) Producer Consumer Pattern simple development. you can Code Producer and Consumer independently and Concurrently, they just need to know shared object.

2) Producer doesn't need to know about who is consumer or how many consumers are there. Same is true with Consumer.

3) Producer and Consumer can work with different speed. There is no risk of Consumer consuming half-baked item.
In fact by monitoring consumer speed one can introduce more consumer for better utilization.

4) Separating producer and Consumer functionality result in more clean, readable and manageable code.
 

Producer Consumer Problem in Multi-threading

Producer-Consumer Problem is also a popular java interview question where interviewer ask to implement producer consumer design pattern so that Producer should wait if Queue or bucket is full and Consumer should wait if queue or
bucket is empty. This problem can be implemented or solved by different ways in Java, classical way is using wait and notify method to communicate between Producer and Consumer thread and blocking each of them on individual condition like full queue and empty queue. With introduction of BlockingQueue Data Structure in Java 5 Its now much simpler because BlockingQueue provides this control implicitly by introducing blocking methods put() and take(). Now you don't require to use wait and notify to communicate between Producer and Consumer. BlockingQueue put() method will block if Queue is full in case of Bounded Queue and take() will block if Queue is empty. In next section we will see a code example of Producer Consumer design pattern.
 

Using Blocking Queue to implement Producer Consumer Pattern

BlockingQueue amazingly simplifies implementation of Producer-Consumer design pattern by providing outofbox support of blocking on put() and take(). Developer doesn't need to write confusing and critical piece of wait-notify code to implement communication. BlockingQuue is an interface and Java 5 provides different implantation like ArrayBlockingQueue and LinkedBlockingQueue , both implement FIFO order or elements, while ArrayLinkedQueue is bounded in nature LinkedBlockingQueue is optionally bounded. here is a complete code example of Producer Consumer pattern with BlockingQueue. Compare it with classic wait notify code, its much simpler and easy to understand.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){
  
     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();
 
     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }
 
}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
  
    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
  
  
}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

You see Producer Thread  produced number and Consumer thread consumes it in FIFO order because blocking queue allows elements to be accessed in FIFO.

That’s all on How to use Blocking Queue to solve Producer Consumer problem or example of Producer consumer design pattern. I am sure its much better than wait notify example but be prepare with both if you are going for any Java Interview as Interview may ask you both way.

Other Java threading tutorial you may like:

8 comments :

Anonymous said...

Recently I got this question in interview with different scenario. How to resolve the producer and consumer problem so that my CPU cycle can be used to 100%. For ex if producer is producing less and consumer is consuming fast then your CPU cycle is getting wasted which is associated with cost. So what would be strategy to resolve this. Any suggestion?

Mudit Srivastava said...

Well i see something is missing, how to define the size of queue ?

SARAL SAXENA said...

Hi Javin, ..gr8 article few things that I want to add in this is...

BlockingQueue Code Example

Here is an example of how to use a BlockingQueue. The example uses the ArrayBlockingQueue implementation of the BlockingQueue interface.

First, the BlockingQueueExample class which starts a Producer and a Consumer in separate threads. The Producer inserts strings into a shared BlockingQueue, and the Consumer takes them out.

public class BlockingQueueExample {

public static void main(String[] args) throws Exception {

BlockingQueue queue = new ArrayBlockingQueue(1024);

Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

new Thread(producer).start();
new Thread(consumer).start();

Thread.sleep(4000);
}
}
Here is the Producer class. Notice how it sleeps a second between each put() call. This will cause the Consumer to block, while waiting for objects in the queue.

public class Producer implements Runnable{

protected BlockingQueue queue = null;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Here is the Consumer class. It just takes out the objects from the queue, and prints them to System.out.

public class Consumer implements Runnable{

protected BlockingQueue queue = null;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

yyc said...

The above example is good but too simple. Would you minding providing a example with multiple producers and consumers and stopping the program once producers have done their jobs?

Anonymous said...

I tried out following implementation of Producer-Consumer and it doesn't list out all the elements from the list. Few of them are missing.

I wanted to have one producer & multiple consumers and have the consumers take data out of the list in round robin fashion, but not working as expected. Would you be able to point me out the error in the code? Thanks.

public class ProdConController {
public static void main(String[] args) throws Exception {
List stringList = new ArrayList();
SignalObject signalObject = new SignalObject();
ProdWorker prodWorker = new ProdWorker(stringList, signalObject);
ConWorker conWorker1 = new ConWorker(stringList, signalObject, "worker1");
ConWorker conWorker2 = new ConWorker(stringList, signalObject, "worker2");
Thread prodThrd = new Thread(prodWorker);
Thread conThrd1 = new Thread(conWorker1);
Thread conThrd2 = new Thread(conWorker2);
prodThrd.start();
conThrd1.start();
conThrd2.start();

prodThrd.join();
conThrd1.join();
conThrd2.join();
}
}

public class SignalObject {
private boolean isDataAvailable = false;

public synchronized boolean isDataAvailable() {
return isDataAvailable;
}

public synchronized void setDataAvailable(boolean dataAvailable) {
isDataAvailable = dataAvailable;
}
}

public class ProdWorker implements Runnable {
private List stringList;
private SignalObject signalObject;

public ProdWorker(List stringList, SignalObject signalObject) {
this.stringList = stringList;
this.signalObject = signalObject;
}

@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println("Adding " + i + " to queue");
stringList.add(String.valueOf(i));
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

synchronized (signalObject) {
signalObject.setDataAvailable(true);
signalObject.notifyAll();
}
}
}

public class ConWorker implements Runnable {
private final List stringList;
private String name;
private SignalObject signalObject;

public ConWorker(List stringList, SignalObject signalObject, String name) {
this.stringList = stringList;
this.name = name;
this.signalObject = signalObject;
}

@Override
public void run() {
while (!signalObject.isDataAvailable()) {
try {
synchronized (signalObject) {
signalObject.wait();
}
} catch (InterruptedException ex) {
System.out.println("Received interrupt");
ex.printStackTrace();
}
}

synchronized (stringList) {
for (int i = 0; i < stringList.size(); i++) {
System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
stringList.remove(i);
// if (i % 2 == 0 && this.name.equals("worker1")) {
// System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
// }
// else {
// System.out.println("Received:" + stringList.get(i) + " by worker:" + name);
// }
}
}

System.out.println("Finished consuming all data");
}
}

Stanislav Lorents said...

Recently I got this question in interview with different scenario. How to resolve the producer and consumer problem so that my CPU cycle can be used to 100%. For ex if producer is producing less and consumer is consuming fast then your CPU cycle is getting wasted which is associated with cost. So what would be strategy to resolve this. Any suggestion?
====================
(Count of Producers == Count of Consumers ) >= max CPU threads (cores / hyper threads) While consumers are waiting, producers will allocate all CPU resources and vice versa. Also the queue size >= max CPU threads.

Anonymous said...

Can you please share solution of Producer Consumer problem using Semaphore? I know it can be solved using multiple way including BlockingQueue, wait and notify as shown above, but I am really interested in using Semaphore. Thanks

sathish said...

package Thread;

import java.util.concurrent.Semaphore;

class SharedResource
{
int n;
static Semaphore semCons = new Semaphore(0);
static Semaphore semProd = new Semaphore(1);

void put(int n)
{
try {
semProd.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.n=n;
System.out.println("Put : " + n);
semCons.release();
}

void get()
{
try {
semCons.acquire();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Got : "+n);
semProd.release();
}
}

class Consumer implements Runnable
{
SharedResource sr;
public Consumer(SharedResource sr)
{
this.sr = sr;
new Thread(this, "Consumer").start();
}
@Override
public void run()
{
for(int i=0;i<10;i++)
{
sr.get();
}
}
}

class Producer implements Runnable
{
SharedResource sr;
public Producer(SharedResource sr)
{
this.sr= sr;
new Thread(this,"Producer").start();
}

@Override
public void run()
{
for(int i=0;i<10;i++)
{
sr.put(i);
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
SharedResource sr = new SharedResource();

new Consumer(sr);
new Producer(sr);

}
}

Post a Comment