zoukankan      html  css  js  c++  java
  • 【并发编程】使用BlockingQueue实现<多生产者,多消费者>




     MasterThread:
    1.  持有一个BlockingQueue队列,用于并发接收存储MetaData对象;
    2.  使用一致性Hash算法ketama,来选择SlaveThread节点;
    3.  从BlockingQueue队列中,取出MetaData对象,分配给各SlaveThread节点;
    4.  SlaveThread节点负责真正处理MetaData对象;
         
     SlaveThread:
    1. 持有一个BlockingQueue队列,用于存储MetaData对象;
    2. 负责真正处理MetaData对象;

    Ketama:
     一致性Hash算法,是一种分布式算法,常用于负载均衡;

    测试说明:
    1. 假设处理每个MetaData对象需要耗时0.5秒,需要处理500个MetaData对象;
    2. 若是串行处理,则需要0.5*500=250秒;
    3. 使用上图所示的master/slave算法,则可以5秒内完成;




    MasterThread.java

    核心程序介绍


    SlaveThread.java

    核心程序

    测试

    执行结果:
    省略......
    省略......


    完整程序

    HashAlgorithm.java
    1. package ll.concurrent.BlockingQueue.Ketama;
    2. import java.io.UnsupportedEncodingException;
    3. import java.security.MessageDigest;
    4. import java.security.NoSuchAlgorithmException;
    /**
     * Hash helpers for the Ketama consistent-hash ring: an MD5 digest of a key,
     * and extraction of unsigned 32-bit values from that digest.
     */
    public enum HashAlgorithm {

        /** The only algorithm defined here; both methods below belong to it. */
        KETAMA_HASH;

        private static final String MD5 = "MD5";
        private static final String UTF_8 = "UTF-8";

        /**
         * Builds an unsigned 32-bit value from four consecutive digest bytes,
         * treating the byte at the lowest index as the least significant one.
         *
         * @param digest digest bytes to read from
         * @param nTime  index of the 4-byte group; the group starts at {@code nTime * 4}
         * @return the assembled value, always in the range [0, 2^32)
         */
        public long hash(byte[] digest, int nTime) {
            final int base = nTime * 4;
            long value = 0L;
            // Fold in bytes from the most significant (offset 3) down to offset 0.
            for (int offset = 3; offset >= 0; offset--) {
                value = (value << 8) | (digest[base + offset] & 0xFF);
            }
            return value & 0xffffffffL; /* Truncate to 32-bits */
        }

        /**
         * Computes the MD5 digest of the UTF-8 encoding of {@code key}.
         *
         * @param key text to hash
         * @return the MD5 digest bytes
         * @throws RuntimeException if MD5 or UTF-8 is not available in this runtime
         */
        public byte[] computeMd5(String key) {
            final MessageDigest digester;
            try {
                digester = MessageDigest.getInstance(MD5);
            } catch (NoSuchAlgorithmException e) {
                throw new RuntimeException("MD5 not supported", e);
            }
            digester.reset();

            final byte[] encoded;
            try {
                encoded = key.getBytes(UTF_8);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("Unknown string :" + key, e);
            }
            // digest(byte[]) feeds the bytes and finishes the hash in one call.
            return digester.digest(encoded);
        }
    }

    KetamaNodeLocator.java
    1. package ll.concurrent.BlockingQueue.Ketama;
    2. import java.util.Collection;
    3. import java.util.TreeMap;
    4. /**
    5. * 一致性Hash算法:是一种分布式算法,常用于负载均衡;
    6. *
    7. * @param <T>
    8. */
    9. public final class KetamaNodeLocator<T> {
    10. private TreeMap<Long, T> ketamaNodes;
    11. private HashAlgorithm hashAlg;
    12. private int numReps = 160;
    13. public KetamaNodeLocator(Collection<T> nodes) {
    14. this(nodes, HashAlgorithm.KETAMA_HASH);
    15. }
    16. public KetamaNodeLocator(Collection<T> nodes, HashAlgorithm alg) {
    17. this(nodes, HashAlgorithm.KETAMA_HASH, 160);
    18. }
    19. public KetamaNodeLocator(Collection<T> nodes, HashAlgorithm alg,
    20. int nodeCopies) {
    21. hashAlg = alg;
    22. ketamaNodes = new TreeMap<Long, T>();
    23. numReps = nodeCopies;
    24. for (T node : nodes) {
    25. for (int i = 0; i < numReps / 4; i++) {
    26. byte[] digest = hashAlg.computeMd5(node.toString() + i);
    27. for (int h = 0; h < 4; h++) {
    28. long m = hashAlg.hash(digest, h);
    29. ketamaNodes.put(m, node);
    30. }
    31. }
    32. }
    33. }
    34. public T getNode(final String k) {
    35. byte[] digest = hashAlg.computeMd5(k);
    36. T rv = getNodeForKey(hashAlg.hash(digest, 0));
    37. return rv;
    38. }
    39. T getNodeForKey(long hash) {
    40. if (ketamaNodes.isEmpty()) {
    41. return null;
    42. }
    43. if (!ketamaNodes.containsKey(hash)) {
    44. Object ceilValue = ((TreeMap<Long, T>) ketamaNodes)
    45. .ceilingKey(hash);
    46. if (ceilValue != null) {
    47. try {
    48. hash = Long.valueOf(ceilValue.toString());
    49. } catch (NumberFormatException e) {
    50. e.printStackTrace();
    51. hash = 0;
    52. }
    53. }
    54. if (ceilValue == null || hash == 0) {
    55. hash = ketamaNodes.firstKey();
    56. }
    57. }
    58. return ketamaNodes.get(hash);
    59. }
    60. }

    MasterThread.java
    1. package ll.concurrent.BlockingQueue;
    2. import java.util.HashMap;
    3. import java.util.Map;
    4. import java.util.concurrent.BlockingQueue;
    5. import java.util.concurrent.ExecutorService;
    6. import java.util.concurrent.Executors;
    7. import java.util.concurrent.LinkedBlockingQueue;
    8. import ll.concurrent.BlockingQueue.Ketama.KetamaNodeLocator;
    9. /**
    10. * <pre>
    11. * MasterThread
    12. * 1. 持有一个BlockingQueue队列,用于并发接收存储MetaData对象;
    13. * 2. 使用Hash一致性算法ketama来选择SlaveThread节点;
    14. * 3. BlockingQueue队列中,取出MetaData对象,分配给SlaveThread节点;
    15. * 4. SlaveThread节点负责真正处理MetaData对象;
    16. *
    17. * SlaveThread
    18. * 1. 持有一个BlockingQueue队列,用于存储MetaData对象;
    19. * 2. 负责真正处理MetaData对象;
    20. * </pre>
    21. */
    22. public class MasterThread {
    23. private static int SLAVE_ENGINE_NUMBER_MAX = 100;
    24. private static int _BLOCKSIZE = 5000;
    25. private static MasterThread masterThread;
    26. private BlockingQueue<MetaData> metaDataQueue;
    27. private SlaveEngineThread slaveEngineThread;
    28. public static synchronized MasterThread getInstance() {
    29. if (masterThread == null) {
    30. masterThread = new MasterThread();
    31. }
    32. return masterThread;
    33. }
    34. private MasterThread() {
    35. metaDataQueue = new LinkedBlockingQueue<MetaData>(_BLOCKSIZE);
    36. startSlaveThreadEngine();
    37. }
    38. private void startSlaveThreadEngine() {
    39. slaveEngineThread = new SlaveEngineThread(SLAVE_ENGINE_NUMBER_MAX);
    40. slaveEngineThread.start();
    41. }
    42. public synchronized void put(MetaData object) {
    43. if (object == null)
    44. return;
    45. if (!metaDataQueue.offer(object)) {
    46. System.err.println("BlockingQueue is up to max size:"
    47. + metaDataQueue.size());
    48. }
    49. }
    50. private class SlaveEngineThread extends Thread {
    51. private ExecutorService executorService;
    52. private Map<Integer, SlaveThread> slaveThreadMap;
    53. //本示例采用一致性Hash算法,选择SlaveThread
    54. private KetamaNodeLocator<Integer> nodeLocator;
    55. public SlaveEngineThread(final int nThreads) {
    56. slaveThreadMap = new HashMap<Integer, SlaveThread>();
    57. // 创建线程池,并发执行SlaveThread
    58. executorService = Executors.newFixedThreadPool(nThreads);
    59. for (int i = 0; i < nThreads; i++) {
    60. SlaveThread command = new SlaveThread(i);
    61. executorService.execute(command);
    62. slaveThreadMap.put(i, command);
    63. }
    64. nodeLocator = new KetamaNodeLocator<Integer>(
    65. slaveThreadMap.keySet());
    66. }
    67. @Override
    68. public void run() {
    69. while (true) {
    70. MetaData metaData = null;
    71. try {
    72. // 堵塞获取
    73. metaData = metaDataQueue.take();
    74. // 通过hash算法获取slaveThread编号
    75. Integer nodeNumber = nodeLocator.getNode(metaData.getId());
    76. SlaveThread slaveThread = slaveThreadMap.get(nodeNumber);
    77. if (slaveThread != null) {
    78. // MetaData存入SlaveThread处理
    79. slaveThread.put(metaData);
    80. }
    81. } catch (InterruptedException e) {
    82. executorService.shutdown();
    83. System.err.println("SlaveEngineThread is Interrupted ... ");
    84. break;
    85. } catch (Exception e) {
    86. System.err.println("failed to handle meta data" + e);
    87. }
    88. }
    89. System.err.println("Master Thread exit...");
    90. }
    91. }
    92. public void exit() {
    93. stopSlaveEngineThread();
    94. }
    95. private void stopSlaveEngineThread() {
    96. slaveEngineThread.interrupt();
    97. }
    98. }

    SlaveThread.java
    1. package ll.concurrent.BlockingQueue;
    2. import java.text.SimpleDateFormat;
    3. import java.util.concurrent.BlockingQueue;
    4. import java.util.concurrent.LinkedBlockingQueue;
    5. public class SlaveThread implements Runnable {
    6. private BlockingQueue<MetaData> msgQueue = new LinkedBlockingQueue<MetaData>(
    7. 200);
    8. private int slaveThreadID;
    9. public SlaveThread() {
    10. }
    11. public SlaveThread(int slaveThreadID) {
    12. super();
    13. this.slaveThreadID = slaveThreadID;
    14. }
    15. public void run() {
    16. while (true) {
    17. MetaData metaData = null;
    18. try {
    19. // 堵塞获取
    20. metaData = msgQueue.take();
    21. handleMetaData(metaData);
    22. } catch (InterruptedException e) {
    23. System.err.println("SlaveThread[" + this.slaveThreadID + "] is Interrupted...");
    24. break;
    25. } catch (Exception e) {
    26. System.err.println("failed to handle meta data" + e);
    27. }
    28. }
    29. System.err.println("SlaveThread[" + this.slaveThreadID + "] exit...");
    30. }
    31. public void put(MetaData object) {
    32. if (object == null)
    33. return;
    34. if (!msgQueue.offer(object)) {
    35. System.err.println("SlaveThread BlockingQueue up to max size");
    36. }
    37. }
    38. private void handleMetaData(MetaData metaData) throws Exception {
    39. // 模拟处理,耗时500毫秒
    40. Thread.sleep(500);
    41. System.out.println("SlaveThread["+ this.slaveThreadID + "] "
    42. + metaData.toString()
    43. + new SimpleDateFormat("HH:mm:ss").format(System
    44. .currentTimeMillis()));
    45. }
    46. }

    MetaData.java
    1. package ll.concurrent.BlockingQueue;
    /**
     * Simple mutable value holder carrying an id and a description.
     */
    public class MetaData {

        private String id;
        private String desc;

        /** Creates an instance with both fields {@code null}. */
        public MetaData() {
            this(null, null);
        }

        /**
         * Creates an instance with the given id and a {@code null} description.
         *
         * @param id identifier
         */
        public MetaData(String id) {
            this(id, null);
        }

        /**
         * Creates an instance with the given id and description.
         *
         * @param id   identifier
         * @param desc description
         */
        public MetaData(String id, String desc) {
            this.id = id;
            this.desc = desc;
        }

        /** @return the identifier, possibly {@code null} */
        public String getId() {
            return this.id;
        }

        /** @param id the new identifier */
        public void setId(String id) {
            this.id = id;
        }

        /** @return the description, possibly {@code null} */
        public String getDesc() {
            return this.desc;
        }

        /** @param desc the new description */
        public void setDesc(String desc) {
            this.desc = desc;
        }

        /** @return a text form showing both fields */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("MetaData [id=");
            text.append(id);
            text.append(", desc=");
            text.append(desc);
            text.append("]");
            return text.toString();
        }
    }

    TestCase.java
    1. package ll.concurrent.BlockingQueue;
    2. import java.text.SimpleDateFormat;
    3. public class TestCase {
    4. public static void main(String[] args) throws InterruptedException {
    5. MasterThread masterThread = MasterThread.getInstance();
    6. System.out.println("Start time:"
    7. + new SimpleDateFormat("HH:mm:ss").format(System
    8. .currentTimeMillis()));
    9. /**
    10. * 每个MetaData都需要0.5S的处理时间,如果串行执行,则需要500*0.5=250;
    11. * 现采用并行处理,只需要很短的时间即可执行完;
    12. */
    13. for (int i = 0; i < 500; i++) {
    14. MetaData metaData = new MetaData(i + "");
    15. masterThread.put(metaData);
    16. }
    17. }
    18. }



    参考:
    《Java并发编程实战》:7.2.1 日志服务章节: <N生产者,1个消费者> 




  • 相关阅读:
    Pandas Statistical Functions
    pyspark 记录
    走出浮躁的泥沼:学会享受学习过程的乐趣
    寻找知识不变的那部分
    走出浮躁的泥沼:把一件事做到某种境界
    走出浮躁的泥沼:为什么会浮躁
    对抗拖延症最直接有效的方法
    实施“番茄工作法”需要注意的一些细节
    东西学了容易忘?学会跟踪你的知识
    再谈读书与信息的获取、沉淀
  • 原文地址:https://www.cnblogs.com/ssslinppp/p/6279796.html
Copyright © 2011-2022 走看看