java生产者消费者代码(java实现生产者消费者问题)

java生产者消费者代码(java实现生产者消费者问题)

一、问题描述

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。不够完善的解决方法容易出现死锁的情况,此时进程都在等待唤醒。

示意图:

二、解决方法思路

1.采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

2.在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决问题的核心

保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法1.wait() / notify()方法2.await() / signal()方法3.BlockingQueue阻塞队列方法4.信号量5.管道三、代码实现1. wait() / notify()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

仓库Storage.java

importjava.util.LinkedList;publicclassStorage{//仓库容量privatefinalintMAX_SIZE=10;//仓库存储的载体privateLinkedList<Object>list=newLinkedList<>();publicvoidproduce(){synchronized(list){while(list.size() 1>MAX_SIZE){System.out.println(“【生产者” Thread.currentThread().getName() “】仓库已满”);try{list.wait();}catch(InterruptedExceptione){e.printStackTrace();}}list.add(newObject());System.out.println(“【生产者” Thread.currentThread().getName() “】生产一个产品,现库存” list.size());list.notifyAll();}}publicvoidconsume(){synchronized(list){while(list.size()==0){System.out.println(“【消费者” Thread.currentThread().getName() “】仓库为空”);try{list.wait();}catch(InterruptedExceptione){e.printStackTrace();}}list.remove();System.out.println(“【消费者” Thread.currentThread().getName() “】消费一个产品,现库存” list.size());list.notifyAll();}}}

生产者

publicclassProducerimplementsRunnable{privateStoragestorage;publicProducer(){}publicProducer(Storagestorage){this.storage=storage;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(1000);storage.produce();}catch(InterruptedExceptione){e.printStackTrace();}}}}

消费者

publicclassConsumerimplementsRunnable{privateStoragestorage;publicConsumer(){}publicConsumer(Storagestorage){this.storage=storage;}@Overridepublicvoidrun(){while(true){try{Thread.sleep(3000);storage.consume();}catch(InterruptedExceptione){e.printStackTrace();}}}}

主函数

publicclassMain{publicstaticvoidmain(String[]args){Storagestorage=newStorage();Threadp1=newThread(newProducer(storage));Threadp2=newThread(newProducer(storage));Threadp3=newThread(newProducer(storage));Threadc1=newThread(newConsumer(storage));Threadc2=newThread(newConsumer(storage));Threadc3=newThread(newConsumer(storage));p1.start();p2.start();p3.start();c1.start();c2.start();c3.start();}}

运行结果

【生产者p1】生产一个产品,现库存1【生产者p2】生产一个产品,现库存2【生产者p3】生产一个产品,现库存3【生产者p1】生产一个产品,现库存4【生产者p2】生产一个产品,现库存5【生产者p3】生产一个产品,现库存6【生产者p1】生产一个产品,现库存7【生产者p2】生产一个产品,现库存8【消费者c1】消费一个产品,现库存7【生产者p3】生产一个产品,现库存8【消费者c2】消费一个产品,现库存7【消费者c3】消费一个产品,现库存6【生产者p1】生产一个产品,现库存7【生产者p2】生产一个产品,现库存8【生产者p3】生产一个产品,现库存9【生产者p1】生产一个产品,现库存10【生产者p2】仓库已满【生产者p3】仓库已满【生产者p1】仓库已满【消费者c1】消费一个产品,现库存9【生产者p1】生产一个产品,现库存10【生产者p3】仓库已满。。。。。。以下省略

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

注意:

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

2. await() / signal()方法

在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

在这里只需改动Storage类

importjava.util.LinkedList;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassStorage{//仓库最大存储量privatefinalintMAX_SIZE=10;//仓库存储的载体privateLinkedList<Object>list=newLinkedList<Object>();//锁privatefinalLocklock=newReentrantLock();//仓库满的条件变量privatefinalConditionfull=lock.newCondition();//仓库空的条件变量privatefinalConditionempty=lock.newCondition();publicvoidproduce(){//获得锁lock.lock();while(list.size() 1>MAX_SIZE){System.out.println(“【生产者” Thread.currentThread().getName() “】仓库已满”);try{full.await();}catch(InterruptedExceptione){e.printStackTrace();}}list.add(newObject());System.out.println(“【生产者” Thread.currentThread().getName() “】生产一个产品,现库存” list.size());empty.signalAll();lock.unlock();}publicvoidconsume(){//获得锁lock.lock();while(list.size()==0){System.out.println(“【消费者” Thread.currentThread().getName() “】仓库为空”);try{empty.await();}catch(InterruptedExceptione){e.printStackTrace();}}list.remove();System.out.println(“【消费者” Thread.currentThread().getName() “】消费一个产品,现库存” list.size());full.signalAll();lock.unlock();}}

运行结果与wait()/notify()类似

3. BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

importjava.util.concurrent.LinkedBlockingQueue;publicclassStorage{//仓库存储的载体privateLinkedBlockingQueue<Object>list=newLinkedBlockingQueue<>(10);publicvoidproduce(){try{list.put(newObject());System.out.println(“【生产者” Thread.currentThread().getName() “】生产一个产品,现库存” list.size());}catch(InterruptedExceptione){e.printStackTrace();}}publicvoidconsume(){try{list.take();System.out.println(“【消费者” Thread.currentThread().getName() “】消费了一个产品,现库存” list.size());}catch(InterruptedExceptione){e.printStackTrace();}}}

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

4. 信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

importjava.util.LinkedList;importjava.util.concurrent.Semaphore;publicclassStorage{//仓库存储的载体privateLinkedList<Object>list=newLinkedList<Object>();//仓库的最大容量finalSemaphorenotFull=newSemaphore(10);//将线程挂起,等待其他来触发finalSemaphorenotEmpty=newSemaphore(0);//互斥锁finalSemaphoremutex=newSemaphore(1);publicvoidproduce(){try{notFull.acquire();mutex.acquire();list.add(newObject());System.out.println(“【生产者” Thread.currentThread().getName() “】生产一个产品,现库存” list.size());}catch(Exceptione){e.printStackTrace();}finally{mutex.release();notEmpty.release();}}publicvoidconsume(){try{notEmpty.acquire();mutex.acquire();list.remove();System.out.println(“【消费者” Thread.currentThread().getName() “】消费一个产品,现库存” list.size());}catch(Exceptione){e.printStackTrace();}finally{mutex.release();notFull.release();}}}5. 管道

一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

1. PipedInputStream / PipedOutputStream (操作字节流)

Producer

importjava.io.IOException;importjava.io.PipedOutputStream;publicclassProducerimplementsRunnable{privatePipedOutputStreampipedOutputStream;publicProducer(){pipedOutputStream=newPipedOutputStream();}publicPipedOutputStreamgetPipedOutputStream(){returnpipedOutputStream;}@Overridepublicvoidrun(){try{for(inti=1;i<=5;i ){pipedOutputStream.write((“Thisisatest,Id=” i “!\n”).getBytes());}pipedOutputStream.close();}catch(IOExceptione){e.printStackTrace();}}}

Consumer

importjava.io.IOException;importjava.io.PipedInputStream;publicclassConsumerimplementsRunnable{privatePipedInputStreampipedInputStream;publicConsumer(){pipedInputStream=newPipedInputStream();}publicPipedInputStreamgetPipedInputStream(){returnpipedInputStream;}@Overridepublicvoidrun(){intlen=-1;byte[]buffer=newbyte[1024];try{while((len=pipedInputStream.read(buffer))!=-1){System.out.println(newString(buffer,0,len));}pipedInputStream.close();}catch(IOExceptione){e.printStackTrace();}}}

主函数

importjava.io.IOException;publicclassMain{publicstaticvoidmain(String[]args){Producerp=newProducer();Consumerc=newConsumer();Threadt1=newThread(p);Threadt2=newThread(c);try{p.getPipedOutputStream().connect(c.getPipedInputStream());t2.start();t1.start();}catch(IOExceptione){e.printStackTrace();}}}2. PipedReader / PipedWriter (操作字符流)

Producer

importjava.io.IOException;importjava.io.PipedWriter;publicclassProducerimplementsRunnable{privatePipedWriterpipedWriter;publicProducer(){pipedWriter=newPipedWriter();}publicPipedWritergetPipedWriter(){returnpipedWriter;}@Overridepublicvoidrun(){try{for(inti=1;i<=5;i ){pipedWriter.write(“Thisisatest,Id=” i “!\n”);}pipedWriter.close();}catch(IOExceptione){e.printStackTrace();}}}

Consumer

importjava.io.IOException;importjava.io.PipedReader;publicclassConsumerimplementsRunnable{privatePipedReaderpipedReader;publicConsumer(){pipedReader=newPipedReader();}publicPipedReadergetPipedReader(){returnpipedReader;}@Overridepublicvoidrun(){intlen=-1;char[]buffer=newchar[1024];try{while((len=pipedReader.read(buffer))!=-1){System.out.println(newString(buffer,0,len));}pipedReader.close();}catch(IOExceptione){e.printStackTrace();}}}

主函数

importjava.io.IOException;publicclassMain{publicstaticvoidmain(String[]args){Producerp=newProducer();Consumerc=newConsumer();Threadt1=newThread(p);Threadt2=newThread(c);try{p.getPipedWriter().connect(c.getPipedReader());t2.start();t1.start();}catch(IOExceptione){e.printStackTrace();}}}

源码:github.com/always0108/JAVA/tree/master/Thread

发表评论

登录后才能评论