zoukankan      html  css  js  c++  java
  • rabbitmq消息消费者

    pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springcloudparent</artifactId>
            <groupId>com.cxy</groupId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>rabbitMqConsumer</artifactId>
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
        </dependencies>
    
    </project>

    消费者代码:

    package com.cxy.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /***
     * @ClassName: Consumer1
     * @Description:
     * @Auther: cxy
     * @Date: 2019/3/24:11:37
     * @version : V1.0
     */
    public class Consumer1 {
        private  static  final  String Queue="helloworld";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory= new ConnectionFactory();
            //设置连接地址
            connectionFactory.setHost("192.168.230.134");
            //设置端口
            connectionFactory.setPort(5672);
            //设置密码用户名
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,每个虚拟机相当于一个小的mq
            connectionFactory.setVirtualHost("/");
            Connection connection =null;
            try {
                //建立连接
                connection = connectionFactory.newConnection();
                //建立通道,生产着和消费者都是在通道中完成
                Channel channel = connection.createChannel();
                /*
                queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments)
                 参数1,声明队列
                 参数2 是否持久化
                 参数3 是否排他,是否独战连接,队列只允许该链接中访问,如果连接关闭,队列也就删除了
                 参数4:是否自动删除,如果将此参数设置true,那么就变成零时队列
                 参数5 :扩展参数,例如存活时间
               */
                channel.queueDeclare(Queue,true,false,false,null);
              /*
             String basicConsume(String queue, boolean autoAck, Consumer callback)
             参数一:队列名称
             参数二:自动回复
             参数三 消费者方法
              */
                DefaultConsumer defaultConsumer=new DefaultConsumer(channel) {
                    //当接受到消息时候,此方法被调用
                    /**
                    * @Author cxy
                    * @Description //TODO
                    * @Date  2019/3/24
                    * @Param [consumerTag, envelope, properties, body]
                    * @return void
                     *
                     * consumerTag 用来标识.可以再监听队列时候设置
                     * envelope 信封,通过envelope可以通过这个获取到很多东西
                     * properties 额外的消息属性
                     * body:消息体
                    **/
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //获取交换机
                        String exchange = envelope.getExchange();
                        //消息id,用来表示那个消息消费了
                        long deliveryTag = envelope.getDeliveryTag();
                        String message=new String(body,"utf-8");
                        System.out.println("receive");
                    }
                };
               channel.basicConsume(Queue,true ,defaultConsumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
        }
    }

    由于注释内容都写得很详细就没有单独写文字了.运行之后可以发掘管控台中消息没有了,

    在正式开发中不会使用这种原生得代码去使用,会采用springboot去整合相关内容,至于以上代码为什么还要去监听队列,防止如果队列不存在,程序会存在异常,所以这样,在正式开发中

    会采用手动会签得方式,如果,没有会签,就会进行消息重试.保证消息不会丢失

  • 相关阅读:
    js中undefined,null,NaN的区别
    js中数字计算精度
    BestCoder Round #32
    POJ 2299 求逆序对(归并排序或树状数组)
    POJ 2603
    CodeForces 515C
    POJ 1853 背包问题
    UVA 10115 子符串替换
    POJ 1155 树状dp
    HDU 2196 树状dp 求树中节点之间的最长距离
  • 原文地址:https://www.cnblogs.com/xiufengchen/p/10587732.html
Copyright © 2011-2022 走看看