zoukankan      html  css  js  c++  java
  • RabbitMQ简单实现,exchange四种模式,持久化

    RabbitMQ
    目录

    一、简介,简单实现
    二、Exchange四种类型简单介绍
    三、消息确认,交换机、队列及消息持久化
    一、简介及简单实现
    RabbitMQ是一个消息代理:它接受并转发消息。你可以把它当成一个邮局:当你想邮寄信件的时候,你会把信件放在投递箱中,并确信邮递员最终会将信件送到收件人的手里。在这个例子中,RabbitMQ就相当与投递箱、邮局和邮递员。
    RabbitMQ与邮局的区别在于:RabbitMQ并不处理纸质信件,而是接受、存储并转发二进制数据—消息。
    谈到RabbitMQ的消息,通常有几个术语:

    生产者:是指发送消息的程序
    队列:相当于RabbitMQ的投递箱。尽管消息在RabbitMQ和你的应用之间传递,但是消息仅仅会在队列之中存储。队列只能存储在内存或磁盘中,本质上是一个大的消息缓冲区。不同的生产者可以发送消息到同一个对队列,不同的消费者也可以从同一个队列中获取消息。
    消费者:等待接受消息的程序。


    简单实现
    生产者
    1、创建连接:ConnectionFactory、Connection、Channel
    2、指明队列:channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    queueDeclare(String queue, //队列名称
    boolean durable, //是否持久化
    boolean exclusive, //是否排外
    Map<String, Object> arguments);//其他信息(自动删除等)
    1
    2
    3
    4
    3、发送消息:channel.basicPublish()

    void basicPublish(String exchange,//交换机Exchange名称
    String routingKey,//路游键
    BasicProperties props,//其他属性
    byte[] body) throws IOException;//内容
    1
    2
    3
    4
    4、关闭连接: channel.close();
    connection.close();

    package first;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * 生产者
    * Created by GXR on 2019/3/9.
    */
    public class Send {

    private final static String QUEUE_NAME = "first";

    public static void main(String[] args) {
    try {
    //创建连接和通道
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //通道指明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "helloworld";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("[First]Send:" + message);
    //关闭连接
    channel.close();
    connection.close();
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    消费者(接收者)
    1、创建连接:ConnectionFactory、Connection、Channel
    2、指明队列:channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    3、获取消息:Consumer
    4、消息确认:channel.basicConsume(QUEUE_NAME, true, consumer);//队列名称,是否自动确认,消费者

    package first;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * 消费者
    * Created by GXR on 2019/3/9.
    */
    public class Recive {

    private final static String QUEUE_NAME = "first";

    public static void main(String[] args) {
    try {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //指明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //获取消息
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
    throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [First] Received :" + message);
    }
    };
    channel.basicConsume(QUEUE_NAME, false, consumer);
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    Exchange四种类型简单介绍
    Exchange与队列绑定
    channel.exchangeDeclare(Exchange_name,ExchangeType);//exchange名称及类型
    1
    在之前的介绍里,我们都是直接往队列里发送消息,然后又直接从队列里取出消息。
    RabbitMQ的消息模型中的一个核心思想是,生产者绝不会将消息直接发送到队列中,实际上,在大部分场景中生产者根本不知道消息会发送到哪些队列中。
    相反,生产者只会将消息发送给一个Exchange(路由器/交换器)。Exchange其实很简单,它所做的就是,接收生产者发来的消息,并将这些消息推送到队列中。Exchange必须清楚地知道怎么处理接收到的消息:是将消息放到一个特定的队列中,还是放到多个队列中,还是直接将消息丢弃。下图示意了Exchange在消息模型中的位置:

    Exchange一共有四种类型:direct、topic、headers 和fanout。
    效率:fanout > direct > topic

    Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。
    Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。
    Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key。
    Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。
    direct、fanout类型的Exchange易于理解不做过多介绍。着重介绍topic类型的Exchange类型。

    topic类型Exchange(路由器)routingKey匹配规则
    *匹配一个单词
    #匹配0个或多个字符
    *,# 只能写在.号左右,且不能挨着字符
    单词和单词之间需要用.隔开

    对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:..。
    我们创建了三个绑定,Q1的绑定键为*.orange.,Q2的绑定键有两个,分别是.*.rabbit和lazy.#。
    上述绑定关系可以描述为:
    ①Q1关注所有颜色为orange的动物。
    ②Q2关注所有的rabbit,以及所有的lazy的动物。
    如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,路由键是lazy.orange.elephant的消息同样如此。但是,路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。
    假如我们不按常理出牌:发送一个路由键只有一个单词或者四个单词的消息,像orange或者quick.orange.male.rabbit,这样的话,这些消息因为不和任意绑定键匹配,都将会丢弃。但是,lazy.orange.male.rabbit消息因为和lazy.#匹配,所以会到达Q2,尽管它包含四个单词。

    消息确认、持久化
    消息确认
    为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者将会发送一个确认信息来告诉RabbitMQ,我已经接收到了消息,并且处理完了,你可以随便删它了。
    如果一个消费者在发送确认信息前死去(连接或通道关闭、TCP连接丢失等),RabbitMQ将会认为该消息没有被完全处理并会重新将消息加入队列。如果此时有其他的消费者,RabbitMQ很快就会重新发送该消息到其他的消费者。通过这种方式,你完全可以保证没有消息丢失,即使某个消费者意外死亡。
    对RabbitMQ而言,没有消息超时这一说。如果消费者死去,RabbitMQ将会重新发送消息。即使处理一个消息需要耗时很久很久也没有关系。

    package secound;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * 消费者
    * Created by GXR on 2019/3/9.
    */
    public class Recive2 {

    private final static String QUEUE_NAME = "secound";

    public static void main(String[] args) {
    try {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //指明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //最大接收数
    channel.basicQos(1);
    //获取消息
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
    throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [Secound] Received2 :" + message);
    }
    };
    boolean autoAck = true;//是否自动确定
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    queue的持久化
    queue的持久化是通过durable=true来实现的。

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("queueName", durable, false, false, null);
    1
    2
    3
    queueDeclare完整定义

    /**
    * Declare a queue
    * @see com.rabbitmq.client.AMQP.Queue.Declare
    * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
    * @param queue the name of the queue
    * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    * @param arguments other properties (construction arguments) for the queue
    * @return a declaration-confirm method to indicate the queue was successfully declared
    * @throws java.io.IOException if an error is encountered
    */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    queue:queue的名称

    exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

    autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
    1
    2
    3
    4
    5
    消息的持久化
    如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
    如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

    设置消息的持久化:

    channel.basicPublish("exchangeName", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "message".getBytes());
    1
    这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
    首先看一下basicPublish的方法:

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
    throws IOException;
    1
    2
    3
    4
    5
    exchange表示exchange的名称
    routingKey表示routingKey的名称
    body代表发送的消息体
    这里关键的是BasicProperties props这个参数了,这里看下BasicProperties的定义:

    public BasicProperties(
    String contentType,//消息类型如:text/plain
    String contentEncoding,//编码
    Map<String,Object> headers,
    Integer deliveryMode,//1:nonpersistent 2:persistent
    Integer priority,//优先级
    String correlationId,
    String replyTo,//反馈队列
    String expiration,//expiration到期时间
    String messageId,
    Date timestamp,
    String type,
    String userId,
    String appId,
    String clusterId)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    这里的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。
    exchange的持久化
    上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里博主建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,一般只需要:

    channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
    //即在声明的时候讲durable字段设置为true即可。
    1
    2

    ————————————————
    版权声明:本文为CSDN博主「做个好人好吗」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/China_110/article/details/88324671

  • 相关阅读:
    mysql-day06
    C语言 输出二进制数
    Python学习笔记(一)
    数组指针与指针数组
    重装系统--小白版
    Java 面对对象阶段练手项目【飞机大战】
    Java环境的配置
    在Linux环境下运行C语言程序
    Torrent文件
    ubuntu下载速度慢的解决办法--修改下载源
  • 原文地址:https://www.cnblogs.com/ExMan/p/11772022.html
Copyright © 2011-2022 走看看