The producer-consumer pattern

The producer-consumer pattern is a classic multi-threaded design pattern that provides a good solution for cooperation between multiple threads.
There are usually two roles:
Several producer threads and several consumer threads. The producer threads are responsible for submitting user requests, and the consumer threads are responsible for processing the tasks submitted by the producers. Producers and consumers communicate through a shared memory buffer.
The main function of the memory buffer in the producer-consumer pattern is to share data among multiple threads. In addition, it smooths out differences in processing speed between producers and consumers.

/**
 * Demo driver for the producer-consumer pattern.
 *
 * <p>Starts three {@link Producer}s and three {@link Consumer}s that share one
 * blocking queue, lets them run for ten seconds, stops the producers, gives the
 * consumers three more seconds to drain the queue, and then shuts the pool down.
 */
public class Main {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     *         or while waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<PCData> queue = new LinkedBlockingDeque<>();  // shared buffer

        // Producers
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);

        // Consumers
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(producer1);
        executorService.execute(producer2);
        executorService.execute(producer3);
        executorService.execute(consumer1);
        executorService.execute(consumer2);
        executorService.execute(consumer3);

        // Let the workers run for a while, then ask the producers to stop.
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        // Give the consumers time to drain whatever is still queued.
        Thread.sleep(3000);

        // shutdown() only rejects new tasks; it does not interrupt running ones.
        // The consumers block in queue.take() indefinitely, so without an
        // interrupt their threads would keep the JVM alive. shutdownNow()
        // interrupts them so the program can exit.
        executorService.shutdownNow();
        if (!executorService.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
            System.err.println("worker threads did not terminate in time");
        }
    }

}
/**
 * Producer task: repeatedly creates {@link PCData} items carrying a globally
 * increasing number and offers them to the shared queue until {@link #stop()}
 * is called or the running thread is interrupted.
 */
public class Producer implements Runnable {

    /** Upper bound (exclusive), in milliseconds, of the random pause between items. */
    private static final int SLEEP_TIME_MS = 1000;

    /** Sequence shared by all producers so every item gets a distinct number. */
    private static final AtomicInteger count = new AtomicInteger();

    /** Cleared by {@link #stop()}; volatile so the worker thread sees the change. */
    private volatile boolean isRunning = true;

    private final BlockingQueue<PCData> queue;

    /**
     * @param queue the shared buffer that produced items are offered to
     */
    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Produces items in a loop: sleeps a random time below one second, creates
     * the next item, and offers it to the queue with a two-second timeout.
     * A failed offer is reported on {@code System.err} and the item is dropped.
     */
    @Override
    public void run() {
        Random random = new Random();
        System.out.println("start producer name"+Thread.currentThread().getName());
        try {
            while (isRunning) {
                Thread.sleep(random.nextInt(SLEEP_TIME_MS));
                PCData data = new PCData(count.incrementAndGet());
                System.out.println(data+"is put into queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data"+data);
                }
            }
        } catch (InterruptedException e) {
            // Only an interruption should set the interrupt flag; other
            // failures are left to propagate to the executor unchanged.
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the producer to finish after its current iteration. */
    public void stop() {
        isRunning = false;
    }

}
/**
 * Consumer task: takes {@link PCData} items from the shared queue, prints the
 * square of each item's value, and pauses a random time below one second
 * between items. Runs until the executing thread is interrupted.
 */
public class Consumer implements Runnable {

    /** Upper bound (exclusive), in milliseconds, of the random pause between items. */
    private static final int SLEEPTIME = 1000;

    private final BlockingQueue<PCData> queue;

    /**
     * @param queue the shared buffer that items are taken from
     */
    public Consumer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    /**
     * Processes queued items until interrupted. Interruption is the only way
     * to stop this task, so it is handled as a normal exit rather than an error.
     */
    @Override
    public void run() {
        System.out.println("start Consumer id"+Thread.currentThread().getName());
        Random random = new Random();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // take() blocks until an item is available and never returns null.
                PCData pcData = queue.take();
                int value = pcData.getData();
                // Widen before multiplying: value * value overflows int above 46340.
                long re = (long) value * value;
                System.out.println(MessageFormat.format("{0}*{1}={2}", value, value, re));
                Thread.sleep(random.nextInt(SLEEPTIME));
            }
        } catch (InterruptedException e) {
            // Restore the flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

}
/**
 * Immutable unit of work passed from producers to consumers; wraps a single
 * {@code int} value.
 */
public class PCData {

    private final int intData;

    /**
     * @param intData the value to carry
     */
    public PCData(int intData) {
        this.intData = intData;
    }

    /**
     * @param data decimal text of the value to carry
     * @throws NumberFormatException if {@code data} is not a parsable integer
     */
    public PCData(String data) {
        // parseInt yields a primitive directly; valueOf would box and then unbox.
        this.intData = Integer.parseInt(data);
    }

    /**
     * @return the carried value
     */
    public int getData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" +
                "intData=" + intData +
                '}';
    }

}

Tags: Java

Posted on Sat, 02 Nov 2019 16:30:45 -0700 by jkatcherny