zoukankan      html  css  js  c++  java
  • 10.Curator队列

        Curator也提供ZK Recipe的分布式队列实现。利用ZK的 PERSISTENTSEQUENTIAL节点,可以保证放入到队列中的项目是按照顺序排队的。如果单一的消费者从队列中取数据,那么它是先入先出的,这也是队列的特点。如果你严格要求顺序,你就得使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。但是,根据Netflix的Curator作者所说,ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4,原因有五:
    • ZK有1MB 的传输限制。实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大
    • 如果有很多节点,ZK启动时相当的慢。而使用queue会导致好多ZNode。你需要显著增大 initLimit 和 syncLimit
    • ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事
    • 当很大量的包含成千上万的子节点的ZNode时,ZK的性能变得不好
    • ZK的数据库完全放在内存中。大量的Queue意味着会占用很多的内存空间
    尽管如此,Curator还是创建了各种Queue的实现。如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。

    1.DistributedQueue

    1. DistributedQueue介绍
    DistributedQueue是最普通的一种队列。 它设计以下四个类:
    • QueueBuilder - 创建队列使用QueueBuilder,它也是其它队列的创建类
    • QueueConsumer - 队列中的消息消费者接口
    • QueueSerializer - 队列消息序列化和反序列化接口,提供了对队列中的对象的序列化和反序列化
    • DistributedQueue - 队列实现类
        QueueConsumer是消费者,它可以接收队列的数据。处理队列中的数据的代码逻辑可以放在QueueConsumer.consumeMessage()中。
        正常情况下先将消息从队列中移除,再交给消费者消费。但这是两个步骤,不是原子的。可以调用Builder的lockPath()消费者加锁,当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。最好还是单消费者模式使用队列。
    2.编写示例程序
    1. public class DistributedQueueExample
    2. {
    3. private static final String PATH = "/example/queue";
    4. public static void main(String[] args) throws Exception
    5. {
    6. CuratorFramework clientA = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    7. clientA.start();
    8. CuratorFramework clientB = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    9. clientB.start();
    10. DistributedQueue<String> queueA = null;
    11. QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
    12. queueA = builderA.buildQueue();
    13. queueA.start();
    14. DistributedQueue<String> queueB = null;
    15. QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
    16. queueB = builderB.buildQueue();
    17. queueB.start();
    18. for (int i = 0; i < 100; i++)
    19. {
    20. queueA.put(" test-A-" + i);
    21. Thread.sleep(10);
    22. queueB.put(" test-B-" + i);
    23. }
    24. Thread.sleep(1000 * 10);// 等待消息消费完成
    25. queueB.close();
    26. queueA.close();
    27. clientB.close();
    28. clientA.close();
    29. System.out.println("OK!");
    30. }
    31. /** 队列消息序列化实现类 */
    32. private static QueueSerializer<String> createQueueSerializer()
    33. {
    34. return new QueueSerializer<String>()
    35. {
    36. @Override
    37. public byte[] serialize(String item)
    38. {
    39. return item.getBytes();
    40. }
    41. @Override
    42. public String deserialize(byte[] bytes)
    43. {
    44. return new String(bytes);
    45. }
    46. };
    47. }
    48. /** 定义队列消费者 */
    49. private static QueueConsumer<String> createQueueConsumer(final String name)
    50. {
    51. return new QueueConsumer<String>()
    52. {
    53. @Override
    54. public void stateChanged(CuratorFramework client, ConnectionState newState)
    55. {
    56. System.out.println("连接状态改变: " + newState.name());
    57. }
    58. @Override
    59. public void consumeMessage(String message) throws Exception
    60. {
    61. System.out.println("消费消息(" + name + "): " + message);
    62. }
    63. };
    64. }
    65. }
    以上程序创建两个client(A和B),它们在同一路径上创建队列(A和B),同时发消息、同时消费消息。
    3.示例程序运行结果
        运行结果控制台(观察ClientA可以消费ClientB的消息):
    1. 消费消息(A): test-A-0
    2. 消费消息(A): test-B-0
    3. ......
    4. 消费消息(B): test-A-51
    5. 消费消息(B): test-B-51
    6. 消费消息(B): test-A-52
    7. 消费消息(B): test-B-52
    8. 消费消息(B): test-A-53
    9. 消费消息(B): test-B-54
    10. 消费消息(B): test-A-55
    11. ......
    12. 消费消息(A): test-A-99
    13. 消费消息(A): test-B-99
    14. OK!
        Zookeeper节点信息如下:

    2.DistributedIdQueue

        DistributedIdQueue和上面的队列类似,但是可以为队列中的每一个元素设置一个ID。可以通过ID把队列中任意的元素移除。
    1.DistributedIdQueue结束
    DistributedIdQueue的使用与上面队列的区别是:
    • 通过下面方法创建:builder.buildIdQueue()
    • 放入元素时:queue.put(aMessage, messageId);
    • 移除元素时:int numberRemoved = queue.remove(messageId);
    2.编写示例程序
    在这个例子中,有些元素还没有被消费者消费时就移除了,这样消费者不会收到删除的消息。(此示例是根据上面例子修改而来)
    1. public class DistributedIdQueueExample
    2. {
    3. private static final String PATH = "/example/queue";
    4. public static void main(String[] args) throws Exception
    5. {
    6. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    7. client.start();
    8. DistributedIdQueue<String> queue = null;
    9. QueueConsumer<String> consumer = createQueueConsumer("A");
    10. QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
    11. queue = builder.buildIdQueue();
    12. queue.start();
    13. for (int i = 0; i < 10; i++)
    14. {
    15. queue.put(" test-" + i, "Id" + i);
    16. Thread.sleep((long) (50 * Math.random()));
    17. queue.remove("Id" + i);
    18. }
    19. Thread.sleep(1000 * 3);
    20. queue.close();
    21. client.close();
    22. System.out.println("OK!");
    23. }
    24. ......
    25. }
    3.示例程序运行结果
        运行结果控制台:
    1. 消费消息(A): test-2
    2. 消费消息(A): test-3
    3. 消费消息(A): test-4
    4. 消费消息(A): test-7
    5. OK!

    3.DistributedPriorityQueue

        优先级队列对队列中的元素按照优先级进行排序。Priority越小,元素月靠前,越先被消费掉。
    1.DistributedPriorityQueue介绍
        通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。
        当优先级队列得到元素增删消息时,它会暂停处理当前的元素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前当前活动的队列的最小数量。主要设置你的程序可以容忍的不排序的最小值。
        放入队列时需要指定优先级:queue.put(aMessage, priority);
    2.编写示例程序
    1. public class DistributedPriorityQueueExample
    2. {
    3. private static final String PATH = "/example/queue";
    4. public static void main(String[] args) throws Exception
    5. {
    6. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    7. client.start();
    8. DistributedPriorityQueue<String> queue = null;
    9. QueueConsumer<String> consumer = createQueueConsumer("A");
    10. QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
    11. queue = builder.buildPriorityQueue(0);
    12. queue.start();
    13. for (int i = 0; i < 5; i++)
    14. {
    15. int priority = (int) (Math.random() * 100);
    16. System.out.println("test-" + i + " 优先级:" + priority);
    17. queue.put("test-" + i, priority);
    18. Thread.sleep((long) (50 * Math.random()));
    19. }
    20. Thread.sleep(1000 * 2);
    21. queue.close();
    22. client.close();
    23. }
    24. ......
    25. }
    3.示例程序运行结果
        运行结果控制台:
    1. test-0 优先级:34
    2. test-1 优先级:51
    3. test-2 优先级:63
    4. test-3 优先级:45
    5. test-4 优先级:36
    6. 消费消息(A): test-0
    7. 消费消息(A): test-4
    8. 消费消息(A): test-3
    9. 消费消息(A): test-1
    10. 消费消息(A): test-2
    11. OK!

    4.DistributedDelayQueue

        JDK中也有DelayQueue,不知道你是否熟悉。DistributedDelayQueue也提供了类似的功能,元素有个delay值,消费者隔一段时间才能收到元素。
    1. DistributedDelayQueue介绍
    放入元素时可以指定delayUntilEpoch:queue.put(aMessage, delayUntilEpoch);
    注意:delayUntilEpoch不是离现在的一个时间间隔,比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10秒。如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。
    2.编写示例程序
    1. public class DistributedDelayQueueExample
    2. {
    3. private static final String PATH = "/example/queue";
    4. public static void main(String[] args) throws Exception
    5. {
    6. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
    7. client.start();
    8. DistributedDelayQueue<String> queue = null;
    9. QueueConsumer<String> consumer = createQueueConsumer("A");
    10. QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
    11. queue = builder.buildDelayQueue();
    12. queue.start();
    13. for (int i = 0; i < 10; i++)
    14. {
    15. queue.put("test-" + i, System.currentTimeMillis() + 3000);
    16. }
    17. System.out.println("put 完成!");
    18. Thread.sleep(1000 * 5);
    19. queue.close();
    20. client.close();
    21. System.out.println("OK!");
    22. }
    23. ......
    24. }
    3.示例程序运行结果
        运行结果控制台:
    1. put 完成!
    2. 消费消息(A): test-0
    3. 消费消息(A): test-3
    4. 消费消息(A): test-1
    5. 消费消息(A): test-2
    6. 消费消息(A): test-6
    7. 消费消息(A): test-4
    8. 消费消息(A): test-5
    9. 消费消息(A): test-7
    10. 消费消息(A): test-8
    11. 消费消息(A): test-9
    12. OK!

    5.SimpleDistributedQueue

        前面虽然实现了各种队列,但是你注意到没有,这些队列并没有实现类似JDK一样的接口。SimpleDistributedQueue提供了和JDK一致性的接口(但是没有实现Queue接口)。
    1. SimpleDistributedQueue介绍
    SimpleDistributedQueue常用方法:
    1. // 创建
    2. public SimpleDistributedQueue(CuratorFramework client, String path)
    3. // 增加元素
    4. public boolean offer(byte[] data) throws Exception
    5. // 删除元素
    6. public byte[] take() throws Exception
    7. // 另外还提供了其它方法
    8. public byte[] peek() throws Exception
    9. public byte[] poll(long timeout, TimeUnit unit) throws Exception
    10. public byte[] poll() throws Exception
    11. public byte[] remove() throws Exception
    12. public byte[] element() throws Exception
    没有add方法,多了take方法。take方法在成功返回之前会被阻塞。而poll在队列为空时直接返回null。
    -------------------------------------------------------------------------------------------------------------------------------



  • 相关阅读:
    this.props.children 踩坑
    3.webpack配置
    2.项目初始化配置
    1项目库的建立
    Idea-代码背景设置
    SpringBoot+nacos-环境切换-配置文件
    Docker-镜像地址无法访问
    端口-映射、开放、定义
    Linux-命令
    Nginx-命令
  • 原文地址:https://www.cnblogs.com/LiZhiW/p/4951529.html
Copyright © 2011-2022 走看看