zoukankan      html  css  js  c++  java
  • 初识RabbitMq(二) 消息的创建以及接收

    1基础

    1.核心思想:接收并转发消息。你可以把它想象成一个邮局

    2.Proucer(pu rao ):生产者(消息发送者)

    3.Queue :邮箱,接收(一个或多个)proucer生产的消息,提供给一个或多个消费者消费。

    4.Consumer(抗苏门):消费者

    5.注意proucerqueue还有consumer可以分开在单独服务器上

    6.MQ(Message Queue):消息队列,队列(先进先出,有三个作用:入列、存储、出列)

    7.消息队列的特征:

      (1) 业务无关

      (2) FIFO(先进先出)

      (3) 容灾

      (4) 性能:吞吐量内部通信

    8.使用消息队列原因

      (1) 系统解耦:系统a(消费卷)产生消息 系统b(订单)/缓存模块接受消息。

        ① 如果是系统a去产生消息给bc(直接调用bc接口),比较耗时

        ② 不易维护

        ③ a产生消息给消息队列,然后bc去订阅消息

      (2) 异步调用(同步:需要等返回结果,然后在进行下一个。异步:同时进行多个,不用等结果返回)

      (3) 流量削峰:比如每天只有半个小时是流量高峰,现有的服务器数量不足以支撑。加服务器会浪费、不加这段时间搞不过去。所以使用消息队列,先将所有请求存储,然后按照服务器可接受范围吐出。只到流量高峰过去

    9.使用举例:【此段举例转载】

    以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。

      消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

    10.AMQP协议中间的几个重要概念:【此段转载】

    • Server:接收客户端的连接,实现AMQP实体服务。
    • Connection:连接,应用程序与Server的网络连接,TCP连接。
    • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
    • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
    • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
    • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
    • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
    • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
    • Queue:消息队列,用来保存消息,供消费者消费。

    2RabbitMq特点

    1. 开源、跨语言
    2. Erlang语言编写(性能好、延迟低)
    3. 应用广泛
    4. 社区活跃、api丰富

    3AMQP协议(Advanced Message Queuing Protocol)

    4RabbitMQ核心概念

    1. Server:服务
    2. connection:与Server建立连接
    3. channel:信道,几乎所有的操作都在信道上进行,客户端可以建立多个信道
    4. message:消息,由propertiesbody组成
    5. virtual host:虚拟主机,顶层隔离。同一个虚拟主机下,不能有重复的交换机和queue
    6. Exchange:交换机,接收生产者的消息的,然后根据指定的路由器去把消息转发到所绑定的队列上 u binding:绑定交换机和队列
    7. routing key:路由键,路由规则,虚拟机可以用它来确定这个消息如何进行一个路由
    8. queue:队列,消费者只需要监听队列来消费消息,不需要关注消息来自于哪个Exchange
    9. ExchangeMessage Queue存在着绑定的关系,一个Exchage可以绑定多个消息队列

    5消息流转过程

    6创建一个简单的发送接收例子

    1.1首先给账号授权

    1.2创建java项目,并且引入相应的maven

        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.10.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-nop</artifactId>
                <version>1.7.30</version>
                <scope>test</scope>
            </dependency>
        </dependencies>

    1.3创建发送者

    package item.com;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     描述:
      的发送类,连接到RabbitMQ服务端, 然后发送一
     条消息,然后退出。
    */
    public class Send {
        private final  static String QUEUE_NAME="hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置 RabbitMQ 地址  todo setHost设置腾讯云服务器就不行,需要周末排查以下
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //建立连接
            Connection connection = connectionFactory.newConnection();
            //获得信道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    //        channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数);
            //发布消息
            String message = "Hello World2233!";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
    //        channel.basicPublish(交换机,QUEUE_NAME,额外配置,message.getBytes("UTF-8"));
            System.out.println("消息发送"+message);
            //关闭
            channel.close();;
            connection.close();
        }
    }

    1.4创建接收者

    package item.com;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 接受消息
     */
    public class Recv {
        private final static String QUEUE_NAME = "hello";
    
        //接受消息不用关闭,一直打开接收
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置 RabbitMQ 地址  todo setHost设置腾讯云服务器就不行,需要周末排查以下
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //建立连接
            Connection connection = connectionFactory.newConnection();
            //获得信道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //        channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数);
            //接受消息
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "UTF-8");
                    System.out.println("收到消息:"+s);
                }
            });
            //        channel.basicConsume(QUEUE_NAME,是否通知发送者已经签收-确认,处理消息)
        }
    }

    1.5查看运行

     

    7多消费者平均压力

    2.1多消费者

    消费者可以同时启动多个,比如Recv1、Recv2(代码完全相同)。这样就会平均的去处理消息(数量上平均)。但是不同的消息耗时不同。又导致不平均

    2.2平均消费者【下列红色代码部分】,这样就根据时间来而不是数量平均

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 接受消息
     */
    public class Recv3 {
        private final static String QUEUE_NAME = "hello";
    
        //接受消息不用关闭,一直打开接收
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置 RabbitMQ 地址  todo setHost设置腾讯云服务器就不行,需要周末排查以下
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //建立连接
            Connection connection = connectionFactory.newConnection();
            //获得信道
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //        channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数);
            //接受消息
            channel.basicQos(1);//处理完之前不接受下一个任务
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + s + "'");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
            //        channel.basicConsume(QUEUE_NAME,是否通知发送者已经签收-确认,处理消息)
        }
    
    }
  • 相关阅读:
    技术为辅,思维主导
    阶段性目标的设置
    非计算机专业测试之路
    第四章 Appium真机运行测试用例讲解
    第三章 Appium API介绍
    第二章 测试环境搭建(下)
    第二章 测试环境搭建(上)
    第一章 Appium简介
    测试人员的工作经验值钱吗
    2017 年该学习的编程语言、框架和工具
  • 原文地址:https://www.cnblogs.com/1439107348s/p/14449356.html
Copyright © 2011-2022 走看看