zoukankan      html  css  js  c++  java
  • RabbitMQ 概念与Java例子

    RabbitMQ简介

    目前RabbitMQ是AMQP 0-9-1(高级消息队列协议)的一个实现,使用Erlang语言编写,利用了Erlang的分布式特性。


    概念介绍:

    1. Broker:简单来说就是消息队列服务器实体。
    2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    3. Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
    4. Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    5. Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    6. vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    7. producer:消息生产者,就是投递消息的程序。
    8. consumer:消息消费者,就是接受消息的程序。
    9. channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    使用流程

    AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:

    1. 客户端连接到消息队列服务器,打开一个channel。
    2. 客户端声明一个exchange,并设置相关属性。
    3. 客户端声明一个queue,并设置相关属性。
    4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
    5. 客户端投递消息到exchange。

    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。

    Exchange类型

    Exchange路由消息的集中类型:

    名称

    默认的预先定义exchange名字

    作用描述

    Direct exchange

    (Empty string) and amq.direct

    根据Binding指定的Routing Key,将符合Key的消息发送到BindingQueue

    Fanout exchange

    amq.fanout

    将同一个message发送到所有同该Exchange bingdingqueue

    Topic exchange

    amq.topic

    根据Binding指定的Routing KeyExchangekey进行模式匹配后路由到相应的Queue,模式匹配时符号”#”匹配一个或多个词,符号”*”匹配正好一个词。

    Headers exchange

    amq.match (and amq.headers in RabbitMQ)

    direct exchange类似,不同之处是不再使用Routing Key路由,而是使用headersmessage attributes)进行匹配路由到指定Queue

    参考:

    http://www.choudan.net/2013/07/25/OpenStack-RabbitMQ(%E4%B8%80).html

    http://stephansun.iteye.com/blog/1452853

    http://www.diggerplus.org/archives/3110

    http://backend.blog.163.com/blog/static/202294126201322563245975/

    http://lynnkong.iteye.com/blog/1699684

    特性:

    1. broker的持久化:exchange和queue声明为durable时,exchange和queue的配置会在服务端磁盘保存起来,这样在服务停掉重启后,exchange和queue以及其相应的binding等配置不会丢失;
    2. message的持久化:当message的deliver mode attribute(message properties)设置为2时,每个未被消费的message将被保存在磁盘中,在服务重启后仍能保存。
      message在文件中的保存参考:http://my.oschina.net/hncscwc/blog/182083
    3. cluster:RabbitMQ支持多个nodes(每个nodes是一个RabbitMQ实例)组成一个cluster,访问cluster中的任意一个node的效果是相同的,也就是说任何一个message都可以在任意一个nodes上生产和消费(生产或消费的message会在nodes间中转)。
    4. mirrored-queue:RabbitMQ在cluster的基础上,支持同一个queue的message同时存储在多个nodes上,这样当部分节点失效时,可保证message和broker的配置不丢失。


    安装与配置

    1.安装erlang

    sudo apt-get install tk tcl unixODBC erlang 
    sudo vim /etc/profile
    添加export PATH=$PATH:/usr/lib/erlang/bin/

    2.安装rabbitmq

    sudo apt-get install rabbitmq-server
    sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/rabbitmq/bin
    source /etc/profile

    rabbitmq的基本配置(端口等)参考:http://my.oschina.net/hncscwc/blog/302339

    3.用户与权限

    在正式应用之前,我们先在RabbitMQ里创建一个vhost,加一个用户,并设置该用户的权限。使用rabbitmqctl客户端工具,在根目录下创建”/mq_test”这个vhost:

    rabbitmqctl add_vhost /mq_test

    创建一个用户名”test,设置密码”test123″:

    rabbitmqctl add_user test test123

    设置pyh用户对/pyhtest这个vhost拥有全部权限:

    rabbitmqctl set_permissions -p /mq_test test “.*” “.*” “.*”、

    后面三个”*”代表pyh用户拥有对/pyhtest的配置、写、读全部权限


    参考:http://my.oschina.net/hncscwc/blog/262246

    4.配置开启web管理插件

    cat <<EOF>> /etc/rabbitmq/enabled_plugins
    [rabbitmq_management]. 
    EOF

    可以通过http://localhost:15672/ 查看运行情况

    5.启动

    使用root权限运行rabbitmq-server 或使用/etc/init.d/rabbitmq-server start|restart|stop

    6.在一台机器上启动多个节点(模拟集群)

    vim start_rabbitmq_cluster.sh

    添加以下内容

    #!/bin/bash
    
    if [[ $# != 1 ]]
    then
        echo "Usage: $0 process_num"
        exit 1
    fi
    
    HOST_NAME=`hostname`
    START_NUM=0
    PROCESS_NUM=$1
    END_NUM=$(( START_NUM + PROCESS_NUM - 1))
    
    RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME="rabbit" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" rabbitmq-server -detached
    
    for (( i=$((START_NUM+1)); i<=$END_NUM; i++  ))
    do
        RABBITMQ_PROT=$(( i + 5672 ))
        MANAGE_PORT=$(( i + 15672  ))
        NODE_NAME="rabbit_$i"
        echo $RABBITMQ_PROT
        echo $MANAGE_PORT
        echo $NODE_NAME
        RABBITMQ_NODE_PORT=$RABBITMQ_PROT RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGE_PORT}]" RABBITMQ_NODENAME=$NODE_NAME rabbitmq-server -detached
        sleep 3
        
        rabbitmqctl -n $NODE_NAME stop_app
        rabbitmqctl -n $NODE_NAME reset
        echo "join cluster"
        rabbitmqctl -n $NODE_NAME join_cluster rabbit@$HOST_NAME
        rabbitmqctl -n $NODE_NAME start_app
    done
    
    rabbitmqctl cluster_status -n rabbit

    运行

    chmod a+x start_rabbitmq_cluster.sh
    start_rabbitmq_cluster.sh 3

    启动后可以通过rabbitmqctl -n rabbit cluster_status查看集群节点配置情况,或者在web管理页面中查看

    7.在多台机器上建立集群

    首先在主节点上启动服务

    然后将其他机器的rabbitmq加入集群

    1.将主服务器的/var/log/rabbitmq/.erlang.cookie 拷贝到新节点
    2.在新节点上将文件所有人更改为rabbitmq,注意保持文件权限为所有者只读,其他人无权限
    chown rabbitmq.rabbitmq /var/log/rabbitmq/.erlang.cookie
    3.在新节点上加入集群、
    /etc/init.d/rabbitmq-server start
    rabbitmqctl -n rabbit stop_app
    rabbitmqctl -n rabbit reset
    rabbitmqctl -n rabbit join_cluster  rabbit@$MASTER_NODE
    rabbitmqctl -n rabbit start_app

    8.配置network partion时的处理方式

    cat<<EOF>> /usr/local/rabbitmq/rabbitmq_server-3.1.0/etc/rabbitmq/rabbitmq.conf 
    [
    {rabbit, [{cluster_partition_handling, pause_minority}]}
    ].
    EOF

    参考:http://my.oschina.net/hncscwc/blog/174417

    Java代码示例

    首先在项目中添加maven依赖

        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.2.2</version>
            </dependency>
        </dependencies>

    Producer

    import com.rabbitmq.client.*;
    import com.sun.deploy.util.StringUtils;
    
    import java.io.IOException;
    import java.lang.String;
    import java.lang.System;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Scanner;
    
    public class Producer {
        //exchange type
        public enum XT {
            DEFAULT, DIRECT, TOPIC, HEADERS, FANOUT
        }
    
        private static final String QUEUE_NAME = "log2";
    
        public static void main(String[] args) throws IOException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); //使用默认端口连接本地rabbitmq服务器
    
            Connection connection = factory.newConnection(); //声明一个连接
            Channel channel = connection.createChannel(); //声明消息通道
    
            //exchange类型 参考:http://stephansun.iteye.com/blog/1452853
            XT xt = XT.HEADERS;
            switch (xt) {
                case DEFAULT: //默认,向指定的队列发送消息,消息只会被一个consumer处理,多个消费者消息会轮训处理,消息发送时如果没有consumer,消息不会丢失
                    //为消息通道绑定一个队列
                    //队列的相关参数需要与第一次定义该队列时相同,否则会出错
                    //参数1:队列名称
                    //参数2:为true时server重启队列不会消失
                    //参数3:队列是否是独占的,如果为true只能被一个connection使用,其他连接建立时会抛出异常
                    //参数4:队列不再使用时是否自动删除(没有连接,并且没有未处理的消息)
                    //参数5:建立队列时的其他参数
                    channel.queueDeclare(QUEUE_NAME, true, false, true, null);
    
                    while (GetInputString()) {
                        //向server发布一条消息
                        //参数1:exchange名字,若为空则使用默认的exchange
                        //参数2:routing key
                        //参数3:其他的属性
                        //参数4:消息体
                        //RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
                        //任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
                        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //设置消息为持久化,服务器重启不会丢失
    
                        System.out.println("Send " + message);
                    }
                    break;
                case FANOUT:
                    //广播给所有队列  接收方也必须通过fanout交换机获取消息,所有连接到该交换机的consumer均可获取消息
                    //如果producer在发布消息时没有consumer在监听,消息将被丢弃
    
    
                    //定义一个交换机
                    //参数1:交换机名称
                    //参数2:交换机类型
                    //参数3:交换机持久性,如果为true则服务器重启时不会丢失
                    //参数4:交换机在不被使用时是否删除
                    //参数5:交换机的其他属性
                    channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null);
    
                    while (GetInputString()) {
                        //发送一条广播消息,参数2此时无意义
                        channel.basicPublish(XCHG_NAME, "", null, message.getBytes());
    
                        System.out.println("Send " + message);
                    }
                    break;
                case DIRECT:
                    //向所有绑定了相应routing key的队列发送消息
                    //如果producer在发布消息时没有consumer在监听,消息将被丢弃
                    //如果有多个consumer监听了相同的routing key  则他们都会受到消息
    
                    channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null);
    
                    while (GetInputString()) {
                        //input like : info message
                        String[] temp = StringUtils.splitString(message, " ");
                        channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                        System.out.println("Send " + message);
                    }
                    break;
                case TOPIC:
                    //与direct模式有类似之处,都使用routing key作为路由
                    //不同之处在于direct模式只能指定固定的字符串,而topic可以指定一个字符串模式
    
                    channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null);
                    while (GetInputString()) {
                        //input like : topic message
                        String[] temp = StringUtils.splitString(message, " ");
                        channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes());
                        System.out.println("Send " + message);
                    }
                    break;
                case HEADERS:
                    //与topic和direct有一定相似之处,但不是通过routing key来路由消息
                    //通过headers中词的匹配来进行路由
    
                    channel.exchangeDeclare(XCHG_NAME, "headers", true, true, null);
                    while (GetInputString()) {
                        //input like : headers message
                        String[] temp = StringUtils.splitString(message, " ");
    
                        Map<String, Object> headers = new HashMap<String, Object>();
                        headers.put("name", temp[0]); //定义headers
                        headers.put("sex", temp[1]);
                        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder().headers(headers);
    
                        channel.basicPublish(XCHG_NAME, "", builder.build(), temp[2].getBytes()); //根据headers路由到相应的consumer
                        System.out.println("Send " + message);
                    }
                    break;
            }
            channel.close();
            connection.close();
        }
    
        private static boolean GetInputString() {
            message = scanner.nextLine();
            if (message.length() == 0) return false;
            return true;
        }
    
        private static Scanner scanner = new Scanner(System.in);
        private static String message = "";
        public static String XCHG_NAME = "xchg3";
    }

    Consumer

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class Consumer {
        private static final String QUEUE_NAME = "log2";
    
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            String queueName = QUEUE_NAME;
    
            Producer.XT xt = Producer.XT.HEADERS;
    
            switch (xt) {
                case DEFAULT:
                    //队列的相关参数需要与第一次定义该队列时相同,否则会出错,使用channel.queueDeclarePassive()可只被动绑定已有队列,而不创建
                    channel.queueDeclare(queueName, true, false, true, null);
                    break;
                case FANOUT:
                    //接收端也声明一个fanout交换机
                    channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null);
                    //channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
                    //声明一个临时队列,该队列会在使用完比后自动销毁
                    queueName = channel.queueDeclare().getQueue();
                    //将队列绑定到交换机,参数3无意义此时
                    channel.queueBind(queueName, Producer.XCHG_NAME, "");
                    break;
                case DIRECT:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //绑定一个routing key,可以绑定多个
                    channel.queueBind(queueName, Producer.XCHG_NAME, "warning");
                    break;
                case TOPIC:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //监听两种模式 #匹配一个或多个单词 *匹配一个单词
                    channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue");
                    break;
                case HEADERS:
                    channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, true, null);
                    queueName = channel.queueDeclare().getQueue();
                    Map<String, Object> headers = new HashMap<String, Object>() {{
                        put("name", "test");
                        put("sex", "male");
                        put("x-match", "any");//all==匹配所有条件,any==匹配任意条件
                    }};
                    channel.queueBind(queueName, Producer.XCHG_NAME, "", headers);
                    break;
            }
    
            // 在同一时间不要给一个worker一个以上的消息。
            // 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志(acknowledged)
            // 替代的,消息将会分发给下一个不忙的worker。
            channel.basicQos(1); //server push消息时的队列长度
    
            //用来缓存服务器推送过来的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            //为channel声明一个consumer,服务器会推送消息
            //参数1:队列名称
            //参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。  可以通过channel.basicAck手动回复ack
            //参数3:消费者
            channel.basicConsume(queueName, false, consumer);
            //channel.basicGet() //使用该函数主动去服务器检索是否有新消息,而不是等待服务器推送
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                System.out.println("Received " + new String(delivery.getBody()));
    
                //回复ack包,如果不回复,消息不会在服务器删除
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                //channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
            }
        }
    }
  • 相关阅读:
    [ jquery 选择器 :hidden ] 此方法选取匹配所有不可见元素,或者type为hidden的元素
    剑指 Offer 03. 数组中重复的数字 哈希
    LeetCode 1736. 替换隐藏数字得到的最晚时间 贪心
    Leetcode 1552. 两球之间的磁力 二分
    Leetcode 88. 合并两个有序数组 双指针
    LeetCode 1744. 你能在你最喜欢的那天吃到你最喜欢的糖果吗?
    LeetCode 1743. 相邻元素对还原数组 哈希
    LeetCode 1745. 回文串分割 IV dp
    剑指 Offer 47. 礼物的最大价值 dp
    剑指 Offer 33. 二叉搜索树的后序遍历序列 树的遍历
  • 原文地址:https://www.cnblogs.com/stormli/p/rabbitmq.html
Copyright © 2011-2022 走看看