zoukankan      html  css  js  c++  java
  • RabbitMQ与java、Spring结合实例详细讲解

    林炳文Evankaka原创作品。转载请注明出处http://blog.csdn.net/evankaka

             摘要:本文介绍了rabbitMq,提供了如何在Ubuntu下安装RabbitMQ 服务的方法。最好以RabbitMQ与java、Spring结合的两个实例来演示如何使用RabbitMQ。

    本文工程免费下载

    一、rabbitMQ简介

    1.1、rabbitMQ的优点(适用范围)
    1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
    2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
    3. 有消息确认机制和持久化机制,可靠性高。
    4. 开源
    其他MQ的优势:
    1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
    2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。

    1.2、几个概念说明
    producer&Consumer
    producer指的是消息生产者,consumer消息的消费者。
    Queue
    消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
    设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
    设置为临时队列,queue中的数据在系统重启之后就会丢失
    设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange

    Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
    Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
    Direct
    直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
    fanout
    广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
    topic
    主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
    headers
    消息体的header匹配(ignore)
    Binding
    所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
    virtual host
    在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

    1.3、消息队列的使用过程
    1. 客户端连接到消息队列服务器,打开一个channel。
    2. 客户端声明一个exchange,并设置相关属性。
    3. 客户端声明一个queue,并设置相关属性。
    4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
    5. 客户端投递消息到exchange。
    6. exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里

    二、环境配置与安装

    1、Erlang环境安装
    RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境。
    从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,我下载的版本是 otp_src_R14B03.tar.gz 。然后:

    1.  
      $ tar xvzf otp_src_R14B03.tar.gz
    2.  
      $ cd otp_src_R14B03
    3.  
      $ ./configure

    编译后的输出 
    如下图: 

    注:
    可能会报错 configure: error: No curses library functions found
    configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts

    原因是缺少ncurses包

    解决:在ubuntu系统下 
    1.  
      apt-cache search ncurses
    2.  
      apt-get install libncurses5-dev

    然后重新执行

    ./configure

    提示没有wxWidgets和fop、ssh、odbc、ssl,但是问题不大。继续:

    make

    然后:

    sudo make install

    配置erlang环境变量 
    修改/etc/profile文件,增加下面的环境变量:(vim profile i插入 编辑完毕ESC退出 wq!强制修改)

    1.  
      #set erlang environment
    2.  
      export PATH=$PATH:/usr/erlang/bin:$PATH
    3.  
      source profile使得文件生效

    下面是我的

    2、RabbitMQ-Server安装

    安装完Erlang,开始安装RabbitMQ-Server。安装方法有三种,这里笔者三者都试过了,就只有以下这个方法成功了。

    直接使用:

    apt-get  install rabbitmq-server

    安装完成后会自动打开:

    使用命令查看rabbitmq运行状态:

    rabbitmqctl status

    停止

    rabbitmqctl stop

    开启

    rabbitmq-server start

    3、rabbitmq web管理页面插件安装

    输入以下命令

    1.  
      cd /usr/lib/rabbitmq/bin/
    2.  
      rabbitmq-plugins enable rabbitmq_management
    这里笔者一直安装不成功。

    如果安装成功打开浏览器,输入 http://[server-name]:15672/ 如 http://localhost:15672/ ,会要求输入用户名和密码,用默认的guest/guest即可(guest/guest用户只能从localhost地址登录,如果要配置远程登录,必须另创建用户)。
    如果要从远程登录怎么做呢?处于安全考虑,guest这个默认的用户只能通过http://localhost:15672来登录,其他的IP无法直接用这个guest帐号。这里我们可以通过配置文件来实现从远程登录管理界面,只要编辑/etc/rabbitmq/rabbitmq.config文件(没有就新增),添加以下配置就可以了。

    4、添加用户

    vim /etc/rabbitmq/rabbitmq.config
    然后添加 
    1.  
      [
    2.  
      {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
    3.  
      ].
    注意上面有个点号 

    现在添加了一个新授权用户asdf,可以远程使用这个用户名。记得要先用命令添加这个命令才行: 
    cd /usr/lib/rabbitmq/bin/
    #用户名与密码 
    sudo rabbitmqctl add_user asdf 123456
    用户设置为administrator才能远程访问 
    1.  
      sudo rabbitmqctl set_user_tags asdf administrator
    2.  
      sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"

    其实也可以通过管理平台页面直接添加用户和密码等信息。如果还不能远程访问或远程登录检查是不是5672, 15672端口没有开放!!!!!! 

    5、开放端口

    ufw allow 5672
     

    三、简单Java实例

    下面来演示一个使用java的简单实例:
    1、首先是消息生产者和提供者的基类
    1.  
      package com.lin;
    2.  
       
    3.  
      import java.io.IOException;
    4.  
       
    5.  
      import com.rabbitmq.client.Channel;
    6.  
      import com.rabbitmq.client.Connection;
    7.  
      import com.rabbitmq.client.ConnectionFactory;
    8.  
       
    9.  
      /**
    10.  
      *
    11.  
      * 功能概要: EndPoint类型的队列
    12.  
      *
    13.  
      * @author linbingwen
    14.  
      * @since 2016年1月11日
    15.  
      */
    16.  
      public abstract class EndPoint{
    17.  
       
    18.  
      protected Channel channel;
    19.  
      protected Connection connection;
    20.  
      protected String endPointName;
    21.  
       
    22.  
      public EndPoint(String endpointName) throws IOException{
    23.  
      this.endPointName = endpointName;
    24.  
       
    25.  
      //Create a connection factory
    26.  
      ConnectionFactory factory = new ConnectionFactory();
    27.  
       
    28.  
      //hostname of your rabbitmq server
    29.  
      factory.setHost("10.75.4.25");
    30.  
      factory.setPort(5672);
    31.  
      factory.setUsername("asdf");
    32.  
      factory.setPassword("123456");
    33.  
       
    34.  
      //getting a connection
    35.  
      connection = factory.newConnection();
    36.  
       
    37.  
      //creating a channel
    38.  
      channel = connection.createChannel();
    39.  
       
    40.  
      //declaring a queue for this channel. If queue does not exist,
    41.  
      //it will be created on the server.
    42.  
      channel.queueDeclare(endpointName, false, false, false, null);
    43.  
      }
    44.  
       
    45.  
       
    46.  
      /**
    47.  
      * 关闭channel和connection。并非必须,因为隐含是自动调用的。
    48.  
      * @throws IOException
    49.  
      */
    50.  
      public void close() throws IOException{
    51.  
      this.channel.close();
    52.  
      this.connection.close();
    53.  
      }
    54.  
      }

    2、消息提供者
    1.  
      package com.lin.producer;
    2.  
       
    3.  
      import java.io.IOException;
    4.  
      import java.io.Serializable;
    5.  
       
    6.  
      import org.apache.commons.lang.SerializationUtils;
    7.  
       
    8.  
      import com.lin.EndPoint;
    9.  
       
    10.  
       
    11.  
      /**
    12.  
      *
    13.  
      * 功能概要:消息生产者
    14.  
      *
    15.  
      * @author linbingwen
    16.  
      * @since 2016年1月11日
    17.  
      */
    18.  
      public class Producer extends EndPoint{
    19.  
       
    20.  
      public Producer(String endPointName) throws IOException{
    21.  
      super(endPointName);
    22.  
      }
    23.  
       
    24.  
      public void sendMessage(Serializable object) throws IOException {
    25.  
      channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
    26.  
      }
    27.  
      }

    3、消息消费者
    1.  
      package com.lin.consumer;
    2.  
       
    3.  
      import java.io.IOException;
    4.  
      import java.util.HashMap;
    5.  
      import java.util.Map;
    6.  
       
    7.  
      import org.apache.commons.lang.SerializationUtils;
    8.  
       
    9.  
      import com.lin.EndPoint;
    10.  
      import com.rabbitmq.client.AMQP.BasicProperties;
    11.  
      import com.rabbitmq.client.Consumer;
    12.  
      import com.rabbitmq.client.Envelope;
    13.  
      import com.rabbitmq.client.ShutdownSignalException;
    14.  
       
    15.  
       
    16.  
      /**
    17.  
      *
    18.  
      * 功能概要:读取队列的程序端,实现了Runnable接口
    19.  
      *
    20.  
      * @author linbingwen
    21.  
      * @since 2016年1月11日
    22.  
      */
    23.  
      public class QueueConsumer extends EndPoint implements Runnable, Consumer{
    24.  
       
    25.  
      public QueueConsumer(String endPointName) throws IOException{
    26.  
      super(endPointName);
    27.  
      }
    28.  
       
    29.  
      public void run() {
    30.  
      try {
    31.  
      //start consuming messages. Auto acknowledge messages.
    32.  
      channel.basicConsume(endPointName, true,this);
    33.  
      } catch (IOException e) {
    34.  
      e.printStackTrace();
    35.  
      }
    36.  
      }
    37.  
       
    38.  
      /**
    39.  
      * Called when consumer is registered.
    40.  
      */
    41.  
      public void handleConsumeOk(String consumerTag) {
    42.  
      System.out.println("Consumer "+consumerTag +" registered");
    43.  
      }
    44.  
       
    45.  
      /**
    46.  
      * Called when new message is available.
    47.  
      */
    48.  
      public void handleDelivery(String consumerTag, Envelope env,
    49.  
      BasicProperties props, byte[] body) throws IOException {
    50.  
      Map map = (HashMap)SerializationUtils.deserialize(body);
    51.  
      System.out.println("Message Number "+ map.get("message number") + " received.");
    52.  
       
    53.  
      }
    54.  
       
    55.  
      public void handleCancel(String consumerTag) {}
    56.  
      public void handleCancelOk(String consumerTag) {}
    57.  
      public void handleRecoverOk(String consumerTag) {}
    58.  
      public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
    59.  
      }

    4、测试
    1.  
      package com.lin.test;
    2.  
       
    3.  
      import java.io.IOException;
    4.  
      import java.sql.SQLException;
    5.  
      import java.util.HashMap;
    6.  
       
    7.  
      import com.lin.consumer.QueueConsumer;
    8.  
      import com.lin.producer.Producer;
    9.  
       
    10.  
      public class Test {
    11.  
      public Test() throws Exception{
    12.  
       
    13.  
      QueueConsumer consumer = new QueueConsumer("queue");
    14.  
      Thread consumerThread = new Thread(consumer);
    15.  
      consumerThread.start();
    16.  
       
    17.  
      Producer producer = new Producer("queue");
    18.  
       
    19.  
      for (int i = 0; i < 1000000; i++) {
    20.  
      HashMap message = new HashMap();
    21.  
      message.put("message number", i);
    22.  
      producer.sendMessage(message);
    23.  
      System.out.println("Message Number "+ i +" sent.");
    24.  
      }
    25.  
      }
    26.  
       
    27.  
      /**
    28.  
      * @param args
    29.  
      * @throws SQLException
    30.  
      * @throws IOException
    31.  
      */
    32.  
      public static void main(String[] args) throws Exception{
    33.  
      new Test();
    34.  
      }
    35.  
      }
    其中引入的jar包:
    1.  
      <!-- rabbitmq客户端 -->
    2.  
      <dependencies>
    3.  
      <dependency>
    4.  
      <groupId>com.rabbitmq</groupId>
    5.  
      <artifactId>amqp-client</artifactId>
    6.  
      <version>3.0.4</version>
    7.  
      </dependency>
    8.  
       
    9.  
      <dependency>
    10.  
      <groupId>commons-lang</groupId>
    11.  
      <artifactId>commons-lang</artifactId>
    12.  
      <version>2.6</version>
    13.  
      </dependency>
    14.  
      <dependency>
    15.  
      <groupId>org.apache.commons</groupId>
    16.  
      <artifactId>commons-lang3</artifactId>
    17.  
      <version>3.1</version>
    18.  
      </dependency>
    19.  
      </dependencies>

    测试结果:
    在提供消息
    在消费消息 
    然后同时打开rabbitmq的服务端,输入如下:
    rabbitmqctl list_queues
    这个命令是用来查看服务端中有多处个消息队列的。
    可以看到有个名为queue的消息队列(更好的方法是安装好web监控插件,笔者一直安装失败,所以这里就不展示了)
     
     

    四、Rbbitmq与Spring结合使用

    首先建立一个maven工程,整个项目的结构如下:
     
    下面将具体来讲讲整个过程
    1、jar包的引入
    pom.xml配置即可,如下:
    1.  
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2.  
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    3.  
      <modelVersion>4.0.0</modelVersion>
    4.  
      <groupId>com.lin</groupId>
    5.  
      <artifactId>rabbit_c2</artifactId>
    6.  
      <version>0.0.1-SNAPSHOT</version>
    7.  
      <properties>
    8.  
      <!-- spring版本号 -->
    9.  
      <spring.version>3.2.8.RELEASE</spring.version>
    10.  
      <!-- log4j日志文件管理包版本 -->
    11.  
      <slf4j.version>1.6.6</slf4j.version>
    12.  
      <log4j.version>1.2.12</log4j.version>
    13.  
      <!-- junit版本号 -->
    14.  
      <junit.version>4.10</junit.version>
    15.  
      </properties>
    16.  
       
    17.  
      <dependencies>
    18.  
      <!-- 添加Spring依赖 -->
    19.  
      <dependency>
    20.  
      <groupId>org.springframework</groupId>
    21.  
      <artifactId>spring-core</artifactId>
    22.  
      <version>${spring.version}</version>
    23.  
      </dependency>
    24.  
      <dependency>
    25.  
      <groupId>org.springframework</groupId>
    26.  
      <artifactId>spring-webmvc</artifactId>
    27.  
      <version>${spring.version}</version>
    28.  
      </dependency>
    29.  
      <dependency>
    30.  
      <groupId>org.springframework</groupId>
    31.  
      <artifactId>spring-context</artifactId>
    32.  
      <version>${spring.version}</version>
    33.  
      </dependency>
    34.  
      <dependency>
    35.  
      <groupId>org.springframework</groupId>
    36.  
      <artifactId>spring-context-support</artifactId>
    37.  
      <version>${spring.version}</version>
    38.  
      </dependency>
    39.  
      <dependency>
    40.  
      <groupId>org.springframework</groupId>
    41.  
      <artifactId>spring-aop</artifactId>
    42.  
      <version>${spring.version}</version>
    43.  
      </dependency>
    44.  
      <dependency>
    45.  
      <groupId>org.springframework</groupId>
    46.  
      <artifactId>spring-aspects</artifactId>
    47.  
      <version>${spring.version}</version>
    48.  
      </dependency>
    49.  
      <dependency>
    50.  
      <groupId>org.springframework</groupId>
    51.  
      <artifactId>spring-tx</artifactId>
    52.  
      <version>${spring.version}</version>
    53.  
      </dependency>
    54.  
      <dependency>
    55.  
      <groupId>org.springframework</groupId>
    56.  
      <artifactId>spring-jdbc</artifactId>
    57.  
      <version>${spring.version}</version>
    58.  
      </dependency>
    59.  
      <dependency>
    60.  
      <groupId>org.springframework</groupId>
    61.  
      <artifactId>spring-web</artifactId>
    62.  
      <version>${spring.version}</version>
    63.  
      </dependency>
    64.  
       
    65.  
      <!--单元测试依赖 -->
    66.  
      <dependency>
    67.  
      <groupId>junit</groupId>
    68.  
      <artifactId>junit</artifactId>
    69.  
      <version>${junit.version}</version>
    70.  
      <scope>test</scope>
    71.  
      </dependency>
    72.  
       
    73.  
      <!-- 日志文件管理包 -->
    74.  
      <!-- log start -->
    75.  
      <dependency>
    76.  
      <groupId>log4j</groupId>
    77.  
      <artifactId>log4j</artifactId>
    78.  
      <version>${log4j.version}</version>
    79.  
      </dependency>
    80.  
      <dependency>
    81.  
      <groupId>org.slf4j</groupId>
    82.  
      <artifactId>slf4j-api</artifactId>
    83.  
      <version>${slf4j.version}</version>
    84.  
      </dependency>
    85.  
      <dependency>
    86.  
      <groupId>org.slf4j</groupId>
    87.  
      <artifactId>slf4j-log4j12</artifactId>
    88.  
      <version>${slf4j.version}</version>
    89.  
      </dependency>
    90.  
      <!-- log end -->
    91.  
       
    92.  
      <!--spring单元测试依赖 -->
    93.  
      <dependency>
    94.  
      <groupId>org.springframework</groupId>
    95.  
      <artifactId>spring-test</artifactId>
    96.  
      <version>${spring.version}</version>
    97.  
      <scope>test</scope>
    98.  
      </dependency>
    99.  
       
    100.  
      <!--rabbitmq依赖 -->
    101.  
      <dependency>
    102.  
      <groupId>org.springframework.amqp</groupId>
    103.  
      <artifactId>spring-rabbit</artifactId>
    104.  
      <version>1.3.5.RELEASE</version>
    105.  
      </dependency>
    106.  
       
    107.  
      <dependency>
    108.  
      <groupId>javax.validation</groupId>
    109.  
      <artifactId>validation-api</artifactId>
    110.  
      <version>1.1.0.Final</version>
    111.  
      </dependency>
    112.  
       
    113.  
      <dependency>
    114.  
      <groupId>org.hibernate</groupId>
    115.  
      <artifactId>hibernate-validator</artifactId>
    116.  
      <version>5.0.1.Final</version>
    117.  
      </dependency>
    118.  
       
    119.  
      </dependencies>
    120.  
      <build>
    121.  
      <resources>
    122.  
      <resource>
    123.  
      <directory>src/main/resources</directory>
    124.  
      <targetPath>${basedir}/target/classes</targetPath>
    125.  
      <includes>
    126.  
      <include>**/*.properties</include>
    127.  
      <include>**/*.xml</include>
    128.  
      </includes>
    129.  
      <filtering>true</filtering>
    130.  
      </resource>
    131.  
      <resource>
    132.  
      <directory>src/main/resources</directory>
    133.  
      <targetPath>${basedir}/target/resources</targetPath>
    134.  
      <includes>
    135.  
      <include>**/*.properties</include>
    136.  
      <include>**/*.xml</include>
    137.  
      </includes>
    138.  
      <filtering>true</filtering>
    139.  
      </resource>
    140.  
      </resources>
    141.  
       
    142.  
      <plugins>
    143.  
      <plugin>
    144.  
      <groupId>org.apache.maven.plugins</groupId>
    145.  
      <artifactId>maven-compiler-plugin</artifactId>
    146.  
      <configuration>
    147.  
      <source>1.6</source>
    148.  
      <target>1.6</target>
    149.  
      <encoding>UTF-8</encoding>
    150.  
      </configuration>
    151.  
      </plugin>
    152.  
      <plugin>
    153.  
      <groupId>org.apache.maven.plugins</groupId>
    154.  
      <artifactId>maven-war-plugin</artifactId>
    155.  
      <version>2.1.1</version>
    156.  
      <configuration>
    157.  
      <warSourceExcludes>${warExcludes}</warSourceExcludes>
    158.  
      </configuration>
    159.  
      </plugin>
    160.  
      <plugin>
    161.  
      <groupId>org.apache.maven.plugins</groupId>
    162.  
      <artifactId>maven-surefire-plugin</artifactId>
    163.  
      <version>2.4.3</version>
    164.  
      <configuration>
    165.  
      <testFailureIgnore>true</testFailureIgnore>
    166.  
      </configuration>
    167.  
      </plugin>
    168.  
      <plugin>
    169.  
      <inherited>true</inherited>
    170.  
      <groupId>org.apache.maven.plugins</groupId>
    171.  
      <artifactId>maven-source-plugin</artifactId>
    172.  
      <executions>
    173.  
      <execution>
    174.  
      <id>attach-sources</id>
    175.  
      <goals>
    176.  
      <goal>jar</goal>
    177.  
      </goals>
    178.  
      </execution>
    179.  
      </executions>
    180.  
      </plugin>
    181.  
      <plugin>
    182.  
      <groupId>org.apache.maven.plugins</groupId>
    183.  
      <artifactId>maven-resources-plugin</artifactId>
    184.  
      <configuration>
    185.  
      <encoding>UTF-8</encoding>
    186.  
      </configuration>
    187.  
      </plugin>
    188.  
      </plugins>
    189.  
      </build>
    190.  
      </project>

    2、消息生产者
    1.  
      package com.lin.producer;
    2.  
       
    3.  
      import javax.annotation.Resource;
    4.  
       
    5.  
      import org.slf4j.Logger;
    6.  
      import org.slf4j.LoggerFactory;
    7.  
      import org.springframework.amqp.core.AmqpTemplate;
    8.  
      import org.springframework.stereotype.Service;
    9.  
       
    10.  
      /**
    11.  
      * 功能概要:消息产生,提交到队列中去
    12.  
      *
    13.  
      * @author linbingwen
    14.  
      * @since 2016年1月15日
    15.  
      */
    16.  
      @Service
    17.  
      public class MessageProducer {
    18.  
       
    19.  
      private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    20.  
       
    21.  
      @Resource
    22.  
      private AmqpTemplate amqpTemplate;
    23.  
       
    24.  
      public void sendMessage(Object message){
    25.  
      logger.info("to send message:{}",message);
    26.  
      amqpTemplate.convertAndSend("queueTestKey",message);
    27.  
      }
    28.  
      }
    3、消息消费者
    1.  
      package com.lin.consumer;
    2.  
       
    3.  
      import org.slf4j.Logger;
    4.  
      import org.slf4j.LoggerFactory;
    5.  
      import org.springframework.amqp.core.Message;
    6.  
      import org.springframework.amqp.core.MessageListener;
    7.  
       
    8.  
      /**
    9.  
      * 功能概要:消费接收
    10.  
      *
    11.  
      * @author linbingwen
    12.  
      * @since 2016年1月15日
    13.  
      */
    14.  
      public class MessageConsumer implements MessageListener {
    15.  
       
    16.  
      private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    17.  
       
    18.  
      @Override
    19.  
      public void onMessage(Message message) {
    20.  
      logger.info("receive message:{}",message);
    21.  
      }
    22.  
       
    23.  
      }

    4、rabbitMq.xml配置信息
    1.  
      <?xml version="1.0" encoding="UTF-8"?>
    2.  
      <beans xmlns="http://www.springframework.org/schema/beans"
    3.  
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    4.  
      xsi:schemaLocation="http://www.springframework.org/schema/beans
    5.  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    6.  
      http://www.springframework.org/schema/beans
    7.  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    8.  
      http://www.springframework.org/schema/rabbit
    9.  
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    10.  
      <!--配置connection-factory,指定连接rabbit server参数 -->
    11.  
      <rabbit:connection-factory id="connectionFactory"
    12.  
      username="asdf" password="123456" host="10.75.4.25" port="5672" />
    13.  
       
    14.  
      <!--定义rabbit template用于数据的接收和发送 -->
    15.  
      <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    16.  
      exchange="exchangeTest" />
    17.  
       
    18.  
      <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    19.  
      <rabbit:admin connection-factory="connectionFactory" />
    20.  
       
    21.  
      <!--定义queue -->
    22.  
      <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
    23.  
       
    24.  
      <!-- 定义direct exchange,绑定queueTest -->
    25.  
      <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
    26.  
      <rabbit:bindings>
    27.  
      <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
    28.  
      </rabbit:bindings>
    29.  
      </rabbit:direct-exchange>
    30.  
       
    31.  
      <!-- 消息接收者 -->
    32.  
      <bean id="messageReceiver" class="com.lin.consumer.MessageConsumer"></bean>
    33.  
       
    34.  
      <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    35.  
      <rabbit:listener-container connection-factory="connectionFactory">
    36.  
      <rabbit:listener queues="queueTest" ref="messageReceiver"/>
    37.  
      </rabbit:listener-container>
    38.  
       
    39.  
      </beans>

    5、spring集成rabbiqMq。application.xml内容如下:
    1.  
      <?xml version="1.0" encoding="UTF-8"?>
    2.  
      <beans xmlns="http://www.springframework.org/schema/beans"
    3.  
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    4.  
      xmlns:p="http://www.springframework.org/schema/p"
    5.  
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    6.  
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    7.  
       
    8.  
       
    9.  
      <import resource="classpath*:rabbitmq.xml" />
    10.  
       
    11.  
       
    12.  
      <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
    13.  
      <context:component-scan base-package="com.lin.consumer,com.lin.producer" />
    14.  
       
    15.  
       
    16.  
      <!-- 激活annotation功能 -->
    17.  
      <context:annotation-config />
    18.  
      <!-- 激活annotation功能 -->
    19.  
      <context:spring-configured />
    20.  
       
    21.  
      </beans>

    6、最后,为了方便,打印了日志,log4j.properties配置如下
    1.  
      log4j.rootLogger=DEBUG,Console,Stdout
    2.  
       
    3.  
      #Console
    4.  
      log4j.appender.Console=org.apache.log4j.ConsoleAppender
    5.  
      log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    6.  
      log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    7.  
       
    8.  
      log4j.logger.java.sql.ResultSet=INFO
    9.  
      log4j.logger.org.apache=INFO
    10.  
      log4j.logger.java.sql.Connection=DEBUG
    11.  
      log4j.logger.java.sql.Statement=DEBUG
    12.  
      log4j.logger.java.sql.PreparedStatement=DEBUG
    13.  
       
    14.  
      log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
    15.  
      log4j.appender.Stdout.File = E://logs/log.log
    16.  
      log4j.appender.Stdout.Append = true
    17.  
      log4j.appender.Stdout.Threshold = DEBUG
    18.  
      log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
    19.  
      log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
     
    接着运行整个工程即可:
    下面是运行的结果:
     
    一会发一会收:因为在同一工程,所以发消息和接消息是交替出现的
     
    我们出可以去rabbitMq 服务器上看:
    可以看到,我们配置的队列已存在了:
    到此,整个工程结束。
     
  • 相关阅读:
    【记录】Mybatis-Generator 数据层代码生成器,自动生成dao类,mapper,pojo类
    【记录】logstash 的filter 使用
    【转载】windows 开启 nginx 监听80 端口 以及 禁用 http 服务后,无法重启 HTTP 服务,提示 系统错误 123,文件目录、卷标出错
    【报错】解决logstash tracking_column not found in dataset. {:tracking_column=>"updated_time"}问题
    【记录】elasticsearch 注解
    index read-only
    wget下载阿里云RDS备份集
    mysqlbinlog相关
    es安装elasticsearch-sql插件
    elastichd安装部署
  • 原文地址:https://www.cnblogs.com/shizhijie/p/9795383.html
Copyright © 2011-2022 走看看