zoukankan      html  css  js  c++  java
  • rabbitMq及安装、fanout交换机-分发(发布/订阅)

     <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
     </dependency>

    最常见的几种消息通信模式主要有发布-订阅、点对点这两种
    http://blog.csdn.net/woogeyu/article/details/51119101 集群
    http://blog.csdn.net/column/details/rabbitmq.html Python版
    http://www.cnblogs.com/LipeiNet/category/896408.html
    http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html  api
    RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现
    其他公开标准(如 COBAR的 IIOP ,或者是 SOAP 等)


    从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,Linux和MacOSX下载的版本是 http://www.erlang.org/download.html
    1、安装依赖
    # yum install build-essential m4  
    # yum install openssl  
    # yum install openssl-devel  
    # yum install unixODBC  
    # yum install unixODBC-devel  
    # yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
    # yum install perl

    2、配置并安装erlang
    # tar -zxvf otp_src_20.1.tar  
    # cd otp_src_20.1
    # ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll
    ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac  
    ./configure --prefix=/usr/local/erlang --enable-all (选这个)
    make
    make install

    3、设置erlang环境变量
    打开/etc/profile设置环境变量 查看PATH:echo $PATH 环境变量
    ERL_HOME=/usr/local/erlang  
    PATH=$ERL_HOME/bin:$PATH  
    export ERL_HOME PATH

    4、安装mq
    http://www.rabbitmq.com/install-rpm.html http://www.rabbitmq.com/releases/rabbitmq-server/ 下载rpm包  rabbitmq-server-3.6.14-1.el6.noarch
    yum install -y socat(不需要)
    ln -s /usr/local/erlang/bin/erl      /usr/bin/erl 建立软连接 (不建可能会报错)
    rpm -i --nodeps rabbitmq-server-3.6.5-1.noarch
    执行结果 warning:rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY

    5、起停mq
    /sbin/service rabbitmq-server stop/start/etc.  
    [root@iZ250x18mnzZ ~]#service rabbitmq-server start 启动服务  
    [root@iZ250x18mnzZ ~]#service rabbitmq-server etc   查看哪些命令可以使用  
    [root@iZ250x18mnzZ ~]#service rabbitmq-server stop  停止服务  
    [root@iZ250x18mnzZ ~]#service rabbitmq-server status 查看服务状态
    启动监控管理器:rabbitmq-plugins enable rabbitmq_management
    关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
    查看所有的队列:rabbitmqctl list_queues
    清除所有的队列:rabbitmqctl reset
    关闭应用:rabbitmqctl stop_app 不同于停止服务
    启动应用:rabbitmqctl start_app

    6、卸载等命令
    #rpm -qa|grep rabbitmq
    rabbitmq-server-3.6.5-1.noarch
    卸载 mq
    #rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
    #rpm -qa|grep erlang
    esl-erlang-18.3-1.x86_64
    #rpm -e --nodeps esl-erlang-18.3-1.x86_64

    7、访问后台
    关闭防火墙 service iptables stop
    安装启动后其实还不能在其它机器访问, rabbitmq默认的 guest 账号只能在本地机器访问, 如果想在其它机器访问必须配置其它账号
    配置管理员账号:
        rabbitmqctl add_user admin adminpasspord
        rabbitmqctl set_user_tags admin administrator
    启动rabbitmq内置web插件, 管理rabbitmq账号等信息
        rabbitmq-plugins enable rabbitmq_management
    访问 http://192.168.89.131:15672/#/users  为刚建的账号 set permission
    http://127.0.0.1:15672

    (1)添加用户
    rabbitmqctl add_user admin admin
    rabbitmqctl set_user_tags admin administrator
    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    (2) 删除一个用户
    rabbitmqctl  delete_user  Username
    (3) 修改用户的密码
    rabbitmqctl  change_password  Username  Newpassword
    (4) 查看当前用户列表
    rabbitmqctl  list_users

    4、设置开机启动
    chkconfig rabbitmq-server on

    5、java客户端amqp-client版本号为3.6.5 与 rabbitmq-server-3.6.5-1.noarch服务版本号必须匹配

    8、命令
    http://www.linuxidc.com/Linux/2016-10/136493.htm

    1、信息分发
    向指定队列发送多条信息,多个消费者来获取该队列中信息
    channel.basicQos(1);保证一次只分发一个,否则可能有些消费者获取较多消息有的消费者获取不到消息
    可以使消费者采用手动答复,保证在一个消费者处理消息失败后(此时不答复)其他消费者还能继续获取并处理该消息
    2、交换机
    rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中
    fanout(散开分列),表示分发,所有的消费者得到同样的队列信息,发布/订阅,channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    direct(直接的),发送信息到不同的routingKey,消费者根据不同routingKey获取不同信息
    topics(话题),发送信息到不同的routingKey,消费者根据模糊匹配获取某些routingKey的信息

    代码解读

    1,单任务消息
    生产者:根据一定的 QUEUE_NAME 生产单个消息
    消费者:根据 QUEUE_NAME 获取消息
    2,多任务消费(非交换机)                  集群消费模式,针对每条消息,集群里只会有1个实例拉取并处理消息
    生产jqdeal()  消费jqdeal1()jqdeal2()
    生产者:根据一定的 QUEUE_NAME 生产多个消息
    消费者:根据 QUEUE_NAME 获取消息(channel.basicQos(1)一次只分发一个,通过手动回复使多个消费者公平的获取和处理消息)
    3,交换机
    生产exchange()  消费exchange1()exchange2()
    生产者:根据一定的 EXCHANGE_NAME,生产多个消息(fanout表示分发,所有的消费者得到同样的信息,广播模式)   
    消费者:根据 EXCHANGE_NAME,每一个消费者都会获取一遍消息

    交换机分为
    1,direct路由,消费者根据路由获取自己关心的消息
    2,Topics主题模糊匹配,消费者根据模糊匹配获取消息

    service

    package com.xmh.mq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.MessageProperties;
    
    public class ServerTest {
    
        private static final String EXCHANGE_NAME = "logs";
        public final static String QUEUE_NAME="rabbitMQ.test1";
        
        public static void main(String[] args) throws Exception{
            
        }
        
        public static void exchange(){
            try{
                ConnectionFactory factory=new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection=factory.newConnection();
                Channel channel=connection.createChannel();
    
                channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分发,所有的消费者得到同样的队列信息
                //分发信息
                for (int i=0;i<5;i++){
                    String message="Hello World"+i;
                    channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
                    System.out.println("EmitLog Sent '" + message + "'");
                }
                channel.close();
                connection.close();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        
        /**
         * 多任务分发,两个以上客户端处理消息
         */
        public static void jqdeal(){
            try{
                ConnectionFactory factory=new ConnectionFactory();
                factory.setHost("192.168.89.131");
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                Connection connection=factory.newConnection();
                Channel channel=connection.createChannel();
                channel.queueDeclare(QUEUE_NAME,true,false,true,null);
                //分发信息
                for (int i=0;i<10;i++){
                    String message="Hello RabbitMQ"+i;
                    channel.basicPublish("",QUEUE_NAME,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                    System.out.println("NewTask send '"+message+"'");
                }
                channel.close();
                connection.close();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        /**
         * 注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、
         * 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
                  注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
                 声明队列后mq后台可看到该队列及存放的消息  
         */
        public static void quene(){
            try{
             //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置RabbitMQ相关信息
            factory.setHost("192.168.89.131");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);
    //        factory.setVirtualHost("/");
            //创建一个新的连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //  声明一个队列       
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            String message = "Hello RabbitMQ-1";
            //发送消息到队列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Producer Sendss +'" + message + "'");
            //关闭通道和连接
            channel.close();
            connection.close();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        
    }

    注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
    注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

    package com.xmh.mq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ConsumerTest {
    
         private final static String QUEUE_NAME = "rabbitMQ.test1";
         private static final String EXCHANGE_NAME = "logs";
    
            public static void main(String[] args) throws IOException, TimeoutException {
                test_exchange();
                test_exchange2();
            }
            
            /**
             * 必须先订阅(先启动客户端,才能收到服务端消息) 发布/订阅 
             * rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,
             * 然后按照规则把它推送到对列中,交换机发布/订阅  Fanout扇形交换机 其他有(Direct直连交换机、Topic主题交换机)
             */
            public static void exchange1(){
                try{
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("192.168.89.131");
                    factory.setUsername("admin");
                    factory.setPassword("admin");
                    factory.setPort(5672);
                    Connection connection = factory.newConnection();
                    Channel channel = connection.createChannel();
    
                    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
                    //产生一个随机的队列名称
                    String queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定
    
                    System.out.println("ReceiveLogs1 Waiting for messages");
                    Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            String message = new String(body, "UTF-8");
                            System.out.println("ReceiveLogs1 Received '" + message + "'");
                        }
                    };
                    channel.basicConsume(queueName, true, consumer);//队列会自动删除
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
            public static void exchange2(){
                try{
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("192.168.89.131");
                    factory.setUsername("admin");
                    factory.setPassword("admin");
                    factory.setPort(5672);
                    Connection connection = factory.newConnection();
                    Channel channel = connection.createChannel();
    
                    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
                    //产生一个随机的队列名称
                    String queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定
    
                    System.out.println("ReceiveLogs2 Waiting for messages");
                    Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            String message = new String(body, "UTF-8");
                            System.out.println("ReceiveLogs2 Received '" + message + "'");
                        }
                    };
                    channel.basicConsume(queueName, true, consumer);//队列会自动删除
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
            
            /**
             * 对于分发信息的处理
             * 注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,
             * 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,
             * 如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。
             */
            public static void jqdeal1(){
                try{
                    final ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("192.168.89.131");
                    factory.setUsername("admin");
                    factory.setPassword("admin");
                    factory.setPort(5672);
                    Connection connection = factory.newConnection();
                    final Channel channel = connection.createChannel();
    
                    channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                    System.out.println("Worker1  Waiting for messages");
    
                    //每次从队列获取的数量
                    channel.basicQos(1);
    
                    final Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
                            //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                            String message = new String(body, "UTF-8");
                            System.out.println("Worker1  Received '" + message + "'");
                            try {
                                doWork(message);
                                channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息传递标记
                                System.out.println("Worker1 Done");
                            }catch (Exception e){
                                channel.abort();//终止渠道
                            }finally {
                            }
                        }
                    };
                    boolean autoAck=false;//手动回复
                    //消息消费完成确认
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                    
                    } catch (Exception e){
                        e.printStackTrace();
                    }
            }
            //对于分发信息的处理
            public static void jqdeal2(){
                try{
                    final ConnectionFactory factory = new ConnectionFactory();
                    factory.setHost("192.168.89.131");
                    factory.setUsername("admin");
                    factory.setPassword("admin");
                    factory.setPort(5672);
                    Connection connection = factory.newConnection();
                    final Channel channel = connection.createChannel();
    
                    channel.queueDeclare(QUEUE_NAME, true, false, true, null);
                    System.out.println("Worker2  Waiting for messages");
    
                    //每次从队列获取的数量
                    channel.basicQos(1);
    
                    final Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
                            //envelope - packaging data for the message  body - the message body (opaque, client-specific byte array)
                            String message = new String(body, "UTF-8");
                            System.out.println("Worker2  Received '" + message + "'");
                            try {
                                doWork(message);
                                //Integer.parseInt("s");
                                channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages.  .getDeliveryTag() 消息传递标记
                                System.out.println("Worker2 Done");
                            }catch (Exception e){
                                channel.abort();//终止渠道,另外一个客户端会继续获取消息
                                System.out.println("Worker2 客户端处理异常");
                            }finally {
                            }
                        }
                    };
                    boolean autoAck=false;
                    //消息消费完成确认
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                    } catch (Exception e){
                        e.printStackTrace();
                    }
            }
            private static void doWork(String task) {
                try {
                    Thread.sleep(1000); // 暂停1秒钟
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
            /**
             * 手动关掉客户端后,该队列会自动删除(声明队列时设置)
             */
            public static void test2(){
                try{
                // 创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //设置RabbitMQ地址
                factory.setHost("192.168.89.131");// 192.168.89.131 :15672
                factory.setUsername("admin");
                factory.setPassword("admin");
                factory.setPort(5672);
                //创建一个新的连接
                Connection connection = factory.newConnection();
                //创建一个通道
                Channel channel = connection.createChannel();
                //声明要关注的队列
                channel.queueDeclare(QUEUE_NAME, false, false, true, null);
                System.out.println("Customer Waiting Received messages");
                //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
                // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery,持续监听
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println("Customer Received '" + message + "'");
                    }
                };
                //自动回复队列应答 -- RabbitMQ中的消息确认机制
                channel.basicConsume(QUEUE_NAME, true, consumer);
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
    }

    关于多任务分发(Round-robin dispatching 循环分发)
    RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果现在load加重,那么只需要创建更多的Consumer来进行任务处理即 可。当然了,对于负载还要加大怎么办?我没有遇到过这种情况,那就可以创建多个virtual Host,细化不同的通信类别了。
    1、首先开启两个Consumer,即运行两个Recieve.cs。
    2、在开启两个Producer,即运行两个Producer.cs。
    默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin(优雅分发)
    你可能也注意到了,分发机制不是那么优雅。默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。
    那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢
    通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:
    channel.BasicQos(0, 1, false);
    //每次从队列获取的数量 Fair dispatch   (公平分发)
    channel.basicQos(1);
    注释:
    如果添加该代码使用公平分发,在一个consumer处理失败后producer得不到ack回应,就不会再向他分发消息
    如果使用默认的Round-robin dispatching 循环分发,producer不管有没有得到ack回应,都会轮询向其分发
     
    1、持久化
    exchange、queue持久化,声明时durable = true
    Message持久化  发送消息时MessageProperties.PERSISTENT_TEXT_PLAIN
    2、序列化
    RabbitMq抽象MessageConvert接口,默认的序列化类为SimpleMessageConverter
    序列化内容:Message的body
    其他序列化方式:jdk序列化,hessian,jackson,protobuf等
    注释:
    channel.queueDeclare(QUEUE_NAME,false,false,true,null);//(队列名称、是否持久化、是否为独占队列(创建者可以使用的私有队列,断开后自动删除)、当所有消费断开是否自动删除、其他) 

    http://blog.csdn.net/u013256816/article/details/53524766
    https://www.cnblogs.com/wangiqngpei557/p/6158094.html
    http://blog.csdn.net/u010029983/article/details/45337365 

  • 相关阅读:
    JavaScript 技巧
    网页打开客户端本机程序,未安装则提示要求安装
    IIS(World Wide Web Publishing Service)127 无法响应的解决方法
    Jquery hover事件 示例
    JavaScript MVC
    jquery调用基于.NET Framework 3.5的WebService返回JSON数据
    文件下载类
    网页打印局部示例
    未能执行URL(FCK)
    网页中各种宽高
  • 原文地址:https://www.cnblogs.com/xingminghui/p/8650394.html
Copyright © 2011-2022 走看看