并发编程实现模型之(三)Producer-Consumer模式_并发 producer/consumer-程序员宅基地

技术标签: 并发  Java并发编程  java  编程  

生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

一、架构模式图:

类图:

生产者:提交用户请求,提取用户任务,并装入内存缓冲区;

消费者:在内存缓冲区中提取并处理任务;

内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

任务:生产者向内存缓冲区提交的数据结构;

Main:使用生产者和消费者的客户端。


二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

(1)Producer生产者线程:

  1. package ProducerConsumer;  
  2.   
  3. import java.util.Random;  
  4. import java.util.concurrent.BlockingQueue;  
  5. import java.util.concurrent.TimeUnit;  
  6. import java.util.concurrent.atomic.AtomicInteger;  
  7.   
  8. public class Producer  implements Runnable{  
  9.       
  10.     //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。  
  11.     //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。  
  12.     //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。  
  13.     private volatile  boolean isRunning= true;  
  14.       
  15.     //内存缓冲区  
  16.     private BlockingQueue<PCData> queue;  
  17.       
  18.     //总数,原子操作  
  19.     private static AtomicInteger count = new AtomicInteger();  
  20.        
  21.     private static final int SLEEPTIME=1000;  
  22.       
  23.       
  24.     public Producer(BlockingQueue<PCData> queue) {  
  25.           
  26.         this.queue = queue;  
  27.     }  
  28.   
  29.   
  30.   
  31.   
  32.     @Override  
  33.     public void run() {  
  34.         PCData data=null;  
  35.         Random r  = new Random();  
  36.         System.out.println("start producer id = "+ Thread .currentThread().getId());  
  37.         try{  
  38.             while(isRunning){  
  39.                 Thread.sleep(r.nextInt(SLEEPTIME));  
  40.                 //构造任务数据  
  41.                 data= new PCData(count.incrementAndGet());  
  42.                 System.out.println("data is put into queue ");  
  43.                 //提交数据到缓冲区  
  44.                 if(!queue.offer(data,2,TimeUnit.SECONDS)){  
  45.                     System.out.println("faile to  put data:  "+ data);  
  46.                 }  
  47.             }  
  48.         }catch (InterruptedException e){  
  49.             e.printStackTrace();  
  50.             Thread.currentThread().interrupt();  
  51.               
  52.         }  
  53.           
  54.           
  55.     }  
  56.   
  57.     public void stop(){  
  58.           
  59.         isRunning=false;  
  60.     }  
  61.   
  62.   
  63. }  

(2)Consumer消费者线程:

  1. package ProducerConsumer;  
  2.   
  3. import java.text.MessageFormat;  
  4. import java.util.Random;  
  5. import java.util.concurrent.BlockingQueue;  
  6.   
  7. public class Consumer implements Runnable {  
  8.     //缓冲区     
  9.     private BlockingQueue<PCData> queue;  
  10.     private static final int SLEEPTIME=1000;  
  11.       
  12.       
  13.     public Consumer(BlockingQueue<PCData> queue) {          
  14.         this.queue = queue;  
  15.     }  
  16.   
  17.   
  18.     @Override  
  19.     public void run() {  
  20.         System.out.println("start Consumer id= "+ Thread .currentThread().getId());  
  21.         Random r = new Random();  
  22.           
  23.             try {  
  24.                 //提取任务  
  25.                 while(true){  
  26.                     PCData data= queue.take();  
  27.                     if(null!= data){  
  28.                         //计算平方  
  29.                         int re= data.getData()*data.getData();  
  30.                         System.out.println(MessageFormat.format("{0}*{1}={2}",  
  31.                                     data.getData(),data.getData(),re  
  32.                                 ));  
  33.                         Thread.sleep(r.nextInt(SLEEPTIME));  
  34.                                                   
  35.                     }  
  36.                 }  
  37.             } catch (InterruptedException e) {                
  38.                 e.printStackTrace();  
  39.                 Thread.currentThread().interrupt();  
  40.             }  
  41.               
  42.           
  43.           
  44.     }  
  45.       
  46.       
  47.   
  48.       
  49.   
  50. }  

(3)PCData共享数据模型:

  1. package ProducerConsumer;  
  2.   
  3. public  final class PCData {  
  4.   
  5.     private final int intData;  
  6.   
  7.     public PCData(int d) {  
  8.         intData=d;  
  9.     }  
  10.       
  11.     public PCData(String  d) {  
  12.         intData=Integer.valueOf(d);  
  13.     }  
  14.       
  15.     public int getData(){  
  16.           
  17.         return intData;  
  18.           
  19.     }  
  20.     @Override  
  21.     public String toString(){  
  22.         return "data:"+ intData ;  
  23.     }  
  24.       
  25. }  

(4)Main函数:

  1. package ProducerConsumer;  
  2.   
  3. import java.util.concurrent.BlockingQueue;  
  4. import java.util.concurrent.Executor;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.LinkedBlockingDeque;  
  8.   
  9. public class Main {  
  10.   
  11.     /** 
  12.      * @param args 
  13.      */  
  14.     public static void main(String[] args)  throws InterruptedException{  
  15.         //建立缓冲区  
  16.         BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);  
  17.         //建立生产者  
  18.         Producer producer1 = new Producer(queue);  
  19.         Producer producer2 = new Producer(queue);  
  20.         Producer producer3 = new Producer(queue);  
  21.           
  22.         //建立消费者  
  23.         Consumer consumer1 = new Consumer(queue);  
  24.         Consumer consumer2 = new Consumer(queue);  
  25.         Consumer consumer3 = new Consumer(queue);         
  26.                   
  27.         //建立线程池  
  28.         ExecutorService service = Executors.newCachedThreadPool();  
  29.           
  30.         //运行生产者  
  31.         service.execute(producer1);  
  32.         service.execute(producer2);  
  33.         service.execute(producer3);  
  34.         //运行消费者  
  35.         service.execute(consumer1);  
  36.         service.execute(consumer2);  
  37.         service.execute(consumer3);  
  38.       
  39.         Thread.sleep(10*1000);  
  40.           
  41.         //停止生产者  
  42.         producer1.stop();  
  43.         producer2.stop();  
  44.         producer3.stop();  
  45.           
  46.         Thread.sleep(3000);  
  47.         service.shutdown();  
  48.     }  
  49.   
  50. }  

三、注意:

    volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

    生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。

    由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zmx729618/article/details/78517675

智能推荐

ARP协议图文详解,包含完整测试代码-程序员宅基地

文章浏览阅读4.1k次,点赞3次,收藏56次。图文详解ARP协议,花5分钟时间彻底弄懂ARP协议。_arp协议

oracle查看锁表语句、解锁方法_oracle查询锁表语句-程序员宅基地

文章浏览阅读2.2w次,点赞8次,收藏57次。oracle死锁原因查看以及解决办法_oracle查询锁表语句

Seata介绍、原理、配置_seata配置-程序员宅基地

文章浏览阅读4.5k次,点赞4次,收藏29次。Seata是一款开源的分布式事务解决方案,可以用于解决分布式系统中的数据一致性问题。它是由阿里巴巴集团发起的开源项目,目前得到了广泛的应用和支持。在 Seata 开源之前,其内部版本在阿里系内部一直扮演着应用架构层数据一致性的中间件角色,帮助经济体平稳的度过历年的双11在分布式系统中,由于数据存储在不同的节点上,因此需要保证分布式事务的原子性(即要么全部成功,要么全部失败),否则将会出现数据不一致的情况。Seata提供了一种可靠的分布式事务解决方案,可以对全局事务进行管理和协调,从而实现分布式事务的原子_seata配置

Golang | Leetcode Golang题解之第36题有效的数独-程序员宅基地

文章浏览阅读520次。Golang | Leetcode Golang题解之第36题有效的数独

稍微深入分析Ubuntu环境下安装NVIDIA驱动导致黑屏的原因_prime-select nvidia 黑屏-程序员宅基地

文章浏览阅读8.2k次,点赞5次,收藏48次。本文承接之前写的有关如何用正确姿势安装NVIDIA驱动的博文 (https://blog.csdn.net/Edward_ed_liu/article/details/109552761)。首先之所以要更新Linux内核,是因为不更新内核就无法使用笔记本自带的无线网卡。其次,目前NVIDIA官方不建议把Linux内核更新到最新版(5.9),而且这条消息只在英文的官网才有,中文的则是广告。TWICE如果强行更新到5.9版本,之后的Cuda安装表面上会显示成功,但在实际使用Cuda的过程中._prime-select nvidia 黑屏

java中间件有哪些_金九银十期间成功斩获58万架构师Offer!六面字节跳动面经和面试题分享 - 小梦爱Java...-程序员宅基地

文章浏览阅读167次。金九银十期间成功斩获58万Offer!六面字节跳动面经(成功关键:吃透九大核心知识+狂刷大厂面试真题)第一轮:团队面试第一轮基本上是你的团队成员面试你,是和你同级或者高你一个P的师兄来面你,我的话基本没问什么特别的,主要还是讲自己简历上的做的项目,这里需要你很熟悉自己的项目才行,我个人觉得这里你要把项目里你的角色做了什么没做什么讲清楚,然后最好能把自己做的那部分重点展开来讲,然后面试官会从你讲的内..._java架构师 中间件简历

随便推点

SQL Server数据归档的解决方案-程序员宅基地

文章浏览阅读910次。最近新接到的一项工作是把SQL Server中保存了四五年的陈年数据(合同,付款,报销等等单据)进行归档,原因是每天的数据增量很大,而历史数据又不经常使用,影响生产环境的数据查询等操作。要求是:   1 归档的数据与生产环境数据分开保存,以便提高查询效率和服务器性能。  2 前端用户能够查询已归档的数据,即系统提供的功能不能发生改变   看起来要求不是很高,我自然会联想到两种方..._sql server 根据日期归档

过 DNF TP 驱动保护(二)-程序员宅基地

文章浏览阅读127次。01. 博文简介:02. 环境及工具准备:03. 分析 TP 所做的保护:04. 干掉 NtOpenProcess 中的 Deep InLine Hook:05. 干掉 NtOpenThread 中的 Deep InLine Hook:06. 干掉 NtReadVirtualMemory 中的 InLine Hook:07. 干掉 NtWriteVirtualMemory ...

edge同步chrome书签_如何通过 iCloud 同步 chromium edge 的书签?-程序员宅基地

文章浏览阅读980次。我又用回了edge。chrome太蠢了。icloud同步问题暂时还没法解决。先这样吧。————————几天后更新:额 建议别用edge。怎么吹都无法改变还有很多未知问题。比已知的chrome还是差很多,虽然在界面内核上有可能更好。但还是别。你并不能帮微软开发edge。微软做软件确实有点蠢。但凡你还在用苹果全家桶,最好离微软全家桶远一点。和win的交互尽量用谷歌这种第三方来解决。额 我是mac用sa..._edge 和chrome同步

wpf绑定全局静态变量(mvvm)-程序员宅基地

文章浏览阅读1.4k次,点赞2次,收藏2次。原文 wpf绑定全局静态变量(mvvm)在实际的开发中,有一些集合或者属性可能是全局的,比如当你做一个oa的时候,可能需要展示所有的人员,这时这个所有的人员列表显然可以作为全局参数,比如这里有一个全局的静态属性UserList。而你在使用mvvm做wpf开发的时候,一般每个view都已经指定好了viewmodel。而viewmodel显然是不包含UserList。这时如果你想在绑定了viewm..._mvvm 静态属性绑定

GNSS算法相关开源代码(含多传感器融合相关项目)_rtklib fast-程序员宅基地

文章浏览阅读3.8k次,点赞7次,收藏70次。GNSS算法相关开源代码(含多传感器融合相关项目)_rtklib fast

初识中央处理器CPU_cpu总线架构-程序员宅基地

文章浏览阅读2k次,点赞2次,收藏5次。CPU_cpu总线架构