zoukankan      html  css  js  c++  java
  • rabbitmq

    RabbitMQ  是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器。

    一、应用场景

    异步处理

    应用解耦

    流量消峰

    日志收集

    二、支持多种语言

    比如Js广告收集,大量的广告收集  会用到消息队列

    消息中间件--》流实时计算

    消息中间件--》离线计算

    三、java操作rabbitmq

    1 simple 简单队列

    2 work queues 工作队列、公平分发、轮询分发

    3 publish/subscribe 发布订阅

    4 routing 路由选择 通配符模式

    5 topics 主题

    6 手动和自动确认消息

    7 队列的持久化和非持久化

    8 rabbitmq 的延迟队列        场景  :未支付订单30分钟取消

    百度统计 cnzz架构

    rabbitmq AMQP协议

    添加用户

    创建库   以/开头

    virtual host

    对用户授权

    控制台 overview

    ProtocolNodeBound toPort
    amqp rabbit1@mqnode1 :: 30007
    amqp rabbit2@mqnode2 :: 30007
    amqp rabbit3@mqnode3 :: 30007
    clustering rabbit1@mqnode1 :: 50001
    clustering rabbit2@mqnode2 :: 50001
    clustering rabbit3@mqnode3 :: 50001
    http rabbit1@mqnode1 :: 30008
    http rabbit2@mqnode2 :: 30008
    http rabbit3@mqnode3 :: 30008

    amqp 开发所有接口,qmqp协议

    clustering 集群端口

    http  http访问端口

    connections 当前有哪些连接

    chaannels 当前有那些频道

    exchanges 交换机

    队列

    简单队列

    下载安装

    RabbitMQ 

    https://www.rabbitmq.com/install-rpm.html#downloads

    https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.17

    erlang

    https://www.erlang-solutions.com/resources/download.html

    教程

    https://blog.csdn.net/u013887008/article/details/100859070

    安装erlang

    rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm

    如果报错如下:
        警告:esl-erlang_22.0-1_centos_7_amd64.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID a14f4fca: NOKEY
        
        则需要先执行下面命令,安装依赖,在执行安装的命令:
    1. sudo yum install epel-release
    2. sudo yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

    3.执行下面命令进行验证:erl

    4.执行如下命令退出:halt().        "注意这个.点,不能丢"

    查看rpm 包安装到哪了

    rpm -ql XXX.rpm

    二、开始安装rabbitmq

    pm -ivh --prefix= /opt/temp  xxx.rpm

    1.下载3.7.15版本的rabbitmq,与Erlang版本要对应上

    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
    rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm
    ## 可能需要先安装!!
    yum install socat
    

    启动rabbitmq

    1.开机启动:

    chkconfig rabbitmq-server on
    

    2.查看启动状态:

    rabbitmqctl status
    

    3.启动,关闭,重启:

    systemctl start rabbitmq-server.service
    systemctl stop rabbitmq-server.service
    systemctl restart rabbitmq-server.service
    

    启动web管理台

    rabbitmq-plugins enable rabbitmq_management
    

     

    访问: http://192.168.93.129:15672,默认用户:guest/guest,但登陆时显示User can only log in via localhost!!!

    解决方案

    找到文件/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app:

    {loopback_users, [<<"guest">>]},
    
    改为{loopback_users, []},
    

    然后重启服务即可:

    systemctl restart rabbitmq-server.service

    简单消息队列

    java 调用mq,一对一   单个生产者,一个消息者

    <dependencies>
             <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.0.0</version>
            </dependency>
            
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-nop</artifactId>
                <version>1.7.25</version>
            </dependency>
             <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
                <scope>test</scope>
            </dependency>
        </dependencies>

    生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class ProducterDirectDemo {
        
        private static String queneName = "testQuene";
        
        
        public static Connection getConnection() {
            Connection connection = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.80.110");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("/");
                // 创建与RabbitMQ服务器的TCP连接
                connection = factory.newConnection();
            } catch (Exception ex) {
                ex.printStackTrace();
            } 
            
            return connection;
        }
        
        
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = getConnection();
            Channel channel = null;
            try {
                // 创建一个频道
                channel = connection.createChannel();
                // 声明默认的队列
                channel.queueDeclare(queneName, false, false, false, null);
                String msg = "发送消息";
                channel.basicPublish("", queneName, null, msg.getBytes());
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    消费者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Consumer {
        private final static String QUEUE_NAME = "testQuene"; //队列名称
    
        public static void receive() throws IOException, TimeoutException {
            //由连接工厂创建连接
            Connection connection = ProducterDirectDemo.getConnection();
            //通过连接创建信道
            Channel channel = connection.createChannel();
            //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
                @Override
                public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    java.lang.String msg = new java.lang.String(body);
                    System.out.println("received msg: " + msg);
                }
            };
    
            //监听指定的queue。会一直监听。
            //参数:要监听的queue、是否自动确认消息、使用的Consumer
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
        
        
        public static void main(String[] args) throws IOException, TimeoutException {
            receive();
        }
        
    }

    2、work queues工作队列

    一个生产者多个消费者

    多个客户端会均分,不管哪个处理的快哪个处理的慢

    采用轮询的机制

    采用公平分发,要关闭自动应答

    3、autoAck      自动确认后从内存中删除

    4、如果强制杀死正在执行的消费者就会丢失消息

    如果不想丢失消息,要设置成手动

    5、发布订阅模式

    生产者把消息发送到交换机(exchange),交换机把消息发送到消息队列,一个队列对应一个消费者

    路由规则

    往交换机中发送消息
    如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

    6、topic 主题模式

    将路由键和某模式匹配

    # 匹配一个或多个

    *  匹配一个

    Good.#

    7.rabbitmq 消息确认机制(事务)

    生产有没有把消息发送到消息队列,默认是不知道的

    两种方式

    amqp 实现了事务机制

    txSelect  用户将当前chanel 设置成transation

    txCommit  提交事务

    txRollback 回滚事务

    降低了消息的吞吐量

    confirm 模式

    生产者端confirm 实现原理

    confirm 异步模式

    开启confirm 模式

    发一条 waitforconfim 单条效率偏低

    发一批 waitforconfim

    异步模式

  • 相关阅读:
    Git 开发、合并、提交的一些常见命令语句
    selenium自动化过程中窗口句柄的问题
    git 忽略文件夹下面的文件,但是保留空文件夹
    python接口测试中不同的请求方式
    python接口测试中发送请求中的一些参数
    python接口测试中的session运用
    接口测试数据依赖处理
    pygame的安装
    Bottstrap的基本用法
    前端CSS
  • 原文地址:https://www.cnblogs.com/jentary/p/13579603.html
Copyright © 2011-2022 走看看