zoukankan      html  css  js  c++  java
  • rabbitMQ 学习

    这两天看了《RabbitMQ实战》这本书,也在网上看了其他人的一些博客描述,所以想写些自己所学会的东西

    RabbitMQ是erlang语言写的,遵守AMQP协议,我们通过其来降低代码之间的耦合程度,当一个producer完成后,可以同时发送给多个consumer,这样可以不用通过代码

    来进行调用,而是通过订阅来实现类多线程操作。

    顺序是:producer(生产者) --》 exchange(交换机) --》 channel(信道) --》 queue(队列) --》 consumer(消费者)

    exchange和queue之间使用binding按照路由规则绑定(路由键)

    exchange、channel和queue都是存在于 消息代理服务器(我是用VM中的CentOS搭建的,安装过程有时间会去写下)

    一、Python

    hello_world_producer.py

    //pika是RabbitMQ团队编写的官方Python AMQP库 import pika,sys //指定账户,这里guest是默认账户 credentials = pika.PlainCredentials("guest","guest") //建立到代理服务器的连接 conn_params = pika.ConnectionParameters("localhost",credentials = credentials) conn_broker = pika.BlockingConnection(conn_params) //获得信道 channel = conn_broker.channel() //声明交换器 //这里刚开始总是提示有错误,将exchange_declare()里面参数不带变量名才可以运行 //[root@localhost Desktop]# python ./hello_world_consumer.py // File "./hello_world_consumer.py", line 10 // channel.exchange_declare(exchange="hello-exchange","direct",False // SyntaxError: non-keyword arg after keyword arg channel.exchange_declare("hello-exchange", 'direct', False, True, False) //创建纯文本消息 msg = sys.argv[1] msg_props = pika.BasicProperties() msg_props.content_type = "text/plain" //发布消息 channel.basic_publish(body=msg, exchange="hello-exchange", properties=msg_props, routing_key="hola")
    hello_world_consumer.py
    
    import pika
    
    //建立到代理服务器的连接
    credentials = pika.PlainCredentials("guest","guest")
    conn_params = pika.ConnectionParameters("localhost",credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params)
    //获取信道
    channel = conn_broker.channel()
    //创建交换器
    channel.exchange_declare("hello-exchange","direct",False
                             ,True,False)
    //声明队列
    channel.queue_declare(queue="hello-queue")
    //通过键“hola”将队列和交换器绑定起来
    channel.queue_bind(queue="hello-queue",
                exchange="hello-exchange",
                routing_key="hola")
    //用于处理传入消息的函数
    def msg_consumer(channel,method,header,body):
        //消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        //停止消费并退出,但是不知道为什么退出不了,很怪
        if body == "quit":
            channel.basic_cancel(consumer_tag="hello-consumer")
            channel.stop_consuming()
        else:
            print body
    //订阅消费者
    channel.basic_consume(msg_consumer,
                queue="hello-queue",
                consumer_tag='hello-consumer')
    //开始消费
    channel.start_consuming()

    可以完成通信Python(没学过,所有有些地方看的不是很明白)

    二、JAVA(不与spring整合的,但只利用了队列,没有使用交换器)

    1.导入相关jar包(我只导入两个):amqp-client和commons-lang3

    2.写一个获取connection的工具类

    public class ConnectionUtil {
        
        public static Connection getConnection() throws Exception{
            //定义连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置服务地址
            connectionFactory.setHost("localhost");
            //端口  
            //Web端口是15672,这里是5672 否则会报java.util.concurrent.TimeoutException
            connectionFactory.setPort(5672);
            //设置账号信息,用户名,密码,vhost
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //通过工程获取链接
            Connection connection = connectionFactory.newConnection();
            return connection;
        }
    
    }

    3.producer

    public class Producer {
    
        private final static String QUEUE_NAME = "test_queue";//队列名称
        
        public static void main(String[] args) throws Exception {
            //获取到链接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //创建队列
            /**
             * @param queue the name of the queue  【队列名称】 
             * @param durable true if we are declaring a durable queue (the queue will survive a server restart)  【持久化,这里特别说一下,如果你想消息是持久化的,必须消息是持久化的,交换器也是持久化的,队列更是持久化的,其中一个不是也无法恢复消息】
             * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)  【私有的,独有的。 这个队列之后这个应用可以消费,上面的英文注释是 说restricted to this connection  就是限制在这个连接可以消费,就是说不限制channel信道咯,具体没有试过,但是应该是这样,除非备注骗我,我读得书少,你唔好呃我!!!】 
             * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【没有人使用自动删除】 注意:如果exclusive为true 最好 autodelete都为true 至于为什么 这么简单自己想~
             * @param arguments other properties (construction arguments) for the queue 【其他参数没有玩过】
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定义发送的消息
            String message = "hello world!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("send "+message+".");
            //关闭信道和链接
            channel.close();
            connection.close();
        }
        
    }

    4.consumer

    public class Consumer {
        
        private final static String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws Exception {
            //获取连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);//true自动模式  
            //获取消息
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("received "+message+".");
            }
            
        }
    
    }

    三、JAVA(与spring整合)

    1.先导入整合jar包:spring-rabbit

    2.整合最主要的就是application-rabbitmq.xml的配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context 
           http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
        <!-- 连接工厂 -->
        <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
        <rabbit:connection-factory id="rabbitConnectionFactory"
                                     host="${amqp.host}"
                                     port="${amqp.port}"
                                     virtual-host="${amqp.vhost}"
                                     username="${amqp.username}"
                                     password="${amqp.password}"/>
                                     
          <!-- 
        RabbitAdmin会自动在RabbitMQ上创建交换器、队列、绑定
        --> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!-- 消息队列,消息的栖所 --> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="converterQueue" name="converterQueue" durable="true" auto-delete="false" exclusive="false" auto-declare="true"/> <!-- 创建交换器,并绑定对应队列 org.springframework.amqp.core.DirectExchange org.springframework.amqp.core.Binding --> <rabbit:direct-exchange id="converterExchange" name="converterExchange" durable="true"> <!-- 绑定 --> <rabbit:bindings> <!-- converterQueueKey,将队列绑定到交换器的路由KEY --> <rabbit:binding queue="converterQueue" key="converterQueueKey" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 创建RabbitTemplate对象,用于发送消息 --> <!-- org.springframework.amqp.rabbit.core.RabbitTemplate --> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" encoding="utf-8" exchange="converterExchange" routing-key="converterQueueKey" message-converter="jackson2JsonMessageConverter"/> <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> </beans>

    3.接下来使用spring的IOC技术

      //xml文件中使用<rabit:template>标签来对这里进行注入
      @Autowired
    private AmqpTemplate amqpTemplate; public void send(Object message) { amqpTemplate.convertAndSend(message); }

    这样的就可以使用了

  • 相关阅读:
    【转载】Unity 合理安排增量更新(热更新)
    COCOS2D 释放资源的最佳时机
    【转载】利用Unity自带的合图切割功能将合图切割成子图
    用GL画出人物的移动路径
    使用行为树(Behavior Tree)实现游戏AI
    C#学习笔记
    题目:给定一数组 例如:a = [1,2,3,5,2,1] 现用户提供一个数字 请返回用户所提供的数字的所有下标
    算法: 归并排序
    题目:给定两个有序数组,对其进行合并
    数据结构 顺序表实现优先队列 回顾练习
  • 原文地址:https://www.cnblogs.com/kongkongFabian/p/7527421.html
Copyright © 2011-2022 走看看