zoukankan      html  css  js  c++  java
  • 十八、curator recipes之DistributedDelayQueue

    简介

    Curator 实现了类似 JDK DelayQueue 的分布式延迟队列:放入队列的元素只有在其指定的延迟时间到期之后,才会被投递给消费者。

    官方文档:http://curator.apache.org/curator-recipes/distributed-delay-queue.html

    javaDoc:http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/queue/DistributedDelayQueue.html

    代码示例

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.queue.*;
    import org.apache.curator.framework.state.ConnectionState;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * Minimal demo of Curator's {@link DistributedDelayQueue} recipe.
     *
     * <p>The program connects to a ZooKeeper server at {@code localhost:2181}, builds a
     * delay queue rooted at {@code /queue/0001}, enqueues two strings with different
     * delays, waits long enough for both to be delivered to the consumer, and then
     * shuts the queue and the client down.
     */
    public class DistributedDelayQueueDemo {
        // Retry policy arguments are (baseSleepTimeMs, maxRetries).
        private static final CuratorFramework        client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));
        private static final String                  path = "/queue/0001";
        private static DistributedDelayQueue<String> queue;
        static {
            client.start();
        }

        /**
         * Runs the demo end to end.
         *
         * @param args unused
         * @throws Exception if building, starting, writing to, or closing the queue fails,
         *                   or if the thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            // Consumer: prints every delivered item and every connection state change.
            QueueConsumer<String> consumer = new QueueConsumer<String>() {
                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("state changed");
                }

                @Override
                public void consumeMessage(String s) {
                    System.out.println("消费数据:" + s);
                }
            };

            // Serializer: encode and decode with the same explicit charset so that
            // non-ASCII items round-trip regardless of the platform default encoding.
            QueueSerializer<String> serializer = new QueueSerializer<String>() {
                @Override
                public byte[] serialize(String s) {
                    return s.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes, StandardCharsets.UTF_8);
                }
            };

            try {
                queue = QueueBuilder.builder(client, consumer, serializer, path).buildDelayQueue();
                queue.start();
                try {
                    System.out.println("queue built");
                    // The second argument is an absolute point in time (epoch millis):
                    // current time + delay = the moment the item becomes consumable.
                    queue.put("a", System.currentTimeMillis() + 1000);
                    queue.put("b", System.currentTimeMillis() + 20000);
                    System.out.println("put ended");
                    // Wait long enough for both delayed items to be delivered.
                    Thread.sleep(30000);
                } finally {
                    // Close the queue even when a put or the sleep fails.
                    queue.close();
                }
                Thread.sleep(50000);
            } finally {
                // Always release the ZooKeeper connection.
                client.close();
            }
        }
    }
  • 相关阅读:
    分布式事务--AT+TCC
    Java基础面试题
    JVM问题
    集合问题
    线程问题
    微服务面试题
    【入职准备】安装STS以及整合maven
    事务----四大特性
    html小知识--创建表单
    通过css润色html表格
  • 原文地址:https://www.cnblogs.com/lay2017/p/10276579.html
Copyright © 2011-2022 走看看