zoukankan      html  css  js  c++  java
  • RabbitMQ第四篇:Spring集成RabbitMQ

       前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq。

       首先引入配置文件org.springframework.amqp,如下

            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.6.0.RELEASE</version>
            </dependency>

    一:配置消费者和生成者公共部分

    复制代码
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
                                   port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}"
                                   channel-cache-size="50"/>
        <rabbit:admin connection-factory="connectionFactory"/>
        <!--定义消息队列-->
        <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/>
        <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/>
        <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>
        <!--绑定队列-->
        <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding>
                <rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding>
                <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    复制代码

    二:配置生成者

    <import resource="amqp-share.xml"/>
        <!--创建消息队列模板-->
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                         exchange="spittle.fanout" message-converter="jsonMessageConverter">
        </rabbit:template>
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

    三:生产者程序

    复制代码
    public class Spittle implements Serializable {
        private Long id;
        private Spitter spitter;
        private String message;
        private Date postedTime;
    
        public Spittle(Long id, Spitter spitter, String message, Date postedTime) {
            this.id = id;
            this.spitter = spitter;
            this.message = message;
            this.postedTime = postedTime;
        }
    
        public Long getId() {
            return this.id;
        }
    
        public String getMessage() {
            return this.message;
        }
    
        public Date getPostedTime() {
            return this.postedTime;
        }
    
        public Spitter getSpitter() {
            return this.spitter;
        }
    }
    复制代码
    复制代码
    public class ProducerMain {
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml");
            AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate");
            for (int i = 0; i < 20; i++) {
                System.out.println("Sending message #" + i);
                Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date());
                template.convertAndSend(spittle);
                Thread.sleep(5000);
            }
            System.out.println("Done!");
        }
    }
    复制代码

    其中convertAndSend方法默认第一个参数是交换机名称,第二个参数是路由名称,第三个才是我们发送的数据,现在我们启动程序,效果如下

    第四个:消费者程序

    首先编写一个用于监听生产者发送信息的代码

    复制代码
    /**
     * Created by Administrator on 2016/11/18.
     */
    public class SpittleAlertHandler implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                String body=new String(message.getBody(),"UTF-8");
                System.out.println(body);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }
    复制代码

    一定要注意实现MessageListener,我们只需要获取message的body即可,通过json来转换我们需要的程序(比如我们可以发送一个map,map存放方法和实体,这样我们可以通过反射来调用不同的程序来运行)。

    下面我们配置消费者

    <import resource="amqp-share.xml"/>
        <rabbit:listener-container connection-factory="connectionFactory">
          <rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/>
        </rabbit:listener-container>
        <bean id="spittleListener" class="com.lp.summary.rabbitmq.impl.SpittleAlertHandler"/>

    其中spittleListener是监听的程序,method是执行的方法,queues是我们监听的队列,多个队列可以逗号隔开(因为我们采用的是分发,所以三个队列获取的消息是相同的,这里为了简便我放在一个监听程序中了,其实我们可以写三个消费者,每个消费者监听一个队列)

    现在只需要启动程序即可运行

    public class ConsumerMain {
        public static void main(String[] args) {
            ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-consumer.xml");
        }
    }

     当然direct跟上面的情况差不多,只不过这个是根据路由匹配,先把数据发送到交换机,然后绑定路由和队列,通过交换机id和路由来找到队列,下面是一些主要的配置

    复制代码
     <rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue>
        <rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue>
        <!--交换机定义-->
        <!--rabbit:direct-exchange:定义exchange模式为direct,
            意思就是消息与一个特定的路由键完全匹配,才会转发。
            rabbit:binding:设置消息queue匹配的key-->
        <rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}">
            <rabbit:bindings>
                <rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/>
                <rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/>
          </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!--spring template声明-->
        <rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory"
        message-converter="jsonMessageConverter"></rabbit:template>
        <!--消息对象转成成json-->
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
    复制代码

    下面是消费者监听配置

     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
            <rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener>
        </rabbit:listener-container>
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
            <rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener>
        </rabbit:listener-container>

    下面是程序

     public static void main(String[] args) {
            ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml");
            MQProducer mqProducer=(MQProducer) context.getBean("mqProducer");
            mqProducer.sendDateToQueue("spring.test.queueKey1","Hello World spring.test.queueKey1");
            mqProducer.sendDateToQueue("spring.test.queueKey2","Hello World spring.test.queueKey2");
        }

    实际情况可能需要我们去分离消费者和生成者的程序。当然spring还有负载均衡的配置,这里就不多介绍了。

  • 相关阅读:
    为什么mvc里面的ModelState.IsValid一只都是true
    ASP.NET MVC Filters 4种默认过滤器的使用【附示例】
    ASP.NET MVC学习之过滤器篇(1)
    百分比定位加position定位的常用布局
    angular_$inject
    angular的$scope,这东西满重要的
    angular-scope.assign
    angular_$attrs
    angular-input
    angular_form
  • 原文地址:https://www.cnblogs.com/liuchuanfeng/p/6821444.html
Copyright © 2011-2022 走看看