zoukankan      html  css  js  c++  java
  • rabbitmq

    RabbitMQ  是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器。

    一、应用场景

    异步处理

    应用解耦

    流量消峰

    日志收集

    二、支持多种语言

    比如Js广告收集,大量的广告收集  会用到消息队列

    消息中间件--》流实时计算

    消息中间件--》离线计算

    三、java操作rabbitmq

    1 simple 简单队列

    2 work queues 工作队列、公平分发、轮询分发

    3 publish/subscribe 发布订阅

    4 routing 路由选择 通配符模式

    5 topics 主题

    6 手动和自动确认消息

    7 队列的持久化和非持久化

    8 rabbitmq 的延迟队列        场景  :未支付订单30分钟取消

    百度统计 cnzz架构

    rabbitmq AMQP协议

    添加用户

    创建库   以/开头

    virtual host

    对用户授权

    控制台 overview

    ProtocolNodeBound toPort
    amqp rabbit1@mqnode1 :: 30007
    amqp rabbit2@mqnode2 :: 30007
    amqp rabbit3@mqnode3 :: 30007
    clustering rabbit1@mqnode1 :: 50001
    clustering rabbit2@mqnode2 :: 50001
    clustering rabbit3@mqnode3 :: 50001
    http rabbit1@mqnode1 :: 30008
    http rabbit2@mqnode2 :: 30008
    http rabbit3@mqnode3 :: 30008

    amqp 开发所有接口,qmqp协议

    clustering 集群端口

    http  http访问端口

    connections 当前有哪些连接

    chaannels 当前有那些频道

    exchanges 交换机

    队列

    简单队列

    下载安装

    RabbitMQ 

    https://www.rabbitmq.com/install-rpm.html#downloads

    https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.17

    erlang

    https://www.erlang-solutions.com/resources/download.html

    教程

    https://blog.csdn.net/u013887008/article/details/100859070

    安装erlang

    rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm

    如果报错如下:
        警告:esl-erlang_22.0-1_centos_7_amd64.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID a14f4fca: NOKEY
        
        则需要先执行下面命令,安装依赖,在执行安装的命令:
    1. sudo yum install epel-release
    2. sudo yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

    3.执行下面命令进行验证:erl

    4.执行如下命令退出:halt().        "注意这个.点,不能丢"

    查看rpm 包安装到哪了

    rpm -ql XXX.rpm

    二、开始安装rabbitmq

    pm -ivh --prefix= /opt/temp  xxx.rpm

    1.下载3.7.15版本的rabbitmq,与Erlang版本要对应上

    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
    rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm
    ## 可能需要先安装!!
    yum install socat
    

    启动rabbitmq

    1.开机启动:

    chkconfig rabbitmq-server on
    

    2.查看启动状态:

    rabbitmqctl status
    

    3.启动,关闭,重启:

    systemctl start rabbitmq-server.service
    systemctl stop rabbitmq-server.service
    systemctl restart rabbitmq-server.service
    

    启动web管理台

    rabbitmq-plugins enable rabbitmq_management
    

     

    访问: http://192.168.93.129:15672,默认用户:guest/guest,但登陆时显示User can only log in via localhost!!!

    解决方案

    找到文件/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app:

    {loopback_users, [<<"guest">>]},
    
    改为{loopback_users, []},
    

    然后重启服务即可:

    systemctl restart rabbitmq-server.service

    简单消息队列

    java 调用mq,一对一   单个生产者,一个消息者

    <dependencies>
             <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.0.0</version>
            </dependency>
            
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-nop</artifactId>
                <version>1.7.25</version>
            </dependency>
             <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
                <scope>test</scope>
            </dependency>
        </dependencies>

    生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class ProducterDirectDemo {
        
        private static String queneName = "testQuene";
        
        
        public static Connection getConnection() {
            Connection connection = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("192.168.80.110");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("/");
                // 创建与RabbitMQ服务器的TCP连接
                connection = factory.newConnection();
            } catch (Exception ex) {
                ex.printStackTrace();
            } 
            
            return connection;
        }
        
        
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = getConnection();
            Channel channel = null;
            try {
                // 创建一个频道
                channel = connection.createChannel();
                // 声明默认的队列
                channel.queueDeclare(queneName, false, false, false, null);
                String msg = "发送消息";
                channel.basicPublish("", queneName, null, msg.getBytes());
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    消费者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Consumer {
        private final static String QUEUE_NAME = "testQuene"; //队列名称
    
        public static void receive() throws IOException, TimeoutException {
            //由连接工厂创建连接
            Connection connection = ProducterDirectDemo.getConnection();
            //通过连接创建信道
            Channel channel = connection.createChannel();
            //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
                @Override
                public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    java.lang.String msg = new java.lang.String(body);
                    System.out.println("received msg: " + msg);
                }
            };
    
            //监听指定的queue。会一直监听。
            //参数:要监听的queue、是否自动确认消息、使用的Consumer
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
        
        
        public static void main(String[] args) throws IOException, TimeoutException {
            receive();
        }
        
    }

    2、work queues工作队列

    一个生产者多个消费者

    多个客户端会均分,不管哪个处理的快哪个处理的慢

    采用轮询的机制

    采用公平分发,要关闭自动应答

    3、autoAck      自动确认后从内存中删除

    4、如果强制杀死正在执行的消费者就会丢失消息

    如果不想丢失消息,要设置成手动

    5、发布订阅模式

    生产者把消息发送到交换机(exchange),交换机把消息发送到消息队列,一个队列对应一个消费者

    路由规则

    往交换机中发送消息
    如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

    6、topic 主题模式

    将路由键和某模式匹配

    # 匹配一个或多个

    *  匹配一个

    Good.#

    7.rabbitmq 消息确认机制(事务)

    生产有没有把消息发送到消息队列,默认是不知道的

    两种方式

    amqp 实现了事务机制

    txSelect  用户将当前chanel 设置成transation

    txCommit  提交事务

    txRollback 回滚事务

    降低了消息的吞吐量

    confirm 模式

    生产者端confirm 实现原理

    confirm 异步模式

    开启confirm 模式

    发一条 waitforconfim 单条效率偏低

    发一批 waitforconfim

    异步模式

  • 相关阅读:
    新一代MQ apache pulsar的架构与核心概念
    Flutter使用fluwx实现微信分享
    BZOJ3622 已经没有什么好害怕的了 动态规划 容斥原理 组合数学
    NOIP2016提高组Day1T2 天天爱跑步 树链剖分 LCA 倍增 差分
    Codeforces 555C Case of Chocolate 其他
    NOIP2017提高组Day2T3 列队 洛谷P3960 线段树
    NOIP2017提高组Day2T2 宝藏 洛谷P3959 状压dp
    NOIP2017提高组Day1T3 逛公园 洛谷P3953 Tarjan 强连通缩点 SPFA 动态规划 最短路 拓扑序
    Codeforces 873F Forbidden Indices 字符串 SAM/(SA+单调栈)
    Codeforces 873E Awards For Contestants ST表
  • 原文地址:https://www.cnblogs.com/jentary/p/13579603.html
Copyright © 2011-2022 走看看