zoukankan      html  css  js  c++  java
  • rabbitmq简介

    安装RabbitMQ

    • RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装
    • 点击这里下载对应平台的RabbitMQ安装包进行安装

    编写生产者和消费者

    • Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,我们先不考虑那么多

      在IDE中新建一个Maven项目,并在pom.xml中贴入如下依赖,RabbitMQ的最新版本依赖可以在这里找到

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.1.0</version>
    </dependency>
    • 新建一个生产者
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProducer {
        
        private Logger logger = LoggerFactory.getLogger(MessageSender.class);
    
        //声明一个队列名字
        private final static String QUEUE_NAME = "hello";
        
        public boolean sendMessage(String message){
            //new一个RabbitMQ的连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置需要连接的RabbitMQ地址,这里指向本机
            factory.setHost("127.0.0.1");
            Connection connection = null;
            Channel channel = null;
            try {
                //尝试获取一个连接
                connection = factory.newConnection();
                //尝试创建一个channel
                channel = connection.createChannel();
                //队列名,是否持久化,是否是排他性队列,是否自动删除,其他参数
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                logger.info("Sent '" + message + "'");
                //关闭channel和连接
                channel.close();
                connection.close();
            } catch (IOException | TimeoutException e) {
                //失败后记录日志,返回false,代表发送失败
                logger.error("send message faild!",e);
                return false;
            }
            return true;
        }
    }
    • 新建一个消费者
    package com.liyang.ticktock.rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class MessageConsumer {
        
        private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
        
        public boolean consume(String queueName){
            //连接RabbitMQ
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            Connection connection = null;
            Channel channel = null;
            try {
                connection = factory.newConnection();
                channel = connection.createChannel();
                //这里声明queue是为了取消息的时候,queue肯定会存在
                //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue
                channel.queueDeclare(queueName, false, false, false, null);
                
                //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                              throws IOException {
                            String message = new String(body, "UTF-8");
                            logger.info("Received '" + message + "'");
                    }
                };
                //上面是声明消费者,这里用声明的消费者消费掉队列中的消息
                channel.basicConsume(queueName, true, consumer);
                
                //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
               
            } catch (IOException | TimeoutException e) {
                //失败后记录日志,返回false,代表消费失败
                logger.error("send message faild!",e);
                return false;
            }
            
            
            return true;
        }
    }
    • 防止客户端丢失信息,RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发出的确认信息,则会把消息转发给其他保持在线的消费者。
    • 注;RabbitMQ只有在收到消费者确认后,才会从内存中删除消息,如果消费者忘了确认(更多情况是因为代码问题没有执行到确认的代码),将会导致内存泄漏。
    • 现在,消费者宕机已经无法影响到我们的消息了,但如果RabbitMQ重启了,消息依然会丢失。所幸的是,RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上)会导致消息丢失,如果需要严格的控制,可以参考官方文档
    • 在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期--发送给某些队列,或者直接丢弃掉。
    • RabbitMQ中,有4种类型的Exchange

      • direct    通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
        • 默认类型   本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
      • topic    话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
      • headers    当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
      • fanout    分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景
    • 在RabbitMQ中,消息是发送到Exchange的,不是直接发送的Queue。因此,需要把Queue和Exchange进行绑定,告诉RabbitMQ把指定的Exchange上的消息发送的这个队列上来
    • topic exchange对routingKey是有要求的,必须是一个关键字的列表才能发挥正常作用,用“.”分割每个关键字,你可以定义任意的层级,唯一的限制是最大长度为255字节。

      opic与direct的重要区别就是,它有两个关键字

      1. “*”星号,代表一个词,比如上述规则:*.error 表示所有系统的error级别的日志
      2. “#”井号,代表零个或多个词,比如上述规则: *.# 表示所有系统的所有消息,与单独一个#是等效的,core.# 表示核心系统的所有日志,它和 core.* 的区别是,即使以后规则改为 <系统>.<日志级别>.<其他条件>.<其他条件>.……,core.# 依然可以完成匹配,而 core.* 则无法匹配 core.info.xxx.xxx

    转载自:http://www.cnblogs.com/4----/p/6518801.html

  • 相关阅读:
    前端 JS+CSS
    Git 命令行操作
    信息安全 学习笔记(2)——防火墙(Netfilter/ IPtables)
    信息安全 学习笔记(3)—— 后门(Backdoor+rootkit)
    Linux课程学习总结报告
    信息安全 学习笔记(4)—— 【问答题 复习纲要】
    结合中断上下文切换和进程上下文切换分析Linux内核的一般执行过程
    centos安装和卸载软件
    zookeeper错误KeeperErrorCode = ConnectionLoss解决
    查看linux信息
  • 原文地址:https://www.cnblogs.com/mingyao123/p/10072529.html
Copyright © 2011-2022 走看看