MasterThread:
- Holds a BlockingQueue used to concurrently receive and buffer MetaData objects;
- Uses the ketama consistent-hashing algorithm to pick a SlaveThread node;
- Takes MetaData objects off the BlockingQueue and dispatches each one to a SlaveThread node;
- The SlaveThread nodes do the actual processing of the MetaData objects;
SlaveThread:
- Holds a BlockingQueue used to buffer MetaData objects;
- Does the actual processing of the MetaData objects;
Ketama:
Consistent hashing is a distributed algorithm commonly used for load balancing;
Test setup:
- Assume each MetaData object takes 0.5 seconds to process, and 500 MetaData objects need to be processed;
- Processed serially, that takes 0.5 * 500 = 250 seconds;
- With the master/slave design shown in the figure above, the work finishes within about 5 seconds: the 100 SlaveThreads give an ideal lower bound of 500 / 100 * 0.5 = 2.5 seconds, and the uneven key distribution of the consistent hash pushes the observed time toward 5 seconds (see the sketch after this list);
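A back-of-the-envelope sketch of that estimate, assuming the 100-slave pool size (SLAVE_ENGINE_NUMBER_MAX) configured in MasterThread below; the class name is made up for illustration:

public class TimingEstimate {
    public static void main(String[] args) {
        int tasks = 500;
        double perTaskSeconds = 0.5;
        int slaves = 100; // SLAVE_ENGINE_NUMBER_MAX in MasterThread below

        double serialSeconds = tasks * perTaskSeconds;                                     // 250.0
        double idealParallelSeconds = Math.ceil((double) tasks / slaves) * perTaskSeconds; // 2.5

        System.out.println("serial: " + serialSeconds + " s, ideal parallel: "
                + idealParallelSeconds + " s");
        // The ketama hash does not spread keys perfectly evenly, so the busiest
        // slave receives more than tasks / slaves items; that is why the observed
        // wall-clock time lands closer to 5 s than to the 2.5 s lower bound.
    }
}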
MasterThread.java
Core code overview
SlaveThread.java
Core code
Test
Execution output:
omitted......
omitted......
Complete source
HashAlgorithm.java
package ll.concurrent.BlockingQueue.Ketama;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public enum HashAlgorithm {
KETAMA_HASH;
private static final String UTF_8 = "UTF-8";
private static final String MD5 = "MD5";
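// Treat the nTime-th 4-byte chunk of the MD5 digest as a little-endian unsigned 32-bit ring position.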
public long hash(byte[] digest, int nTime) {
long rv = ((long) (digest[3 + nTime * 4] & 0xFF) << 24)
| ((long) (digest[2 + nTime * 4] & 0xFF) << 16)
| ((long) (digest[1 + nTime * 4] & 0xFF) << 8)
| (digest[0 + nTime * 4] & 0xFF);
return rv & 0xffffffffL; /* Truncate to 32-bits */
}
public byte[] computeMd5(String key) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance(MD5);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = null;
try {
keyBytes = key.getBytes(UTF_8);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unknown string :" + key, e);
}
md5.update(keyBytes);
return md5.digest();
}
}
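The 16-byte MD5 digest is consumed in four 4-byte chunks, each becoming one 32-bit position on the hash ring; this is why KetamaNodeLocator below loops numReps / 4 times per node. A minimal usage sketch (the class name and the "slave-0" key are made up for illustration):

package ll.concurrent.BlockingQueue.Ketama;

public class HashAlgorithmDemo {
    public static void main(String[] args) {
        HashAlgorithm alg = HashAlgorithm.KETAMA_HASH;
        // Same form as KetamaNodeLocator uses: node.toString() + i
        byte[] digest = alg.computeMd5("slave-0" + 0);
        // One 16-byte MD5 digest yields four 32-bit ring positions (nTime = 0..3)
        for (int h = 0; h < 4; h++) {
            System.out.println(alg.hash(digest, h));
        }
    }
}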
KetamaNodeLocator.java
package ll.concurrent.BlockingQueue.Ketama;
import java.util.Collection;
import java.util.TreeMap;
/**
 * Consistent hashing: a distributed algorithm commonly used for load balancing.
 *
 * @param <T> the type of node placed on the hash ring
 */
public final class KetamaNodeLocator<T> {
private TreeMap<Long, T> ketamaNodes;
private HashAlgorithm hashAlg;
private int numReps = 160;
public KetamaNodeLocator(Collection<T> nodes) {
this(nodes, HashAlgorithm.KETAMA_HASH);
}
public KetamaNodeLocator(Collection<T> nodes, HashAlgorithm alg) {
this(nodes, alg, 160);
}
public KetamaNodeLocator(Collection<T> nodes, HashAlgorithm alg,
int nodeCopies) {
hashAlg = alg;
ketamaNodes = new TreeMap<Long, T>();
numReps = nodeCopies;
for (T node : nodes) {
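// numReps positions per node; each MD5 digest of (node.toString() + i) supplies four of them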
for (int i = 0; i < numReps / 4; i++) {
byte[] digest = hashAlg.computeMd5(node.toString() + i);
for (int h = 0; h < 4; h++) {
long m = hashAlg.hash(digest, h);
ketamaNodes.put(m, node);
}
}
}
}
public T getNode(final String k) {
byte[] digest = hashAlg.computeMd5(k);
T rv = getNodeForKey(hashAlg.hash(digest, 0));
return rv;
}
T getNodeForKey(long hash) {
    if (ketamaNodes.isEmpty()) {
        return null;
    }
    // Walk clockwise to the first ring position at or after the hash;
    // wrap around to the first position when the hash is past the last one.
    Long ceilingKey = ketamaNodes.ceilingKey(hash);
    if (ceilingKey == null) {
        ceilingKey = ketamaNodes.firstKey();
    }
    return ketamaNodes.get(ceilingKey);
}
}
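A minimal usage sketch of the locator (class and node names are made up for illustration): each key is hashed onto the ring and routed to the node owning the next position clockwise, so the same key always lands on the same node as long as the node set does not change.

package ll.concurrent.BlockingQueue.Ketama;

import java.util.Arrays;

public class KetamaNodeLocatorDemo {
    public static void main(String[] args) {
        KetamaNodeLocator<String> locator = new KetamaNodeLocator<String>(
                Arrays.asList("slave-0", "slave-1", "slave-2"));
        // The same key always routes to the same node while the node set is unchanged
        System.out.println(locator.getNode("metadata-42"));
        System.out.println(locator.getNode("metadata-42"));
        // Different keys spread across the nodes
        for (int i = 0; i < 5; i++) {
            System.out.println(i + " -> " + locator.getNode(String.valueOf(i)));
        }
    }
}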
MasterThread.java
package ll.concurrent.BlockingQueue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import ll.concurrent.BlockingQueue.Ketama.KetamaNodeLocator;
/**
 * <pre>
 * MasterThread:
 * 1. Holds a BlockingQueue used to concurrently receive and buffer MetaData objects;
 * 2. Uses the ketama consistent-hashing algorithm to pick a SlaveThread node;
 * 3. Takes MetaData objects off the BlockingQueue and dispatches them to SlaveThread nodes;
 * 4. The SlaveThread nodes do the actual processing of the MetaData objects;
 *
 * SlaveThread:
 * 1. Holds a BlockingQueue used to buffer MetaData objects;
 * 2. Does the actual processing of the MetaData objects;
 * </pre>
 */
public class MasterThread {
private static int SLAVE_ENGINE_NUMBER_MAX = 100;
private static int _BLOCKSIZE = 5000;
private static MasterThread masterThread;
private BlockingQueue<MetaData> metaDataQueue;
private SlaveEngineThread slaveEngineThread;
public static synchronized MasterThread getInstance() {
if (masterThread == null) {
masterThread = new MasterThread();
}
return masterThread;
}
private MasterThread() {
metaDataQueue = new LinkedBlockingQueue<MetaData>(_BLOCKSIZE);
startSlaveThreadEngine();
}
private void startSlaveThreadEngine() {
slaveEngineThread = new SlaveEngineThread(SLAVE_ENGINE_NUMBER_MAX);
slaveEngineThread.start();
}
public synchronized void put(MetaData object) {
if (object == null)
return;
if (!metaDataQueue.offer(object)) {
System.err.println("BlockingQueue is up to max size:"
+ metaDataQueue.size());
}
}
private class SlaveEngineThread extends Thread {
private ExecutorService executorService;
private Map<Integer, SlaveThread> slaveThreadMap;
// This example uses consistent hashing to pick the target SlaveThread
private KetamaNodeLocator<Integer> nodeLocator;
public SlaveEngineThread(final int nThreads) {
slaveThreadMap = new HashMap<Integer, SlaveThread>();
// Create a thread pool that runs the SlaveThreads concurrently
executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
SlaveThread command = new SlaveThread(i);
executorService.execute(command);
slaveThreadMap.put(i, command);
}
nodeLocator = new KetamaNodeLocator<Integer>(
slaveThreadMap.keySet());
}
@Override
public void run() {
while (true) {
MetaData metaData = null;
try {
// Blocking take from the master queue
metaData = metaDataQueue.take();
// Use the consistent hash on the MetaData id to pick a SlaveThread number
Integer nodeNumber = nodeLocator.getNode(metaData.getId());
SlaveThread slaveThread = slaveThreadMap.get(nodeNumber);
if (slaveThread != null) {
    // Hand the MetaData to the chosen SlaveThread for processing
    slaveThread.put(metaData);
}
} catch (InterruptedException e) {
    // shutdownNow() interrupts the SlaveThreads blocked on take(), so they can exit as well
    executorService.shutdownNow();
    System.err.println("SlaveEngineThread is interrupted ... ");
    break;
} catch (Exception e) {
    System.err.println("Failed to handle MetaData: " + e);
}
}
System.err.println("Master Thread exit...");
}
}
public void exit() {
stopSlaveEngineThread();
}
private void stopSlaveEngineThread() {
slaveEngineThread.interrupt();
}
}
SlaveThread.java
package ll.concurrent.BlockingQueue;
import java.text.SimpleDateFormat;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class SlaveThread implements Runnable {
private BlockingQueue<MetaData> msgQueue = new LinkedBlockingQueue<MetaData>(
200);
private int slaveThreadID;
public SlaveThread() {
}
public SlaveThread(int slaveThreadID) {
super();
this.slaveThreadID = slaveThreadID;
}
public void run() {
while (true) {
MetaData metaData = null;
try {
// Blocking take from this slave's queue
metaData = msgQueue.take();
handleMetaData(metaData);
} catch (InterruptedException e) {
    System.err.println("SlaveThread[" + this.slaveThreadID + "] is interrupted...");
    break;
} catch (Exception e) {
    System.err.println("Failed to handle MetaData: " + e);
}
}
System.err.println("SlaveThread[" + this.slaveThreadID + "] exit...");
}
public void put(MetaData object) {
if (object == null)
return;
if (!msgQueue.offer(object)) {
System.err.println("SlaveThread BlockingQueue up to max size");
}
}
private void handleMetaData(MetaData metaData) throws Exception {
    // Simulate processing that takes 500 ms
    Thread.sleep(500);
    System.out.println("SlaveThread[" + this.slaveThreadID + "] "
            + metaData.toString() + " "
            + new SimpleDateFormat("HH:mm:ss").format(System
                    .currentTimeMillis()));
}
}
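A SlaveThread can also be exercised on its own. The sketch below (class name made up for illustration) runs one slave, feeds it two items, and then stops it by interrupting the thread, which is the same interrupt-driven shutdown path the master side relies on.

package ll.concurrent.BlockingQueue;

public class SlaveThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        SlaveThread slave = new SlaveThread(0);
        Thread worker = new Thread(slave);
        worker.start();
        slave.put(new MetaData("1", "standalone demo"));
        slave.put(new MetaData("2", "standalone demo"));
        Thread.sleep(1500);   // give both items time to finish (0.5 s each)
        worker.interrupt();   // take() throws InterruptedException and the loop exits
        worker.join();
    }
}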
MetaData.java
package ll.concurrent.BlockingQueue;
public class MetaData {
private String id;
private String desc;
public MetaData() {
super();
}
public MetaData(String id) {
super();
this.id = id;
}
public MetaData(String id, String desc) {
super();
this.id = id;
this.desc = desc;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "MetaData [id=" + id + ", desc=" + desc + "]";
}
}
TestCase.java
package ll.concurrent.BlockingQueue;
import java.text.SimpleDateFormat;
public class TestCase {
public static void main(String[] args) throws InterruptedException {
MasterThread masterThread = MasterThread.getInstance();
System.out.println("Start time:"
+ new SimpleDateFormat("HH:mm:ss").format(System
.currentTimeMillis()));
/**
 * Each MetaData takes 0.5 s to process; executed serially that would be
 * 500 * 0.5 = 250 s. Processed in parallel by the master/slave design,
 * the batch finishes in a small fraction of that time.
 */
for (int i = 0; i < 500; i++) {
MetaData metaData = new MetaData(i + "");
masterThread.put(metaData);
}
}
}
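TestCase only prints the start time, and the JVM keeps running because the slave pool threads are non-daemon. To actually measure an end-to-end figure, one option is the self-contained sketch below (not part of the original code): it simulates the same 500 x 0.5 s workload on a 100-thread pool and waits on a CountDownLatch. It measures the even-distribution lower bound of roughly 2.5 s; the master/slave path with consistent hashing adds some skew on top of that.

package ll.concurrent.BlockingQueue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimingCheck {
    public static void main(String[] args) throws InterruptedException {
        final int tasks = 500;
        final int workers = 100; // matches SLAVE_ENGINE_NUMBER_MAX
        final CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        long start = System.currentTimeMillis();
        for (int i = 0; i < tasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(500); // same simulated 0.5 s cost as handleMetaData
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }
            });
        }
        done.await();
        System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + " ms");
        pool.shutdown();
    }
}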
Reference:
Java Concurrency in Practice (《Java并发编程实战》), Section 7.2.1, the logging-service example: N producers, 1 consumer