zoukankan      html  css  js  c++  java
  • PSQueue队列操作

      队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈(FILO,First In Last Out,先进后出)属于线性表一样,队列也是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头,即FIFO(First In First Out,先进先出)。

      高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求。  

      在移动互联网高速发展的时代,各种电商平台的抢购业务变得越来越火爆,抢购业务所带来的高并发问题值得我们去探索,主要涉及的方面包括处理和响应速度、数据的一致性等。抢购开放的一瞬间,可能有成千上万的下订单请求发送到服务器去处理,如果只是简单的请求处理响应方式,不做任何处理,导致的结果很可能是很多客户长时间得不到响应,根本不知道自己是否下订单成功,或者下订单的数量是否已经超过了商品的库存数量,从而影响正常的业务。  

      高并发有诸多的解决方案,引入队列是其中一种。其主要用来减轻服务器压力负载,提高程序运行的稳定性。

      队列(Queue)又称先进先出表(First In First Out),即先进入队列的元素,先从队列中取出。利用消息队列可以很好地异步处理数据传送和存储,比如,当你频繁地向数据库中插入数据、频繁地向搜索引擎提交数据,就可采取消息队列来异步插入。另外,还可以将较慢的处理逻辑、有并发数量限制的处理逻辑,通过消息队列放在后台处理,例如FLV视频转换、发送手机短信、发送电子邮件等。

      如果要支持多台服务器,则需要更强的队列机制,如ActiveMQ,kafka等。在此,我们详解一种纯java实现的队列机制PSQueue。

      PSQueue Server 在使用中具有以下特征: 

         1.非常简单,基于 HTTP GET/POST 协议。

      PHPJavaPerlShellPythonRuby等支持HTTP协议的编程语言均可调用。

         2.完善的JMXl(Java Management Extensions,即Java管理扩展,是一个为应用程序、设备、系统等植入管理功能的框架,其主要作用是提供接口,允许有不同的实现)管理接口,所有方法全部可以由JMX来管理。为了安全管理,使用PSQueue需要口令权限。

         3.每个队列支持任意多消费者。

         4.非常快速,入队列、出队列速度超过40000/秒。

         5.高并发,支持5K以上的并发连接。

         6.支持多队列。

         7.队列个数无限制,只要系统的磁盘空间够用(缺省单个队列占用磁盘空间是2G)

         8.低内存消耗,海量数据存储,存储几十GB的数据只需不到200MB的物理内存缓冲区。

         9.可以实时查看指定队列状态(未读队列数量)。

         10.可以查看指定队列、指定消费者的内容,包括未出、已出的队列内容。

         11.查看队列内容时,支持多字符集编码。

      12.创新设计,比如,消费者可以故意倒回到老的偏移量再次消费数据。这虽然违反了队列的常见约定,但被证明是许多消费者的基本特征。

      接下来,我们详细看下PSQueue的具体使用。

      首先,我们要在电脑上安装与开启PSQueue服务器。

      在PSQueue的conf/conf.xml中,配置该项服务的账户名和密码,如下所示:

    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <conf>
        <bindAddress>*</bindAddress>
        <bindPort>1818</bindPort>
        <backlog>200</backlog>
        <soTimeout>60</soTimeout>
        <defaultCharset>UTF-8</defaultCharset>
        <gcInterval>30</gcInterval>
        <adminUser>admin</adminUser>
        <adminPass>123456</adminPass>
        <jmxPort>1819</jmxPort>
    </conf>    

      切换到PSQueue的bin文件夹下,在DOS窗口中执行下述命令:

    #安装psqueue-server
    psqueue.bat install
    
    #开启psqueue服务
    psqueue.bat start
    

       其次,在项目中导入PSQueue的jar包作为依赖,接着进行一些基本操作:

    package com.itszt.test1;
    import wjw.psqueue.client.PSQueueClient;
    import wjw.psqueue.msg.*;
    /**
     * PSQueue的基本用法
     */
    public class Test {
        //定义队列名称,订阅者名称,
        //以及要连接的队列服务器的用户名和密码
        static String queue_name = "szt_queue";
        static String sub_name = "szt_sub";
        static String username = "admin";
        static String pwd = "123456";
    
        public static void main(String[] args) {
            //定义操作队列服务器的客户端
            PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 
                    1818, "UTF-8", 60 * 1000, 
                    60 * 1000);
    
            //判断指定队列是否创建
            ResQueueStatus status = queueClient.status(queue_name);
            System.out.println(status.getQueueName()+"--"+status);
            /*System.out.println(status.getStatus().code==0);
            System.out.println(ResultCode.SUCCESS.code);
            System.out.println(ResultCode.QUEUE_IS_EXIST.code);*/
            if (status.getStatus().code==0) {
                System.out.println("您已创建队列!---->" + 
                        queue_name + "
    请您操作队列");
            } else {
                System.out.println("您尚未创建队列---->" + 
                        queue_name + "
    接下来为您创建队列");
                //1.创建一个队列
                ResultCode rest = queueClient.createQueue(queue_name, 
                        10000000L, username, pwd);
                System.out.println(rest.code);
                if (rest.code == ResultCode.SUCCESS.code) {
                    System.out.println("队列创建成功!--->" + rest.toString());
                } else {
                    System.out.println("队列创建失败!--->" + rest.toString());
                }
            }
    
            //2.向队列中放数据
            /*ResAdd resAdd = queueClient.add(queue_name, 
            "Hello,Queue!" + Math.random() * 1000);
            if (resAdd.status.code == ResultCode.SUCCESS.code) {
                System.out.println("插入数据成功!--->" + resAdd);
            } else {
                System.out.println("插入数据失败!--->" + resAdd);
            }*/
    
            //3.先判断一个指定的订阅者是否存在,若不存在则创建,然后通过该订阅者来消费队列中的数据
            ResSubStatus statusForSub = queueClient.statusForSub(queue_name, sub_name);
            System.out.println(statusForSub);
    //        System.out.println(ResultCode.SUB_IS_EXIST.code+"---"+ResultCode.SUB_NOT_EXIST.code);
            if (statusForSub.getStatus().code==0) {
                System.out.println("您指定的用户--" + sub_name + 
                        "--已存在
    请通过该订阅者消费队列中的数据");
            } else {
                System.out.println("================");
                System.out.println("您指定的用户--" + sub_name + 
                        "--不存在
    请创建该用户");
                ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd);
                if (resultCode.code == ResultCode.SUCCESS.code) {
                    System.out.println("用户" + sub_name + 
                            "创建成功!
    请通过该用户消费队列中的数据");
                } else {
                    System.out.println("用户创建失败,请联系管理员!");
                }
                System.out.println("==================");
            }
            //通过该订阅者消费队列中的数据
            /*ResData resData = queueClient.poll(queue_name, sub_name);
            if (resData.status.code == ResultCode.SUCCESS.code) {
                System.out.println("消费成功!--->" + resData.toString());
            } else if (resData.status.code == ResultCode.ALL_MESSAGE_CONSUMED.code) {
                System.out.println("队列中的数据已经全部消费完,请在生产后继续消费!");
            } else {
                System.out.println("resData = " + resData);
                System.out.println("消费数据失败,请联系管理员!");
            }*/
    
            //4.移除某个队列
            /*ResultCode remQueue = queueClient.removeQueue(queue_name,
             username, pwd);
            if(remQueue.code==ResultCode.SUCCESS.code){
                System.out.println("队列移除成功!");
            }else{
                System.out.println("队列移除失败,请联系管理员!");
            }*/
    
            //5.移除订阅者
            /*ResultCode remSub = queueClient.removeSub(queue_name, 
            sub_name, username, pwd);
            if(remSub.code==ResultCode.SUCCESS.code){
                System.out.println("订阅者移除成功!");
            }else{
                System.out.println("订阅者移除失败,请联系管理员!");
            }*/
    
            //6.查看队列中某个位置的数据
            ResData resData = queueClient.view(queue_name, 0);
            if(resData.status.code==ResultCode.SUCCESS.code){
                System.out.println("成功:" + resData.toString());
            }else{
                System.out.println("失败:" + resData.toString());
            }
    
            //7.列出全部队列
            ResList resList = queueClient.queueNames();
            System.out.println("全部队列:"+resList.data);
    
            //8.列出全部订阅者
            ResList subNames = queueClient.subNames(queue_name);
            System.out.println(queue_name+"队列的全部用户:" + subNames.data);
    
            //9.重置队列
            /*ResultCode resultCode = queueClient.resetQueue(queue_name,
                                    username, pwd);
            if(resultCode.code==ResultCode.SUCCESS.code){
                System.out.println(queue_name+"队列重置成功!");
            }else{
                System.out.println(queue_name+"队列重置失败!");
            }*/
        }
    }  

      接下来,我们再操作自定义类型的对象数据:

    package com.itszt.test2;
    
    import com.google.gson.Gson;
    import wjw.psqueue.client.PSQueueClient;
    import wjw.psqueue.msg.*;
    
    import java.util.Date;
    
    /**
     * 测试订单操作
     * 在原先导入EasyFastJson-2.7.2.jar和PSQueueClient-1.0.4.jar两个jar包后,
     * 再导入gson-2.7.jar包,并均对其添加依赖
     */
    public class Test {
        static String queue_name = "szt_queue";
        static String sub_name = "szt_sub";
        static String username = "admin";
        static String pwd = "123456";
    
        static PSQueueClient queueClient = new PSQueueClient("127.0.0.1", 1818, "UTF-8", 60 * 1000, 60 * 1000);
        static Gson gson=new Gson();
        public Test(){
            //查询指定队列是否存在,不存在则创建
            ResQueueStatus rqs = queueClient.status(queue_name);
            if(rqs.status.code== ResultCode.SUCCESS.code){
                System.out.println(queue_name+"对列已存在");
            }else{
                System.out.println(queue_name+"对列不存在,将创建队列");
                ResultCode resultCode = queueClient.createQueue(queue_name, 10000000L, username, pwd);
                if(resultCode.code==ResultCode.SUCCESS.code){
                    System.out.println(queue_name+"队列已创建成功!");
                }else{
                    System.out.println(queue_name+"队列创建失败!请联系管理员");
                }
            }
            //查询指定用户是否存在,不存在则创建
            ResSubStatus resSubStatus = queueClient.statusForSub(queue_name, sub_name);
            if(resSubStatus.getStatus().code==0){
                System.out.println(sub_name+"用户已存在!");
            }else{
                System.out.println(sub_name+"用户不存在!请创建用户!");
                ResultCode resultCode = queueClient.createSub(queue_name, sub_name, username, pwd);
                if (resultCode.code== ResultCode.SUCCESS.code) {
                    System.out.println(sub_name+"用户创建成功!");
                }else{
                    System.out.println(sub_name+"用户创建失败!");
                }
            }
        }
    
        public static void main(String[] args) {
    //        queueClient.resetQueue(queue_name,username,pwd);
            new AddThread().start();
            new AddThread().start();
            new PollThread().start();
            new PollThread().start();
            new PollThread().start();
            new PollThread().start();
            new AddThread().start();
            new AddThread().start();
        }
    
        //生产者线程
        static class AddThread extends Thread{
            private static int id=1;
            @Override
            public void run() {
                //生产1000条数据
                for(int i=0;i<1000;i++){
                    OrderRequest order=new OrderRequest(++id, "张三的订单" + id, new Date().toString());
                    ResAdd resAdd = queueClient.add(queue_name, gson.toJson(order));
                    if(resAdd.status.code==ResultCode.SUCCESS.code){
                        System.out.println("订单"+order.getOrderName()+"放入队列成功!");
                    }else{
                        System.out.println("订单"+order.getOrderName()+"放入队列失败!");
                    }
                }
            }
        }
    
        //消费者线程
        static class PollThread extends Thread{
            @Override
            public void run() {
                while (true){
                    ResData resData = queueClient.poll(queue_name, sub_name);
                    if(resData.status.code==ResultCode.QUEUE_POLL_ERROR.code){
                        System.out.println("队列中的数据取完了!");
                        break;
                    }else{
                        OrderRequest orderFromJson = gson.fromJson(resData.getData(), OrderRequest.class);
                        if(orderFromJson==null){
                            System.out.println("队列中的数据取完了!");
                            break;
                        }
                        System.out.println("取出一个数据:"+orderFromJson);
                    }
                }
            }
        }
    } 
    

      到此,PSQueue的演示操作基本完毕,希望对您有所帮助!

  • 相关阅读:
    (转)Python中的__init__和__new__
    PEP8
    python lstrip()函数
    python中的生成器跟迭代器
    callback
    关于0.0.0.0这个ip的疑问
    Python import中相对路径的问题
    python读取excel
    git本地管理多个密钥/账户
    词法分析之有确定、不确定自动机及其生成器
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8595977.html
Copyright © 2011-2022 走看看