1.引入相关jar包
//RabbitMQ
compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.6.6.RELEASE' compile group: 'org.springframework.integration', name: 'spring-integration-amqp', version: '4.3.5.RELEAS
|
生产者配置
2.实现一个消息处理器,继承自org.springframework.amqp.core.MessageListener
public class AmqpMsgListener implements MessageListener {
@Override public void onMessage(Message message) { System.out.println(message.toString()); } }
|
3.rabbit-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:int="http://www.springframework.org/schema/integration" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"> <!--连接工厂--> <rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{端口}" username="{用户名}" password="{密码}" publisher-confirms="true"/> <!--创建队列--> <rabbit:queue name="queue.test" durable="true" exclusive="false" auto-delete="false" /> <!--创建分发交换器--> <rabbit:direct-exchange name="exchange.directTest" durable="true"> <rabbit:bindings> <rabbit:binding key="foo.bar" queue="queue.test"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchange.directTest" routing-key="foo.bar" message-converter="jsonMessageConverter" confirm-callback=""/> <rabbit:admin connection-factory="connectionFactory" id="adminId"/> <!-- 配置exchange,不同的exchange会影响消息分发策略 --> <!-- 消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
</beans>
|
消费者配置
4.rabbit-customer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!-- rabbitmq连接配置 --> <rabbit:connection-factory id="connectionFactory1" host="{ip地址}" port="{端口}" username="{用户名}" password="{密码}" publisher-confirms="true"/>
<rabbit:admin connection-factory="connectionFactory1"/>
<!--按项目需求配置 --> <rabbit:listener-container connection-factory="connectionFactory1" acknowledge="auto"> <rabbit:listener ref="amqpMsgListener" queues="queue.test" /> </rabbit:listener-container> <bean id="amqpMsgListener" class="com.nxin.farm.test.AmqpMsgListener"/> </beans>
|
5.创建测试类
public class MqTest extends BaseJunit { @Autowired private RabbitTemplate amqpTemplate;
@Test public void testSend() { try { for(int i=0;i<100;i++){ amqpTemplate.convertAndSend("nx.farm.exchange.directTest", "foo.bar", "Hello, world! send by xxxxx"); } Thread.sleep(1000*1000); } catch (AmqpException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
|