java 多线程并发设计模式之二: Master worker 模式应用

摘要: 在多线程程序设计中Master worker 模式是常用的并行模式之一,核心思想是由两类进程协助完成的,Master 进程负责接收和分配任务并保存结果集,Worker 负责处理任务, 并把结果返回给Master 进程. 这类设计模式最大的好处是 将一个大任务分配成若干个小任务并行执行。下面是一个简单的Master-Worker模式的框架

在多线程程序设计中Master worker 模式是常用的并行模式之一,核心思想是由两类进程协助完成的,Master 进程负责接收和分配任务并保存结果集,Worker 负责处理任务, 并把结果返回给Master 进程. 这类设计模式最大的好处是 将一个大任务分配成若干个小任务并行执行。下面是一个简单的Master-Worker模式的框架

Master.java

package com.yihaomen.masterwork;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
	//任务队列
	protected Queue workQueue = new ConcurrentLinkedQueue();
	//Worker线程队列
	protected Map threadMap=new HashMap();
	//子任务处理结果集
	protected Map resultMap = new ConcurrentHashMap();
	
	//是否所有的子任务都结束了
	public boolean isComplete(){
		for(Map.Entry entry:threadMap.entrySet()){
			if(entry.getValue().getState()!=Thread.State.TERMINATED){
				return false;
			}
		}
		return true;
	}
	
	//Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量
	public Master(Worker worker,int countWorker){
		worker.setWorkQueue(workQueue);
		worker.setResultMap(resultMap);
		for(int i=0;i  getResultMap(){
		return resultMap;
	}
	
	//开始运行所有的Worker进程,进行处理
	public void execute(){
		for(Map.Entry entry:threadMap.entrySet()){
			entry.getValue().start();
		}
	}
}



Worker.java
package com.yihaomen.masterwork;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable{
	//任务队列,用于取得子任务
	protected Queue workQueue;
	//子任务处理结果集
	protected Map resultMap;
	public void setWorkQueue(Queue workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(Map resultMap) {
		this.resultMap = resultMap;
	}
	
	//子任务处理的逻辑,在子类中实现具体逻辑
	public Object handle(Object input){
		return input;
	}
	
	@Override
	public void run() {
		while (true) {
			 //获取子任务
			 Object input = workQueue.poll();
			 if (input == null) break;
			 //处理子任务
			 Object re=handle(input);
			 //将处理结果写入结果集
			 resultMap.put(Integer.toString(input.hashCode()), re);
		}
	}
}



注意这里面用到了java concurrent 包中的. ConcurrentLinkedQueue, ConcurrentHashMap, 这是线程安全的类. 下面是一个测试例子,计算1-100 的立方和. 过程如下:首先继承 Worker 类 实现一个 PlusWorker 的类,orerride handle 方法. 然后提交100个任务,开始计算.

package com.yihaomen.masterwork;

import java.util.Map;
import java.util.Set;

import org.junit.Test;

public class TestMasterWorker {

	public class PlusWorker extends Worker{
		public Object handle(Object input){
			Integer i=(Integer)input;
			return i*i*i;
		}
	}
	
	@Test
	public void testMasterWorker() {
		Master m=new Master(new PlusWorker(),5);
		for(int i=0;i<100;i++)
			m.submit(i);
		m.execute();
		int re=0;
		Map resultMap=m.getResultMap();
		while(resultMap.size()>0 || !m.isComplete()){
			Set keys=resultMap.keySet();
			String key=null;
			for(String k:keys){
				key=k;
				break;
			}
			Integer i=null;
			if(key!=null)
				i=(Integer)resultMap.get(key);
			if(i!=null)
				re+=i;
			if(key!=null)
				resultMap.remove(key);
		}
		
		System.out.println("testMasterWorker:"+re);
	}
	
	@Test
	public void testPlus(){
		int re=0;
		for(int i=0;i<100;i++){
			re+=i*i*i;
		}
		System.out.println("testPlus:"+re);
	}

}



由此可见master-worker 模式适合与将大任务化成小任务并行执行的情况,各个小任务基本独立运行. 测试例子下载:

java multiple master worker pattern

上一篇: java 多线程并发设计模式之一: Future 模式应用
下一篇: java 多线程并发设计模式之三:Guarded suspension 模式

Avatar

放学不走 评论于: 2015-01-21

很完美..赞一个.[face33]
问下博主,这个队列ConcurrentLinkedQueue一般用于什么任务?其实我想问的是,对于多线程读写大文件,这个适用吗?

Avatar

亮剑 评论于: 2014-04-01

看到你的文章,才明白我还是在山脚下
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号