这两天看了《RabbitMQ实战》这本书,也在网上看了其他人的一些博客描述,所以想写些自己所学会的东西
RabbitMQ是erlang语言写的,遵守AMQP协议,我们通过其来降低代码之间的耦合程度,当一个producer完成后,可以同时发送给多个consumer,这样可以不用通过代码
来进行调用,而是通过订阅来实现类多线程操作。
顺序是:producer(生产者) --》 exchange(交换机) --》 channel(信道) --》 queue(队列) --》 consumer(消费者)
exchange和queue之间使用binding按照路由规则绑定(路由键)
exchange、channel和queue都是存在于 消息代理服务器(我是用VM中的CentOS搭建的,安装过程有时间会去写下)
一、Python
hello_world_producer.py
//pika是RabbitMQ团队编写的官方Python AMQP库 import pika,sys //指定账户,这里guest是默认账户 credentials = pika.PlainCredentials("guest","guest") //建立到代理服务器的连接 conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) //获得信道 channel = conn_broker.channel() //声明交换器 //这里刚开始总是提示有错误,将exchange_declare()里面参数不带变量名才可以运行 //[root@localhost Desktop]# python ./hello_world_consumer.py // File "./hello_world_consumer.py", line 10 // channel.exchange_declare(exchange="hello-exchange","direct",False // SyntaxError: non-keyword arg after keyword arg channel.exchange_declare("hello-exchange", 'direct', False, True, False) //创建纯文本消息 msg = sys.argv[1] msg_props = pika.BasicProperties() msg_props.content_type = "text/plain" //发布消息 channel.basic_publish(body=msg, exchange="hello-exchange", properties=msg_props, routing_key="hola")
hello_world_consumer.py import pika //建立到代理服务器的连接 credentials = pika.PlainCredentials("guest","guest") conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) //获取信道 channel = conn_broker.channel() //创建交换器 channel.exchange_declare("hello-exchange","direct",False ,True,False) //声明队列 channel.queue_declare(queue="hello-queue") //通过键“hola”将队列和交换器绑定起来 channel.queue_bind(queue="hello-queue", exchange="hello-exchange", routing_key="hola") //用于处理传入消息的函数 def msg_consumer(channel,method,header,body): //消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) //停止消费并退出,但是不知道为什么退出不了,很怪 if body == "quit": channel.basic_cancel(consumer_tag="hello-consumer") channel.stop_consuming() else: print body //订阅消费者 channel.basic_consume(msg_consumer, queue="hello-queue", consumer_tag='hello-consumer') //开始消费 channel.start_consuming()
可以完成通信Python(没学过,所有有些地方看的不是很明白)
二、JAVA(不与spring整合的,但只利用了队列,没有使用交换器)
1.导入相关jar包(我只导入两个):amqp-client和commons-lang3
2.写一个获取connection的工具类
public class ConnectionUtil { public static Connection getConnection() throws Exception{ //定义连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置服务地址 connectionFactory.setHost("localhost"); //端口 //Web端口是15672,这里是5672 否则会报java.util.concurrent.TimeoutException connectionFactory.setPort(5672); //设置账号信息,用户名,密码,vhost connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //通过工程获取链接 Connection connection = connectionFactory.newConnection(); return connection; } }
3.producer
public class Producer { private final static String QUEUE_NAME = "test_queue";//队列名称 public static void main(String[] args) throws Exception { //获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 /** * @param queue the name of the queue 【队列名称】 * @param durable true if we are declaring a durable queue (the queue will survive a server restart) 【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) 【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection 就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~ * @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义发送的消息 String message = "hello world!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("send "+message+"."); //关闭信道和链接 channel.close(); connection.close(); } }
4.consumer
public class Consumer { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws Exception { //获取连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //监听队列 channel.basicConsume(QUEUE_NAME, true, consumer);//true自动模式 //获取消息 while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received "+message+"."); } } }
三、JAVA(与spring整合)
1.先导入整合jar包:spring-rabbit
2.整合最主要的就是application-rabbitmq.xml的配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd"> <!-- 连接工厂 --> <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory --> <rabbit:connection-factory id="rabbitConnectionFactory" host="${amqp.host}" port="${amqp.port}" virtual-host="${amqp.vhost}" username="${amqp.username}" password="${amqp.password}"/> <!--
RabbitAdmin会自动在RabbitMQ上创建交换器、队列、绑定
--> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!-- 消息队列,消息的栖所 --> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="converterQueue" name="converterQueue" durable="true" auto-delete="false" exclusive="false" auto-declare="true"/> <!-- 创建交换器,并绑定对应队列 org.springframework.amqp.core.DirectExchange org.springframework.amqp.core.Binding --> <rabbit:direct-exchange id="converterExchange" name="converterExchange" durable="true"> <!-- 绑定 --> <rabbit:bindings> <!-- converterQueueKey,将队列绑定到交换器的路由KEY --> <rabbit:binding queue="converterQueue" key="converterQueueKey" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 创建RabbitTemplate对象,用于发送消息 --> <!-- org.springframework.amqp.rabbit.core.RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" encoding="utf-8" exchange="converterExchange" routing-key="converterQueueKey" message-converter="jackson2JsonMessageConverter"/> <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> </beans>
3.接下来使用spring的IOC技术
//xml文件中使用<rabit:template>标签来对这里进行注入
@Autowired private AmqpTemplate amqpTemplate; public void send(Object message) { amqpTemplate.convertAndSend(message); }
这样的就可以使用了