zoukankan      html  css  js  c++  java
  • 消息中间件-RabbitMq(搭建&消息分发)

    消息中间件-RabbitMq(搭建&消息分发)

    RabbitMq】是一个【AMQP】协议的实现。服务端使用的是Erlang语言进行编写,那也就是说,我们要运行它,就要安装相关Erlang环境。前面说了AMQP最初是为了解决金融行业的可用性问题,所以Rabbit在高可用方面表现不俗,并且在我看来他是这几种中间件中最容易上手的一个。而且它在并发方面表现十分出色,可以实现大概10w的吞吐量。他的特点是:【可靠性、消息集群、高可用、插件机制(可以让它支持别的协议)、支持多语言客户端、管理页面 so on本篇主要聊聊如何安装、使用、以及关于他的一些名词方面的阐述。run。。

    安装运行

    • 我的环境是CentOS7
    • http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本,前面是Rabbit的版本,后面是Erlang的对它支持的版本。这里前后要对应下载,版本必须符合他的要求,我这里使用的就是第一个。
    • https://github.com/rabbitmq/erlang-rpm/releases 中复制对应的版本erlang下载地址
    • https://github.com/rabbitmq/rabbitmq-server/tags 中复制对应的版本rabbitmq的下载地址

    • 下载Erlang
      • wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v23.3.4.3/erlang-23.3.4.3-1.el7.x86_64.rpm
    • 安装Erlang
      • sudo rpm -Uvh /home/download/erlang-23.3.4.3-1.el7.x86_64.rpm
    • 安装socat
      • sudo yum install -y socat
    • 下载RabbitMQ
      •  wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
    • 安装RabbitMQ
      • 【sudo rpm -Uvh /home/download/rabbitmq-server-3.8.15-1.el7.noarch.rpm】

    到目前为止我们的准备工作完毕,以下是一些启动和关闭命令。

    停止服务】:sudo systemctl stop rabbitmq-server 【查询状态】:sudo systemctl status rabbitmq-server 【启动】:sudo systemctl start rabbitmq-server 【设置开启自启】:sudo systemctl enable rabbitmq-server 

    使用启动命令启动后,我们查询状态发现状态为 dead,这是因为我们要启动他的插件 使用【rabbitmq-plugins list】可以查询所有他支持的插件,我们这里需要启动

    【rabbitmq-plugins  enable rabbitmq_management】

     执行完成后使用【 cat /etc/rabbitmq/enabled_plugins】就可以知道是否启动插件成功,然后再次启动发现启动状态就为running,使用【netstat -nplt | grep 15672 】发现他的专用端口已经开启,至此,安装启动完毕。这个时候就可以对它进行访问了(你的ip:15672),出现下面的图,就证明搭建成功。这里注意开放一下端口,否则别的机器无法访问:

    • sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    • sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    • sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    • sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent 

     然而我们用他自己的gust是无法login in 进去的,因为这个支持在搭建的服务器本身上访问,那我们就要创建自己的用户,并且赋予相应的权限。

    • 【添加一个admin用户】:rabbitmqctl add_user admin admin 
    • 【分配操作权限】:rabbitmqctl set_user_tags admin administrator
    • 【分配资源权限】:rabbitmqctl set_permissions -p / admin ".*" ".*" ".*

    使用admin进行登录,至此,可以rabbitmq可以正常使用

    使用

    添加相关依赖

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.5.1</version>
     </dependency>

    生产一个消息

    public class Producer {
    
        public static void main(String[] args) {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("你的ip");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection();
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
    
                /**
                 * 5、声明(创建)队列
                 * 如果队列不存在,才会创建
                 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
                 *
                 * queueDeclare参数说明:
                 * @param queue 队列名称
                 * @param durable 队列是否持久化
                 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
                 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
                 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
                 */
                channel.queueDeclare("queue1", false, false, false, null);
                // 消息内容
                String message = "Hello World!";
                // 6、发送消息
                channel.basicPublish("", "queue1", null, message.getBytes());
                System.out.println("消息已发送!");
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 7、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
                // 8、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    View Code

    生产后你去管理页面查询,会发现一个消息还未读取。

    消费一个消息(消费后再次查询,发现ready中没有东西了)

    public class Consumer {
    
        public static void main(String[] args) {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("你的ip");
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection("消费者");
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
    
                channel.queueDeclare("queue1", false, false, false, null);
    
                // 6、定义收到消息后的回调
                DeliverCallback callback = new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                    }
                };
                // 7、监听队列
                channel.basicConsume("queue1", true, callback, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                    }
                });
    
                System.out.println("开始接收消息");
                System.in.read();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 8、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                // 9、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    View Code

    至此,简单使用结束。

    使用客户端:这里首先创建两个队列,然后在交换机上模拟发送消息,以topic类型的交换机为例,他会进行routing key的匹配,在发送消息的时候,把你的routing key 携带,即可匹配。

     一个topic类型的交换机的例子

    /**
     * Topic--生产者
     *
     * 生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符
     */
    public class Producer {
    
        public static void main(String[] args) {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("你的ip");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection("生产者");
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
    
                // 路由关系如下:com.# --> queue-1     *.order.* ---> queue-2
                // 消息内容
                String message = "Hello A";
                // 发送消息到topic_test交换器上
                channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
                System.out.println("消息 " + message + " 已发送!");
    
                // 消息内容
                message = "Hello B";
                // 发送消息到topic_test交换器上
                channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
                System.out.println("消息 " + message + " 已发送!");
    
                // 消息内容
                message = "Hello C";
                // 发送消息到topic_test交换器上
                channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
                System.out.println("消息 " + message + " 已发送!");
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 7、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                // 8、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    /**
     * 路由--消费者
     *
     * 消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息
     */
    public class Consumer {
    
        private static Runnable receive = () -> {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("你的ip");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
            final String queueName = Thread.currentThread().getName();
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection("消费者");
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
                // 定义消息接收回调对象
                DeliverCallback callback = new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                    }
                };
                // 监听队列
                channel.basicConsume(queueName, true, callback, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                    }
                });
    
                System.out.println(queueName + " 开始接收消息");
                System.in.read();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 8、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                // 9、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    
        public static void main(String[] args) {
            new Thread(receive, "queue1").start();
            new Thread(receive, "queue-2").start();
        }
    
    }
    View Code

     

    Rabbit名词介绍

    Blocker:一个rabbit服务器就是一个Blocker

    虚拟主机(virtual host一个Blocker中可以有多个虚拟机,每个虚拟机类似于一个工作空间,每个虚拟主机中的消息和其他虚拟主机的消息不相互影响

    connection:消费者和rabbit中间的连接,有了这个连接,双方才能通信。

    RoutingKey:消息被发给交换机的时候,会携带它,这个是用来指定消息的路由规则(可以为空)

    channel(信道):是在connection上建立的管道,一个connection上可以建立多个channel,消息通过他们进行传递。

    BindingKey:Exchange和Queue绑定的关系,Exchange接收到的消息会带有RoutingKey这个字段。

    交换机(exchanger):当rabbit接收到消息后,交换机对这些消息进行转换,他的类型决定哪个队列中应该拥有这些消息,

    交换机类型:

    • 【direct】:当发送消息的时候,我们会在消息体上携带一个路由键【routekey】,如果消息体上你的路由键和队列匹配则发送给对应的队列。
    • 【fanout 】:发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
    • 【headers】:根据发送的消息内容中的【headers】属性进行匹配,当消息发送到RabbitMQ时会取到该消息的【headers】与Exchange绑定时指定的键值对进行匹配,如果匹配到,则对应队列可以接受到消息。
    • 【topic】:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”会匹配一个或多个词,比如【ok.#】--》【ok.1.1 or ok.1.1.2 so on】,只要队列可以匹配到,就可以接受消息

    队列(queue):rabbit接收到的信息存储在这里,消费者也是从这里获取的消息。

    binder: 队列和交换机之间的绑定

    AMQP(advanced message queuing protocol):

    他是应用层协议的一个开放标准,为面向消息的中间件协议。他分为三层:

    【底层协议层】:主要传输二进制数据流,

    【中间层】:将客户端的命令转发给服务器,然后将服务器的回复转给客户端。【将最高层的用户层传递的信息转化为二进制,传递给底层。把底层的信息转化为客户端可以知道的语言。】

    【最高层】:提供用户调用的命令。

    流转流程

    生产者:建立连接->开启通道->发送消息->关闭资源

    消费者:建立连接->开启通道->接受消息->发送确认消息(告诉rabbit,rabbit修改消息状态为已经读 and so on)->释放资源

  • 相关阅读:
    面试题
    网络编程
    python_控制台输出带颜色的文字方法
    httpie 101
    JSON Web Signature 规范解析
    Kong 系列 -- Kong 101
    关于过渡机制的一点理解
    XAML概览 1(译自JeremyBytes.com)
    awk与sed简明教程
    Connection failed: NT_STATUS_ACCOUNT_RESTRICTION
  • 原文地址:https://www.cnblogs.com/UpGx/p/14978892.html
Copyright © 2011-2022 走看看