zoukankan      html  css  js  c++  java
  • 查询mq队列的消息数

    package com.hmzj.callcentercommon.mq;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import sun.misc.BASE64Encoder;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    
    /**
     * Reads message counters of a RabbitMQ queue through the broker's HTTP API
     * ({@code GET http://<host>:15672/api/queues/<vhost>/<queue>}).
     *
     * <p>Credentials and host are taken from the {@code spring.rabbitmq.*} properties.
     * Every counter method performs one HTTP request per call; use
     * {@link #getMQCountMap(String)} to obtain all three counters from a single request.
     *
     * @author Yangqi.Pang 1039467780@qq.com
     * @version V0.0.1
     * @date 2019/2/1 18:25
     */
    @Component
    @Slf4j
    public class MqUtils {
        /** Port the HTTP API is requested on. */
        private static final String MANAGEMENT_PORT = "15672";
        /** Upper bound for establishing the TCP connection, so a dead host cannot block forever. */
        private static final int CONNECT_TIMEOUT_MILLIS = 5000;
        /** Upper bound for waiting on response data. */
        private static final int READ_TIMEOUT_MILLIS = 10000;

        private static final String KEY_MESSAGES = "messages";
        private static final String KEY_MESSAGES_READY = "messages_ready";
        private static final String KEY_MESSAGES_UNACKNOWLEDGED = "messages_unacknowledged";

        @Value("${spring.rabbitmq.username}")
        private String username;
        @Value("${spring.rabbitmq.password}")
        private String password;
        @Value("${spring.rabbitmq.host}")
        private String host;
        // The virtual host named "/" has to be written as "%2f" inside the URL path.
        private String virtualHosts = "%2f";

        /**
         * Returns the total number of messages in the queue.
         *
         * @param queueName name of the queue, inserted into the request URL as-is
         * @return the {@code messages} field of the API response, or 0 when the request
         *         did not return HTTP 200 or the field is absent
         * @throws IOException if the HTTP request fails
         */
        public long getMessageCount(String queueName) throws IOException {
            // Parsed as long to match the declared return type.
            return Long.parseLong(readField(fetchQueueInfo(queueName), KEY_MESSAGES));
        }

        /**
         * Returns the number of messages that are ready for delivery.
         *
         * @param queueName name of the queue, inserted into the request URL as-is
         * @return the {@code messages_ready} field of the API response, or 0 when the
         *         request did not return HTTP 200 or the field is absent
         * @throws IOException if the HTTP request fails
         */
        public int getMessageReadyCount(String queueName) throws IOException {
            return Integer.parseInt(readField(fetchQueueInfo(queueName), KEY_MESSAGES_READY));
        }

        /**
         * Returns the number of delivered but not yet acknowledged messages.
         *
         * @param queueName name of the queue, inserted into the request URL as-is
         * @return the {@code messages_unacknowledged} field of the API response, or 0 when
         *         the request did not return HTTP 200 or the field is absent
         * @throws IOException if the HTTP request fails
         */
        public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
            return Integer.parseInt(readField(fetchQueueInfo(queueName), KEY_MESSAGES_UNACKNOWLEDGED));
        }

        /**
         * Returns total, ready and unacknowledged message counts from a single API request.
         *
         * @param queueName name of the queue, inserted into the request URL as-is
         * @return a map with the keys {@code messages}, {@code messages_ready} and
         *         {@code messages_unacknowledged}; a value is 0 when the request did not
         *         return HTTP 200 or the corresponding field is absent
         * @throws IOException if the HTTP request fails
         */
        public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
            JSONObject queueInfo = fetchQueueInfo(queueName);
            Map<String, Integer> map = new HashMap<>();
            map.put(KEY_MESSAGES, Integer.parseInt(readField(queueInfo, KEY_MESSAGES)));
            map.put(KEY_MESSAGES_READY, Integer.parseInt(readField(queueInfo, KEY_MESSAGES_READY)));
            map.put(KEY_MESSAGES_UNACKNOWLEDGED,
                    Integer.parseInt(readField(queueInfo, KEY_MESSAGES_UNACKNOWLEDGED)));
            return map;
        }

        /**
         * Sends a GET request for the given queue and returns the raw response body.
         *
         * <p>NOTE: {@code queueName} is concatenated into the URL path unchanged, so a name
         * containing characters that are not valid in a path segment must be
         * percent-encoded by the caller.
         *
         * @param queueName name of the queue
         * @return the response body with line breaks removed, or an empty string when the
         *         response status is not HTTP 200
         * @throws IOException if connecting or reading fails (including timeouts)
         */
        public String getApiMessage(String queueName) throws IOException {
            String urlString = "http://" + host + ":" + MANAGEMENT_PORT
                    + "/api/queues/" + virtualHosts + "/" + queueName;
            HttpURLConnection httpConn = (HttpURLConnection) new URL(urlString).openConnection();
            try {
                httpConn.setRequestMethod("GET");
                httpConn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                httpConn.setReadTimeout(READ_TIMEOUT_MILLIS);

                // HTTP Basic authentication. java.util.Base64 never inserts line breaks,
                // so the header stays valid for credentials of any length.
                String auth = username + ":" + password;
                String encoding = Base64.getEncoder()
                        .encodeToString(auth.getBytes(StandardCharsets.UTF_8));
                httpConn.setRequestProperty("Authorization", "Basic " + encoding);

                httpConn.connect();
                if (httpConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                    return "";
                }

                StringBuilder content = new StringBuilder();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(httpConn.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        content.append(line);
                    }
                }
                return content.toString();
            } finally {
                // Runs on success, on a non-200 status and on exceptions alike.
                httpConn.disconnect();
            }
        }

        /**
         * Fetches the queue description and parses it as JSON.
         *
         * @return the parsed response, or {@code null} (after logging an error) when the
         *         API call yielded no body
         */
        private JSONObject fetchQueueInfo(String queueName) throws IOException {
            String apiMessage = getApiMessage(queueName);
            if (apiMessage == null || apiMessage.isEmpty()) {
                log.error("请求RabbitMQ API时出错!!");
                return null;
            }
            return JSON.parseObject(apiMessage);
        }

        /**
         * Returns the textual value stored under {@code key}, or {@code "0"} when the
         * response or the field is missing.
         */
        private static String readField(JSONObject queueInfo, String key) {
            if (queueInfo == null) {
                return "0";
            }
            Object value = queueInfo.get(key);
            return value == null ? "0" : value.toString();
        }
    }

    RabbitMQ 管理插件的 HTTP API 地址:http://ip:15672/api(ip 为 RabbitMQ 服务器地址,需启用 management 插件)

  • 相关阅读:
    亚洲区哈尔滨赛区网络预选赛over
    背包问题
    Memcache基础教程
    Telnet的命令
    Telnet技术白皮书
    workthread模式
    Telnet的命令
    telnet 测试memcached
    telnet 测试memcached
    Memcache基础教程
  • 原文地址:https://www.cnblogs.com/pangyangqi/p/10347722.html
Copyright © 2011-2022 走看看