zoukankan      html  css  js  c++  java
  • PSQueue队列操作

      队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈(FILO,First In Last Out,先进后出)属于线性表一样,队列也是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头,即FIFO(First In First Out,先进先出)。

      高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求。  

      在移动互联网高速发展的时代,各种电商平台的抢购业务变得越来越火爆,抢购业务所带来的高并发问题值得我们去探索,主要涉及的方面包括处理和响应速度、数据的一致性等。抢购开放的一瞬间,可能有成千上万的下订单请求发送到服务器去处理,如果只是简单的请求处理响应方式,不做任何处理,导致的结果很可能是很多客户长时间得不到响应,根本不知道自己是否下订单成功,或者下订单的数量是否已经超过了商品的库存数量,从而影响正常的业务。  

      高并发有诸多的解决方案,引入队列是其中一种。其主要用来减轻服务器压力负载,提高程序运行的稳定性。

      队列(Queue)又称先进先出表(First In First Out),即先进入队列的元素,先从队列中取出。利用消息队列可以很好地异步处理数据传送和存储,比如,当你频繁地向数据库中插入数据、频繁地向搜索引擎提交数据,就可采取消息队列来异步插入。另外,还可以将较慢的处理逻辑、有并发数量限制的处理逻辑,通过消息队列放在后台处理,例如FLV视频转换、发送手机短信、发送电子邮件等。

      如果要支持多台服务器,则需要更强的队列机制,如ActiveMQ,kafka等。在此,我们详解一种纯java实现的队列机制PSQueue。

      PSQueue Server 在使用中具有以下特征: 

         1.非常简单,基于 HTTP GET/POST 协议。

      PHPJavaPerlShellPythonRuby等支持HTTP协议的编程语言均可调用。

         2.完善的JMXl(Java Management Extensions,即Java管理扩展,是一个为应用程序、设备、系统等植入管理功能的框架,其主要作用是提供接口,允许有不同的实现)管理接口,所有方法全部可以由JMX来管理。为了安全管理,使用PSQueue需要口令权限。

         3.每个队列支持任意多消费者。

         4.非常快速,入队列、出队列速度超过40000/秒。

         5.高并发,支持5K以上的并发连接。

         6.支持多队列。

         7.队列个数无限制,只要系统的磁盘空间够用(缺省单个队列占用磁盘空间是2G)

         8.低内存消耗,海量数据存储,存储几十GB的数据只需不到200MB的物理内存缓冲区。

         9.可以实时查看指定队列状态(未读队列数量)。

         10.可以查看指定队列、指定消费者的内容,包括未出、已出的队列内容。

         11.查看队列内容时,支持多字符集编码。

      12.创新设计,比如,消费者可以故意倒回到老的偏移量再次消费数据。这虽然违反了队列的常见约定,但被证明是许多消费者的基本特征。

      接下来,我们详细看下PSQueue的具体使用。

      首先,我们要在电脑上安装与开启PSQueue服务器。

      在PSQueue的conf/conf.xml中,配置该项服务的账户名和密码,如下所示:

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <conf>
        <bindAddress>*</bindAddress>
        <bindPort>1818</bindPort>
        <backlog>200</backlog>
        <soTimeout>60</soTimeout>
        <defaultCharset>UTF-8</defaultCharset>
        <gcInterval>30</gcInterval>
        <adminUser>admin</adminUser>
        <adminPass>123456</adminPass>
        <jmxPort>1819</jmxPort>
    </conf>    

      切换到PSQueue的bin文件夹下,在DOS窗口中执行下述命令:

    #安装psqueue-server
    psqueue.bat install
    
    #开启psqueue服务
    psqueue.bat start
    

       其次,在项目中导入PSQueue的jar包作为依赖,接着进行一些基本操作:

    package com.itszt.test1;
    import wjw.psqueue.client.PSQueueClient;
    import wjw.psqueue.msg.*;
    /**
     * PSQueue的基本用法
     */
    public class Test {
        //定义队列名称,订阅者名称,
        //以及要连接的队列服务器的用户名和密码
        static String queue_name = "szt_queue";
        static String sub_name = "szt_sub";
        static String username = "admin";
        static String pwd = "123456";
    
        public static void main(String[] args) {
            //定义操作队列服务器的客户端
            PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 
                    1818, "UTF-8", 60 * 1000, 
                    60 * 1000);
    
            //判断指定队列是否创建
            ResQueueStatus status = queueClient.status(queue_name);
            System.out.println(status.getQueueName()+"--"+status);
            /*System.out.println(status.getStatus().code==0);
            System.out.println(ResultCode.SUCCESS.code);
            System.out.println(ResultCode.QUEUE_IS_EXIST.code);*/
            if (status.getStatus().code==0) {
                System.out.println("您已创建队列!---->" + 
                        queue_name + "
    请您操作队列");
            } else {
                System.out.println("您尚未创建队列---->" + 
                        queue_name + "
    接下来为您创建队列");
                //1.创建一个队列
                ResultCode rest = queueClient.createQueue(queue_name, 
                        10000000L, username, pwd);
                System.out.println(rest.code);
                if (rest.code == ResultCode.SUCCESS.code) {
                    System.out.println("队列创建成功!--->" + rest.toString());
                } else {
                    System.out.println("队列创建失败!--->" + rest.toString());
                }
            }
    
            //2.向队列中放数据
            /*ResAdd resAdd = queueClient.add(queue_name, 
            "Hello,Queue!" + Math.random() * 1000);
            if (resAdd.status.code == ResultCode.SUCCESS.code) {
                System.out.println("插入数据成功!--->" + resAdd);
            } else {
                System.out.println("插入数据失败!--->" + resAdd);
            }*/
    
            //3.先判断一个指定的订阅者是否存在,若不存在则创建,然后通过该订阅者来消费队列中的数据
            ResSubStatus statusForSub = queueClient.statusForSub(queue_name, sub_name);
            System.out.println(statusForSub);
    //        System.out.println(ResultCode.SUB_IS_EXIST.code+"---"+ResultCode.SUB_NOT_EXIST.code);
            if (statusForSub.getStatus().code==0) {
                System.out.println("您指定的用户--" + sub_name + 
                        "--已存在
    请通过该订阅者消费队列中的数据");
            } else {
                System.out.println("================");
                System.out.println("您指定的用户--" + sub_name + 
                        "--不存在
    请创建该用户");
                ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd);
                if (resultCode.code == ResultCode.SUCCESS.code) {
                    System.out.println("用户" + sub_name + 
                            "创建成功!
    请通过该用户消费队列中的数据");
                } else {
                    System.out.println("用户创建失败,请联系管理员!");
                }
                System.out.println("==================");
            }
            //通过该订阅者消费队列中的数据
            /*ResData resData = queueClient.poll(queue_name, sub_name);
            if (resData.status.code == ResultCode.SUCCESS.code) {
                System.out.println("消费成功!--->" + resData.toString());
            } else if (resData.status.code == ResultCode.ALL_MESSAGE_CONSUMED.code) {
                System.out.println("队列中的数据已经全部消费完,请在生产后继续消费!");
            } else {
                System.out.println("resData = " + resData);
                System.out.println("消费数据失败,请联系管理员!");
            }*/
    
            //4.移除某个队列
            /*ResultCode remQueue = queueClient.removeQueue(queue_name,
             username, pwd);
            if(remQueue.code==ResultCode.SUCCESS.code){
                System.out.println("队列移除成功!");
            }else{
                System.out.println("队列移除失败,请联系管理员!");
            }*/
    
            //5.移除订阅者
            /*ResultCode remSub = queueClient.removeSub(queue_name, 
            sub_name, username, pwd);
            if(remSub.code==ResultCode.SUCCESS.code){
                System.out.println("订阅者移除成功!");
            }else{
                System.out.println("订阅者移除失败,请联系管理员!");
            }*/
    
            //6.查看队列中某个位置的数据
            ResData resData = queueClient.view(queue_name, 0);
            if(resData.status.code==ResultCode.SUCCESS.code){
                System.out.println("成功:" + resData.toString());
            }else{
                System.out.println("失败:" + resData.toString());
            }
    
            //7.列出全部队列
            ResList resList = queueClient.queueNames();
            System.out.println("全部队列:"+resList.data);
    
            //8.列出全部订阅者
            ResList subNames = queueClient.subNames(queue_name);
            System.out.println(queue_name+"队列的全部用户:" + subNames.data);
    
            //9.重置队列
            /*ResultCode resultCode = queueClient.resetQueue(queue_name,
                                    username, pwd);
            if(resultCode.code==ResultCode.SUCCESS.code){
                System.out.println(queue_name+"队列重置成功!");
            }else{
                System.out.println(queue_name+"队列重置失败!");
            }*/
        }
    }  

      接下来,我们再操作自定义类型的对象数据:

    package com.itszt.test2;
    
    import com.google.gson.Gson;
    import wjw.psqueue.client.PSQueueClient;
    import wjw.psqueue.msg.*;
    
    import java.util.Date;
    
    /**
     * 测试订单操作
     * 在原先导入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar两个jar包后,
     * 再导入gson-2.7.jar包,并均对其添加依赖
     */
    public class Test {
        static String queue_name = "szt_queue";
        static String sub_name = "szt_sub";
        static String username = "admin";
        static String pwd = "123456";
    
        static PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000);
        static Gson gson=new Gson();
        public Test(){
            //查询指定队列是否存在,不存在则创建
            ResQueueStatus rqs = queueClient.status(queue_name);
            if(rqs.status.code== ResultCode.SUCCESS.code){
                System.out.println(queue_name+"对列已存在");
            }else{
                System.out.println(queue_name+"对列不存在,将创建队列");
                ResultCode resultCode = queueClient.createQueue(queue_name, 10000000L, username, pwd);
                if(resultCode.code==ResultCode.SUCCESS.code){
                    System.out.println(queue_name+"队列已创建成功!");
                }else{
                    System.out.println(queue_name+"队列创建失败!请联系管理员");
                }
            }
            //查询指定用户是否存在,不存在则创建
            ResSubStatus resSubStatus = queueClient.statusForSub(queue_name, sub_name);
            if(resSubStatus.getStatus().code==0){
                System.out.println(sub_name+"用户已存在!");
            }else{
                System.out.println(sub_name+"用户不存在!请创建用户!");
                ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd);
                if (resultCode.code== ResultCode.SUCCESS.code) {
                    System.out.println(sub_name+"用户创建成功!");
                }else{
                    System.out.println(sub_name+"用户创建失败!");
                }
            }
        }
    
        public static void main(String[] args) {
    //        queueClient.resetQueue(queue_name,username,pwd);
            new AddThread().start();
            new AddThread().start();
            new PollThread().start();
            new PollThread().start();
            new PollThread().start();
            new PollThread().start();
            new AddThread().start();
            new AddThread().start();
        }
    
        //生产者线程
        static class AddThread extends Thread{
            private static int id=1;
            @Override
            public void run() {
                //生产1000条数据
                for(int i=0;i<1000;i++){
                    OrderRequest order=new OrderRequest(++id, "张三的订单" + id, new Date().toString());
                    ResAdd resAdd = queueClient.add(queue_name, gson.toJson(order));
                    if(resAdd.status.code==ResultCode.SUCCESS.code){
                        System.out.println("订单"+order.getOrderName()+"放入队列成功!");
                    }else{
                        System.out.println("订单"+order.getOrderName()+"放入队列失败!");
                    }
                }
            }
        }
    
        //消费者线程
        static class PollThread extends Thread{
            @Override
            public void run() {
                while (true){
                    ResData resData = queueClient.poll(queue_name, sub_name);
                    if(resData.status.code==ResultCode.QUEUE_POLL_ERROR.code){
                        System.out.println("队列中的数据取完了!");
                        break;
                    }else{
                        OrderRequest orderFromJson = gson.fromJson(resData.getData(), OrderRequest.class);
                        if(orderFromJson==null){
                            System.out.println("队列中的数据取完了!");
                            break;
                        }
                        System.out.println("取出一个数据:"+orderFromJson);
                    }
                }
            }
        }
    } 
    

      到此,PSQueue的演示操作基本完毕,希望对您有所帮助!

  • 相关阅读:
    go函数
    Linux 查看磁盘容量、查找大文件、查找大目录
    五分钟理解一致性哈希算法(consistent hashing)
    使用Java实现三个线程交替打印0-74
    Python实现IOC控制反转
    Wannafly挑战赛5 A珂朵莉与宇宙 前缀和+枚举平方数
    Yandex Big Data Essentials Week1 Scaling Distributed File System
    Yandex Big Data Essentials Week1 Unix Command Line Interface Processes managing
    Yandex Big Data Essentials Week1 Unix Command Line Interface File Content exploration
    Yandex Big Data Essentials Week1 Unix Command Line Interface File System exploration
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8595977.html
Copyright © 2011-2022 走看看