使用RabbitMQ,业务需求,想要知道队列中还有多少待消费的数据。
方式一:
// Broker connection settings injected from the Spring environment.
@Value("${spring.rabbitmq.host}")
private String host;

@Value("${spring.rabbitmq.port}")
private Integer port;

@Value("${spring.rabbitmq.username}")
private String username;

@Value("${spring.rabbitmq.password}")
private String password;

// --- The statements below belong inside a method body. ---

// Configure host, port, user name and password.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);

// Open the connection; it is always closed in the finally block below so a
// failure while querying does not leak the underlying socket.
Connection connection = factory.newConnection();
try {
    // Open a channel on that connection.
    Channel channel = connection.createChannel();
    try {
        // Declare a durable, non-auto-delete exchange of type "direct".
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

        // queueNames is a comma-separated list of queue names.
        String[] queues = queueNames.split(",");
        for (String queue : queues) {
            // A passive declare does not create the queue; it only returns its current state.
            // NOTE(review): if a queue does not exist the broker presumably closes the
            // channel, so the remaining queues would fail too — confirm and handle if needed.
            DeclareOk declareOk = channel.queueDeclarePassive(queue);
            // Number of messages currently in the queue.
            int num = declareOk.getMessageCount();
        }
    } finally {
        // Skip close() when the channel is already closed so a close-time
        // exception does not mask the original failure.
        if (channel.isOpen()) {
            channel.close();
        }
    }
} finally {
    if (connection.isOpen()) {
        connection.close();
    }
}
方式二:
spring配置文件:
<!-- Connection configuration: broker address and credentials are read from the "app" properties. -->
<rabbit:connection-factory
    id="rabbitConnectionFactory"
    host="#{app['mq.broker.host']}"
    port="#{app['mq.broker.port']}"
    username="#{app['mq.broker.username']}"
    password="#{app['mq.broker.password']}"
    virtual-host="/"
    publisher-confirms="true"/>
<!-- MQ template: uses the connection factory bean "rabbitConnectionFactory" and the
     message converter bean "jsonMessageConverter". -->
<rabbit:template
    id="rabbitTemplate"
    connection-factory="rabbitConnectionFactory"
    message-converter="jsonMessageConverter"/>
java代码:
@Autowired private RabbitTemplate rabbitTemplate; ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory(); // 创建连接 Connection connection = connectionFactory.createConnection(); // 创建通道 Channel channel = connection.createChannel(false); // 设置消息交换机 channel.exchangeDeclare("amp.topic", "direct", true, false, null); DeclareOk declareOk = channel.queueDeclarePassive(LETTER_CLASS_QUEUE); //获取队列中的消息个数 int queueCount = declareOk.getMessageCount(); result.put("queueCount", String.valueOf(queueCount)); // 关闭通道和连接 channel.close(); connection.close();