zoukankan      html  css  js  c++  java
  • RabbitMQ 概念与Java例子

    RabbitMQ简介

    目前RabbitMQ是AMQP 0-9-1(高级消息队列协议)的一个实现,使用Erlang语言编写,利用了Erlang的分布式特性。


    概念介绍:

    1. Broker:简单来说就是消息队列服务器实体。
    2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    3. Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
    4. Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    5. Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    6. vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    7. producer:消息生产者,就是投递消息的程序。
    8. consumer:消息消费者,就是接受消息的程序。
    9. channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    使用流程

    AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:

    1. 客户端连接到消息队列服务器,打开一个channel。
    2. 客户端声明一个exchange,并设置相关属性。
    3. 客户端声明一个queue,并设置相关属性。
    4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
    5. 客户端投递消息到exchange。

    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。

    Exchange类型

    Exchange路由消息的集中类型:

    名称

    默认的预先定义exchange名字

    作用描述

    Direct exchange

    (Empty string) and amq.direct

    根据Binding指定的Routing Key,将符合Key的消息发送到BindingQueue

    Fanout exchange

    amq.fanout

    将同一个message发送到所有同该Exchange bingdingqueue

    Topic exchange

    amq.topic

    根据Binding指定的Routing KeyExchangekey进行模式匹配后路由到相应的Queue,模式匹配时符号”#”匹配一个或多个词,符号”*”匹配正好一个词。

    Headers exchange

    amq.match (and amq.headers in RabbitMQ)

    direct exchange类似,不同之处是不再使用Routing Key路由,而是使用headersmessage attributes)进行匹配路由到指定Queue

    参考:

    http://www.choudan.net/2013/07/25/OpenStack-RabbitMQ(%E4%B8%80).html

    http://stephansun.iteye.com/blog/1452853

    http://www.diggerplus.org/archives/3110

    http://backend.blog.163.com/blog/static/202294126201322563245975/

    http://lynnkong.iteye.com/blog/1699684

    特性:

    1. broker的持久化:exchange和queue声明为durable时,exchange和queue的配置会在服务端磁盘保存起来,这样在服务停掉重启后,exchange和queue以及其相应的binding等配置不会丢失;
    2. message的持久化:当message的deliver mode attribute(message properties)设置为2时,每个未被消费的message将被保存在磁盘中,在服务重启后仍能保存。
      message在文件中的保存参考:http://my.oschina.net/hncscwc/blog/182083
    3. cluster:RabbitMQ支持多个nodes(每个nodes是一个RabbitMQ实例)组成一个cluster,访问cluster中的任意一个node的效果是相同的,也就是说任何一个message都可以在任意一个nodes上生产和消费(生产或消费的message会在nodes间中转)。
    4. mirrored-queue:RabbitMQ在cluster的基础上,支持同一个queue的message同时存储在多个nodes上,这样当部分节点失效时,可保证message和broker的配置不丢失。


    安装与配置

    1.安装erlang

    sudo apt-get install tk tcl unixODBC erlang 
    sudo vim /etc/profile
    添加export PATH=$PATH:/usr/lib/erlang/bin/

    2.安装rabbitmq

    sudo apt-get install rabbitmq-server
    sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/rabbitmq/bin
    source /etc/profile

    rabbitmq的基本配置(端口等)参考:http://my.oschina.net/hncscwc/blog/302339

    3.用户与权限

    在正式应用之前,我们先在RabbitMQ里创建一个vhost,加一个用户,并设置该用户的权限。使用rabbitmqctl客户端工具,在根目录下创建”/mq_test”这个vhost:

    rabbitmqctl add_vhost /mq_test

    创建一个用户名”test,设置密码”test123″:

    rabbitmqctl add_user test test123

    设置pyh用户对/pyhtest这个vhost拥有全部权限:

    rabbitmqctl set_permissions -p /mq_test test “.*” “.*” “.*”、

    后面三个”*”代表pyh用户拥有对/pyhtest的配置、写、读全部权限


    参考:http://my.oschina.net/hncscwc/blog/262246

    4.配置开启web管理插件

    cat <<EOF>> /etc/rabbitmq/enabled_plugins
    [rabbitmq_management]. 
    EOF

    可以通过http://localhost:15672/ 查看运行情况

    5.启动

    使用root权限运行rabbitmq-server 或使用/etc/init.d/rabbitmq-server start|restart|stop

    6.在一台机器上启动多个节点(模拟集群)

    vim start_rabbitmq_cluster.sh

    添加以下内容

    #!/bin/bash
    
    if [[ $# != 1 ]]
    then
        echo "Usage: $0 process_num"
        exit 1
    fi
    
    HOST_NAME=`hostname`
    START_NUM=0
    PROCESS_NUM=$1
    END_NUM=$(( START_NUM + PROCESS_NUM - 1))
    
    RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME="rabbit" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" rabbitmq-server -detached
    
    for (( i=$((START_NUM+1)); i<=$END_NUM; i++  ))
    do
        RABBITMQ_PROT=$(( i + 5672 ))
        MANAGE_PORT=$(( i + 15672  ))
        NODE_NAME="rabbit_$i"
        echo $RABBITMQ_PROT
        echo $MANAGE_PORT
        echo $NODE_NAME
        RABBITMQ_NODE_PORT=$RABBITMQ_PROT RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGE_PORT}]" RABBITMQ_NODENAME=$NODE_NAME rabbitmq-server -detached
        sleep 3
        
        rabbitmqctl -n $NODE_NAME stop_app
        rabbitmqctl -n $NODE_NAME reset
        echo "join cluster"
        rabbitmqctl -n $NODE_NAME join_cluster rabbit@$HOST_NAME
        rabbitmqctl -n $NODE_NAME start_app
    done
    
    rabbitmqctl cluster_status -n rabbit

    运行

    chmod a+x start_rabbitmq_cluster.sh
    start_rabbitmq_cluster.sh 3

    启动后可以通过rabbitmqctl -n rabbit cluster_status查看集群节点配置情况,或者在web管理页面中查看

    7.在多台机器上建立集群

    首先在主节点上启动服务

    然后将其他机器的rabbitmq加入集群

    1.将主服务器的/var/log/rabbitmq/.erlang.cookie 拷贝到新节点
    2.在新节点上将文件所有人更改为rabbitmq,注意保持文件权限为所有者只读,其他人无权限
    chown rabbitmq.rabbitmq /var/log/rabbitmq/.erlang.cookie
    3.在新节点上加入集群、
    /etc/init.d/rabbitmq-server start
    rabbitmqctl -n rabbit stop_app
    rabbitmqctl -n rabbit reset
    rabbitmqctl -n rabbit join_cluster  rabbit@$MASTER_NODE
    rabbitmqctl -n rabbit start_app

    8.配置network partion时的处理方式

    cat<<EOF>> /usr/local/rabbitmq/rabbitmq_server-3.1.0/etc/rabbitmq/rabbitmq.conf 
    [
    {rabbit, [{cluster_partition_handling, pause_minority}]}
    ].
    EOF

    参考:http://my.oschina.net/hncscwc/blog/174417

    Java代码示例

    首先在项目中添加maven依赖

        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.2.2</version>
            </dependency>
        </dependencies>

    Producer

    import com.rabbitmq.client.*;
    import com.sun.deploy.util.StringUtils;
    
    import java.io.IOException;
    import java.lang.String;
    import java.lang.System;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Scanner;
    
    public class Producer {
        //exchange type
        public enum XT {
            DEFAULT, DIRECT, TOPIC, HEADERS, FANOUT
        }
    
        private static final String QUEUE_NAME = "log2";
    
        public static void main(String[] args) throws IOException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); //使用默认端口连接本地rabbitmq服务器
    
            Connection connection = factory.newConnection(); //声明一个连接
            Channel channel = connection.createChannel(); //声明消息通道
    
            //exchange类型 参考:http://stephansun.iteye.com/blog/1452853
            XT xt = XT.HEADERS;
            switch (xt) {
                case DEFAULT: //默认,向指定的队列发送消息,消息只会被一个consumer处理,多个消费者消息会轮训处理,消息发送时如果没有consumer,消息不会丢失
                    //为消息通道绑定一个队列
                    //队列的相关参数需要与第一次定义该队列时相同,否则会出错
                    //参数1:队列名称
                    //参数2:为true时server重启队列不会消失
                    //参数3:队列是否是独占的,如果为true只能被一个connection使用,其他连接建立时会抛出异常
                    //参数4:队列不再使用时是否自动删除(没有连接,并且没有未处理的消息)
                    //参数5:建立队列时的其他参数
                    channel.queueDeclare(QUEUE_NAME, true, false, true, null);
    
                    while (GetInputString()) {
                        //向server发布一条消息
                        //参数1:exchange名字,若为空则使用默认的exchange
                        //参数2:routing key
                        //参数3:其他的属性
                        //参数4:消息体
                        //RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
                        //任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
                        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //设置消息为持久化,服务器重启不会丢失
    
                        System.out.println("Send " + message);
                    }
                    break;
                case FANOUT:
                    //广播给所有队列  接收方也必须通过fanout交换机获取消息,所有连接到该交换机的consumer均可获取消息
                    //如果producer在发布消息时没有consumer在监听,消息将被丢弃
    
    
                    //定义一个交换机
                    //参数1:交换机名称
                    //参数2:交换机类型
                    //参数3:交换机持久性,如果为true则服务器重启时不会丢失
                    //参数4:交换机在不被使用时是否删除
                    //参数5:交换机的其他属性
                    channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);
    
                    while (GetInputString()) {
                        //发送一条广播消息,参数2此时无意义
                        channel.basicPublish(XCHG_NAME, "", null, message.getBytes());
    
                        System.out.println("Send " + message);
                    }
                    break;
                case DIRECT:
                    //向所有绑定了相应routing key的队列发送消息
                    //如果producer在发布消息时没有consumer在监听,消息将被丢弃
                    //如果有多个consumer监听了相同的routing key  则他们都会受到消息
    
                    channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);
    
                    while (GetInputString()) {
                        //input like : info message
                        String[] temp = StringUtils.splitString(message, " ");
                        channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                        System.out.println("Send " + message);
                    }
                    break;
                case TOPIC:
                    //与direct模式有类似之处,都使用routing key作为路由
                    //不同之处在于direct模式只能指定固定的字符串,而topic可以指定一个字符串模式
    
                    channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
                    while (GetInputString()) {
                        //input like : topic message
                        String[] temp = StringUtils.splitString(message, " ");
                        channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                        System.out.println("Send " + message);
                    }
                    break;
                case HEADERS:
                    //与topic和direct有一定相似之处,但不是通过routing key来路由消息
                    //通过headers中词的匹配来进行路由
    
                    channel.exchangeDeclare(XCHG_NAME, "headers", true, true, null);
                    while (GetInputString()) {
                        //input like : headers message
                        String[] temp = StringUtils.splitString(message, " ");
    
                        Map<String, Object> headers = new HashMap<String, Object>();
                        headers.put("name", temp[0]); //定义headers
                        headers.put("sex", temp[1]);
                        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder().headers(headers);
    
                        channel.basicPublish(XCHG_NAME, "", builder.build(), temp[2].getBytes()); //根据headers路由到相应的consumer
                        System.out.println("Send " + message);
                    }
                    break;
            }
            channel.close();
            connection.close();
        }
    
        private static boolean GetInputString() {
            message = scanner.nextLine();
            if (message.length() == 0) return false;
            return true;
        }
    
        private static Scanner scanner = new Scanner(System.in);
        private static String message = "";
        public static String XCHG_NAME = "xchg3";
    }

    Consumer

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class Consumer {
        private static final String QUEUE_NAME = "log2";
    
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            String queueName = QUEUE_NAME;
    
            Producer.XT xt = Producer.XT.HEADERS;
    
            switch (xt) {
                case DEFAULT:
                    //队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
                    channel.queueDeclare(queueName, true, false, true, null);
                    break;
                case FANOUT:
                    //接收端也声明一个fanout交换机
                    channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null);
                    //channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
                    //声明一个临时队列,该队列会在使用完比后自动销毁
                    queueName = channel.queueDeclare().getQueue();
                    //将队列绑定到交换机,参数3无意义此时
                    channel.queueBind(queueName, Producer.XCHG_NAME, "");
                    break;
                case DIRECT:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
                    channel.queueBind(queueName, Producer.XCHG_NAME, "warning");
                    break;
                case TOPIC:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
                    channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue");
                    break;
                case HEADERS:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    Map<String, Object> headers = new HashMap<String, Object>() {{
                        put("name", "test");
                        put("sex", "male");
                        put("x-match", "any");//all==匹配所有条件,any==匹配任意条件
                    }};
                    channel.queueBind(queueName, Producer.XCHG_NAME, "", headers);
                    break;
            }
    
            // 在同一时间不要给一个worker一个以上的消息。
            // 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
            // 替代的,消息将会分发给下一个不忙的worker。
            channel.basicQos(1); //server push消息时的队列长度
    
            //用来缓存服务器推送过来的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            //为channel声明一个consumer,服务器会推送消息
            //参数1:队列名称
            //参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。  可以通过channel.basicAck手动回复ack
            //参数3:消费者
            channel.basicConsume(queueName, false, consumer);
            //channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                System.out.println("Received " + new String(delivery.getBody()));
    
                //回复ack包,如果不回复,消息不会在服务器删除
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                //channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
            }
        }
    }
  • 相关阅读:
    CSS 实现隐藏滚动条同时又可以滚动
    在vue项目中的axios使用配置记录
    QS:vue中qs的使用
    Electron 无边框窗口最大化最小化关闭功能
    CSS样式表能否控制文字禁止选择,复制, 焦点
    yarn 在Vue框架中的常用命令
    Vue 实现左边导航栏且右边显示具体内容(element-ui)
    Vuex 存储||获取后台接口数据
    软件工程第二周开课介绍
    返回一个整数数组中最大子数组的和 (非环状 和环状)
  • 原文地址:https://www.cnblogs.com/stormli/p/rabbitmq.html
Copyright © 2011-2022 走看看