1. 实现一个消息监听器 ReceiveMessageListener
public class ReceiveMessageListener implements MessageListener { public void onMessage(Message message) { System.out.println("msg:"+new String(message.getBody())); } }
2. 消费者配置Consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-2.5.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Resolve the ${rabbit.*} placeholders below from rabbitmq.properties on the classpath. -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- Connection factory: parameters used to connect to the RabbitMQ server. -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               host="${rabbit.hosts}"
                               username="${rabbit.username}"
                               password="${rabbit.password}"
                               port="5672"
                               virtual-host="${rabbit.virtualHost}"/>

    <!-- With this admin in place, the exchange and queue declared in this file
         are created automatically on the RabbitMQ server. -->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- Queue declaration. -->
    <rabbit:queue id="spring.queue.tag"
                  name="spring.queue.tag"
                  durable="true"
                  exclusive="false"
                  auto-delete="false"/>

    <!-- Direct exchange and the binding (routing key) that ties it to the queue. -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- The application's AMQP MessageListener implementation. -->
    <bean id="receiveMessageListener" class="test.spring.ReceiveMessageListener"/>

    <!-- Listener container: when a message arrives on the queue, the listener
         registered for that queue is notified. -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto">
        <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener"/>
    </rabbit:listener-container>

</beans>
3. 生产者配置Producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-2.5.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Resolve the ${rabbit.*} placeholders below from rabbitmq.properties on the classpath. -->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- Connection factory: parameters used to connect to the RabbitMQ server. -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               host="${rabbit.hosts}"
                               username="${rabbit.username}"
                               password="${rabbit.password}"
                               port="5672"
                               virtual-host="${rabbit.virtualHost}"/>

    <!-- With this admin in place, the exchange and queue declared in this producer
         configuration are created automatically on the RabbitMQ server. -->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- Queue declaration. -->
    <rabbit:queue id="spring.queue.tag"
                  name="spring.queue.tag"
                  durable="true"
                  exclusive="false"
                  auto-delete="false"/>

    <!-- Direct exchange and the binding (routing key) that ties it to the queue. -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Jackson-based message converter: turns the objects sent by the producer
         into JSON before they are placed on the queue. -->
    <bean id="jsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- RabbitTemplate used for sending and receiving messages; the exchange,
         routing key and queue given here are its defaults. -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="rabbitConnectionFactory"
                     exchange="spring.queue.exchange"
                     queue="spring.queue.tag"
                     routing-key="spring.queue.tag.key"
                     message-converter="jsonMessageConverter"/>

</beans>
4. 消费者Consumer
/**
 * Entry point of the consumer demo: loads {@code spring/Consumer.xml} from the classpath,
 * which defines the listener container that receives the messages.
 */
public class Consumer {

    /**
     * Starts the consumer application context and reports that start-up finished.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring/Consumer.xml");
        // The context must stay open for the consumer to keep receiving, so it is not
        // closed here; the shutdown hook closes it (and the beans it owns) when the
        // JVM terminates instead of leaving the connection to be dropped abruptly.
        context.registerShutdownHook();
        System.out.println("消费者启动完成 ");
    }
}
5. 生产者Producer
/**
 * Entry point of the producer demo: loads {@code spring/Producer.xml} from the classpath
 * and publishes a fixed number of {@code User} messages through the configured
 * {@code RabbitTemplate}.
 */
public class Producer {

    /** Number of messages sent by the demo. */
    private static final int MESSAGE_COUNT = 10;

    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 2000L;

    /**
     * Sends {@link #MESSAGE_COUNT} users, one every {@link #SEND_INTERVAL_MILLIS} ms,
     * then closes the application context.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while pausing between sends
     */
    public static void main(String[] args) throws InterruptedException {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring/Producer.xml");
        try {
            AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                User user = new User();
                user.setId(i);
                user.setName("niuniu");
                // No exchange or routing key is passed: the template's defaults,
                // set on the rabbit:template element in Producer.xml, are used.
                amqpTemplate.convertAndSend(user);
                System.out.println("生产者发送完成:" + user);
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        } finally {
            // Close the context so the RabbitMQ connection it owns is released, even if
            // a send fails or the thread is interrupted; an open connection may
            // otherwise keep the JVM alive after main returns.
            context.close();
        }
    }
}