zoukankan      html  css  js  c++  java
  • 查询mq队列的消息数

    package com.hmzj.callcentercommon.mq;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import sun.misc.BASE64Encoder;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    
    /**
     * Reads per-queue message counters from the RabbitMQ management HTTP API
     * ({@code GET /api/queues/{vhost}/{queue}}) using HTTP Basic authentication.
     *
     * <p>Every public counter method performs one HTTP request. When the API does not
     * answer with HTTP 200, or a counter field is absent from the response, the counter
     * is reported as {@code 0}.
     *
     * @author Yangqi.Pang 1039467780@qq.com
     * @version V0.0.1
     * @date 2019/2/1 18:25
     */
    @Component
    @Slf4j
    public class MqUtils {
        /** Port the management HTTP API is contacted on. */
        private static final int MANAGEMENT_PORT = 15672;
        /** Maximum time to wait for the TCP connection to be established. */
        private static final int CONNECT_TIMEOUT_MILLIS = 5_000;
        /** Maximum time to wait for response data once connected. */
        private static final int READ_TIMEOUT_MILLIS = 10_000;

        private static final String FIELD_MESSAGES = "messages";
        private static final String FIELD_MESSAGES_READY = "messages_ready";
        private static final String FIELD_MESSAGES_UNACKNOWLEDGED = "messages_unacknowledged";

        @Value("${spring.rabbitmq.username}")
        private String username;
        @Value("${spring.rabbitmq.password}")
        private String password;
        @Value("${spring.rabbitmq.host}")
        private String host;
        // The virtual host named "/" has to be written percent-encoded ("%2f") in the URL path.
        private String virtualHosts = "%2f";

        /**
         * Returns the total number of messages in the queue.
         *
         * @param queueName name of the queue to inspect
         * @return value of the {@code messages} field, or {@code 0} when the API request
         *         failed or the field is absent
         * @throws IOException if the HTTP request fails at the transport level
         */
        public long getMessageCount(String queueName) throws IOException {
            // Parsed as long to match the return type; Integer.parseInt would reject large counts.
            return Long.parseLong(countText(fetchQueueInfo(queueName), FIELD_MESSAGES));
        }

        /**
         * Returns the number of messages that are ready for delivery.
         *
         * @param queueName name of the queue to inspect
         * @return value of the {@code messages_ready} field, or {@code 0} when the API
         *         request failed or the field is absent
         * @throws IOException if the HTTP request fails at the transport level
         */
        public int getMessageReadyCount(String queueName) throws IOException {
            return Integer.parseInt(countText(fetchQueueInfo(queueName), FIELD_MESSAGES_READY));
        }

        /**
         * Returns the number of delivered but not yet acknowledged messages.
         *
         * @param queueName name of the queue to inspect
         * @return value of the {@code messages_unacknowledged} field, or {@code 0} when the
         *         API request failed or the field is absent
         * @throws IOException if the HTTP request fails at the transport level
         */
        public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
            return Integer.parseInt(countText(fetchQueueInfo(queueName), FIELD_MESSAGES_UNACKNOWLEDGED));
        }

        /**
         * Returns the total, ready and unacknowledged message counts from a single API request.
         *
         * @param queueName name of the queue to inspect
         * @return a mutable map with the keys {@code messages}, {@code messages_ready} and
         *         {@code messages_unacknowledged}; each value is {@code 0} when the API
         *         request failed or the corresponding field is absent
         * @throws IOException if the HTTP request fails at the transport level
         */
        public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
            JSONObject queueInfo = fetchQueueInfo(queueName);
            Map<String, Integer> map = new HashMap<>();
            map.put(FIELD_MESSAGES, Integer.parseInt(countText(queueInfo, FIELD_MESSAGES)));
            map.put(FIELD_MESSAGES_READY, Integer.parseInt(countText(queueInfo, FIELD_MESSAGES_READY)));
            map.put(FIELD_MESSAGES_UNACKNOWLEDGED,
                    Integer.parseInt(countText(queueInfo, FIELD_MESSAGES_UNACKNOWLEDGED)));
            return map;
        }

        /**
         * Fetches the raw JSON description of a queue from the management HTTP API.
         *
         * @param queueName name of the queue; it is percent-encoded before being placed in the URL
         * @return the response body with line breaks removed, or an empty string when the
         *         server answers with any status other than HTTP 200
         * @throws IOException if connecting, reading, or a timeout fails
         * @throws NullPointerException if {@code queueName} is {@code null}
         */
        public String getApiMessage(String queueName) throws IOException {
            Objects.requireNonNull(queueName, "queueName");
            // URLEncoder produces form encoding (space -> '+'); inside a URL path a space must be "%20".
            String encodedQueue = URLEncoder.encode(queueName, "UTF-8").replace("+", "%20");
            String urlString = "http://" + host + ":" + MANAGEMENT_PORT
                    + "/api/queues/" + virtualHosts + "/" + encodedQueue;

            HttpURLConnection httpConn = (HttpURLConnection) new URL(urlString).openConnection();
            try {
                httpConn.setRequestMethod("GET");
                httpConn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                httpConn.setReadTimeout(READ_TIMEOUT_MILLIS);

                // HTTP Basic credentials. java.util.Base64's basic encoder never inserts line
                // breaks, so the header value stays on one line regardless of credential length.
                String auth = username + ":" + password;
                String encoding = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
                httpConn.setRequestProperty("Authorization", "Basic " + encoding);

                httpConn.connect();
                int status = httpConn.getResponseCode();
                if (status != HttpURLConnection.HTTP_OK) {
                    log.warn("RabbitMQ management API answered HTTP {} for queue {}", status, queueName);
                    return "";
                }

                StringBuilder content = new StringBuilder();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(httpConn.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        content.append(line);
                    }
                }
                return content.toString();
            } finally {
                // Runs on every path, including exceptions thrown while reading.
                httpConn.disconnect();
            }
        }

        /**
         * Requests the queue description and parses it as JSON.
         *
         * @return the parsed object, or {@code null} (after logging an error) when the API
         *         returned no usable body
         */
        private JSONObject fetchQueueInfo(String queueName) throws IOException {
            String apiMessage = getApiMessage(queueName);
            if (apiMessage == null || apiMessage.isEmpty()) {
                log.error("请求RabbitMQ API时出错!!");
                return null;
            }
            return JSON.parseObject(apiMessage);
        }

        /**
         * Returns the textual value of a counter field, or {@code "0"} when the queue
         * description or the field itself is missing.
         */
        private static String countText(JSONObject queueInfo, String field) {
            if (queueInfo == null) {
                return "0";
            }
            Object value = queueInfo.get(field);
            return value == null ? "0" : value.toString();
        }
    }

    RabbitMQ 管理插件的 HTTP API 地址:http://ip:15672/api(将 ip 替换为 RabbitMQ 服务器的地址)

  • 相关阅读:
    Java基础:基本类型
    完全干净的卸载VS2013
    git本地仓库首次push到远程仓库出现错误 ! [rejected] master -> master (fetch first)
    运行VS出现warning C4996错误的解决办法
    xbmc-android的编译
    linux执行sh,出现/bin/sh^M: bad interpreter: No such file or directory
    Ubuntu配置android-vlc编译环境(2015-11-05)
    a80修改默认4k输出,官方代码锁死了
    ubuntu12.04平台下a80编译环境搭建
    编译java代码出现 错误: 需要class, interface或enum 提示
  • 原文地址:https://www.cnblogs.com/pangyangqi/p/10347722.html
Copyright © 2011-2022 走看看