zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件入门

    第一章:RabbitMQ起步

    1.1 课程导航

    • RabbitMQ简介及AMQP协议
    • RabbitMQ安装与使用
    • RabbitMQ核心概念
    • 与SpringBoot整合
    • 保障100%的消息可靠性投递方案落地实现
    • 学习源码

    1.2 RabbitMQ简介

    初识RabbitMQ

    • RabbitMQ是一个开源的消息代理和队列服务器
    • 用来通过普通协议在完全不同的应用之间共享数据
    • RabbitMQ是使用Erlang语言来编写的
    • 并且RabbitMQ是基于AMQP协议的

    RabbitMQ简介

    • 目前很多互联网大厂都在使用RabbitMQ
    • RabbitMQ底层采用Erlang语言进行编写
    • 开源、性能优秀,稳定性保障
    • 与SpringAMQP完美的整合、API丰富
    • 集群模式丰富,表达式配置,HA模式,镜像队列模型
    • 保证数据不丢失的前提做到高可靠性、可用性
    • AMQP全称:Advanced Message Queuing Protocol
    • AMQP翻译:高级消息队列协议

    AMQP协议模型

    1.3 RabbitMQ安装

    0.安装准备
    官网地址:http://www.rabbitmq.com/
    安装Linux必要依赖包<Linux7>
    下载RabbitMQ安装包
    
    yum install 
    build-essential openssl openssl-devel unixODBC unixODBC-devel 
    make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
    
    
    1.下载:
    wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
    wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
    
    
    2.相关安装配置
    三个软件包的安装
    修改相关配置文件
     vim /etc/hostname
     vim /etc/hosts
     (Linux防火墙)
    
    
    3.修改RabbitMQ配置
    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app
    比如修改密码、配置等等;例如:loopback_users中的<<"guest">>,只保留guest
    服务启动:rabbitmq-server start &
    默认进程号:29123
    默认端口号:5672
    服务停止:rabbitmqctl app_stop
    
    
    4.安装RabbitMQ web管理插件
    rabbitmq-plugins enable rabbitmq_management
    sudo systemctl restart rabbitmq-server
    访问管控台地址:http://192.168.11.81:15672/
    默认用户名密码:guest/guest

    1.4 RabbitMQ概念

    RabbitMQ的整体架构

    RabbitMQ核心概念

    • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
    • Connection:连接,应用程序与Broker的网络连接
    • Channel:网络信道

    几乎所有的操作都在Channel中进行
    Channel是进行消息读写的通道
    客户端可建立多个Channel
    每个Channel代表一个会话任务

    • Message:消息

    服务器和应用程序之间传送的数据,由Properties和Body组成
    Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
    Body则就是消息体内容

    • Virtual host:虚拟机

    用于进行逻辑隔离,最上层的消息路由
    一个Virtual host里面可以有若干个Exchange和Queue
    同一个Virtual host里面不能有相同名称的Exchange或Queue

    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

    RabbitMQ消息的流转过程

    第二章:RabbitMQ整合SpringBoot2.x

    2.1 发送消息Producer

    SpringBoot与RabbitMQ集成

    • 引入相关依赖
    • 对application.properties进行配置

    1、创建名为rabbitmq-producer的maven工程pom如下

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <parent>
     6         <artifactId>47-rabbitmq</artifactId>
     7         <groupId>com.myimooc</groupId>
     8         <version>1.0-SNAPSHOT</version>
     9     </parent>
    10     <modelVersion>4.0.0</modelVersion>
    11 
    12     <artifactId>rabbitmq-producer</artifactId>
    13 
    14     <properties>
    15         <spring.boot.version>2.0.4.RELEASE</spring.boot.version>
    16     </properties>
    17 
    18     <dependencyManagement>
    19         <dependencies>
    20             <dependency>
    21                 <groupId>org.springframework.boot</groupId>
    22                 <artifactId>spring-boot-parent</artifactId>
    23                 <version>${spring.boot.version}</version>
    24                 <type>pom</type>
    25                 <scope>import</scope>
    26             </dependency>
    27         </dependencies>
    28     </dependencyManagement>
    29 
    30     <dependencies>
    31         <dependency>
    32             <groupId>org.springframework.boot</groupId>
    33             <artifactId>spring-boot-starter</artifactId>
    34         </dependency>
    35 
    36         <!--RabbitMQ依赖-->
    37         <dependency>
    38             <groupId>org.springframework.boot</groupId>
    39             <artifactId>spring-boot-starter-amqp</artifactId>
    40         </dependency>
    41 
    42         <!--工具类依赖-->
    43         <dependency>
    44             <groupId>org.apache.commons</groupId>
    45             <artifactId>commons-lang3</artifactId>
    46         </dependency>
    47         <dependency>
    48             <groupId>commons-io</groupId>
    49             <artifactId>commons-io</artifactId>
    50             <version>2.5</version>
    51         </dependency>
    52         <dependency>
    53             <groupId>com.alibaba</groupId>
    54             <artifactId>fastjson</artifactId>
    55             <version>1.2.36</version>
    56         </dependency>
    57         <dependency>
    58             <groupId>javax.servlet</groupId>
    59             <artifactId>javax.servlet-api</artifactId>
    60             <scope>provided</scope>
    61         </dependency>
    62         <dependency>
    63             <groupId>org.slf4j</groupId>
    64             <artifactId>slf4j-api</artifactId>
    65         </dependency>
    66         <dependency>
    67             <groupId>log4j</groupId>
    68             <artifactId>log4j</artifactId>
    69             <version>1.2.17</version>
    70         </dependency>
    71 
    72         <dependency>
    73             <groupId>org.springframework.boot</groupId>
    74             <artifactId>spring-boot-starter-test</artifactId>
    75             <scope>test</scope>
    76         </dependency>
    77     </dependencies>
    78 
    79     <build>
    80         <plugins>
    81             <plugin>
    82                 <groupId>org.springframework.boot</groupId>
    83                 <artifactId>spring-boot-maven-plugin</artifactId>
    84             </plugin>
    85         </plugins>
    86     </build>
    87 
    88 </project>

    2、编写application.properties类

     1 # rabbitmq地址
     2 spring.rabbitmq.addresses=10.1.195.196:5672
     3 # rabbitmq用户名
     4 spring.rabbitmq.username=guest
     5 # rabbitmq密码
     6 spring.rabbitmq.password=guest
     7 # rabbitmq默认虚拟主机地址
     8 spring.rabbitmq.virtual-host=/
     9 # rabbitmq超时时间为15秒
    10 spring.rabbitmq.connection-timeout=15000
    11 
    12 #字符集
    13 spring.http.encoding.charset=UTF-8
    14 #格式化
    15 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    16 spring.jackson.time-zone=GMT+8
    17 spring.jackson.default-property-inclusion=NON_NULL
    18 
    19 spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
    20 spring.datasource.username=root
    21 spring.datasource.password=
    22 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    23 spring.datasource.type= com.alibaba.druid.pool.DruidDataSource
    24 
    25 # 项目路径
    26 server.servlet.context-path=/
    27 # 服务端口号
    28 server.port=8080

    3、创建数据库

    -- ----------------------------
    -- Table structure for broker_message_log
    -- ----------------------------
    DROP TABLE IF EXISTS `broker_message_log`;
    CREATE TABLE `broker_message_log` (
      `message_id` varchar(255) NOT NULL COMMENT '消息唯一ID',
      `message` varchar(4000) NOT NULL COMMENT '消息内容',
      `try_count` int(4) DEFAULT '0' COMMENT '重试次数',
      `status` varchar(10) DEFAULT '' COMMENT '消息投递状态 0投递中,1投递成功,2投递失败',
      `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP COMMENT '下一次重试时间',
      `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
      `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Table structure for t_order
    -- ----------------------------
    DROP TABLE IF EXISTS `t_order`;
    CREATE TABLE `t_order` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `message_id` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2018091102 DEFAULT CHARSET=utf8;

    4、编写Order类

     1 package com.bfxy.springboot.entity;
     2 
     3 import java.io.Serializable;
     4 
     5 /**
     6  * 订单实体
     7  * @author wangc
     8  */
     9 public class Order implements Serializable {
    10     private static final long serialVersionUID = -1502291609049620042L;
    11     private String id;
    12     private String name;
    13 
    14     /**
    15      * 存储消息发送的唯一标识
    16      */
    17     private String messageId;
    18 
    19     public Order() {
    20     }
    21 
    22     public Order(String id, String name, String messageId) {
    23         this.id = id;
    24         this.name = name;
    25         this.messageId = messageId;
    26     }
    27 
    28     public String getId() {
    29         return id;
    30     }
    31 
    32     public void setId(String id) {
    33         this.id = id;
    34     }
    35 
    36     public String getName() {
    37         return name;
    38     }
    39 
    40     public void setName(String name) {
    41         this.name = name;
    42     }
    43 
    44     public String getMessageId() {
    45         return messageId;
    46     }
    47 
    48     public void setMessageId(String messageId) {
    49         this.messageId = messageId;
    50     }
    51 }

    5、编写OrderSender类

     1 package com.bfxy.springboot.producer;
     2 
     3 
     4 import com.bfxy.springboot.entity.Order;
     5 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     6 import org.springframework.amqp.rabbit.support.CorrelationData;
     7 import org.springframework.beans.factory.annotation.Autowired;
     8 import org.springframework.stereotype.Component;
     9 
    10 /**
    11  * 订单消息发送者
    12  *
    13  * @author wangc
    14  */
    15 @Component
    16 public class OrderSender {
    17     @Autowired
    18     private RabbitTemplate rabbitTemplate;
    19 
    20     /**
    21      * 发送订单
    22      *
    23      * @param order 订单
    24      * @throws Exception 异常
    25      */
    26     public void send(Order order) throws Exception {
    27         CorrelationData correlationData = new CorrelationData();
    28         correlationData.setId(order.getMessageId());
    29         rabbitTemplate.convertAndSend("order-exchange",
    30                 "order.abcd",
    31                 order,
    32                 correlationData);
    33     }
    34 }

    6、编写OrderSenderTest类

     1 package com.bfxy.springboot;
     2 
     3 import com.bfxy.springboot.entity.Order;
     4 import com.bfxy.springboot.producer.OrderSender;
     5 import org.junit.Test;
     6 import org.junit.runner.RunWith;
     7 import org.omg.CORBA.PUBLIC_MEMBER;
     8 import org.springframework.beans.factory.annotation.Autowired;
     9 import org.springframework.boot.test.context.SpringBootTest;
    10 import org.springframework.test.context.junit4.SpringRunner;
    11 
    12 import java.util.UUID;
    13 
    14 @RunWith(SpringRunner.class)
    15 @SpringBootTest
    16 public class SpringbootProducerApplicationTests {
    17 
    18     @Test
    19     public void contextLoads() {
    20     }
    21 
    22 
    23     @Autowired
    24     private OrderSender orderSender;
    25 
    26     /**
    27      * 订单消息发送者测试
    28      * @author wangc
    29      */
    30     @Test
    31     public void testSend1() throws Exception {
    32         Order order = new Order();
    33         order.setId("201808180000000001");
    34         order.setName("测试订单1");
    35         order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
    36         orderSender.send(order);
    37 
    38     }
    39 }

    2.2 接收消息Cunsumer

    1、编写application.properties类

     1 # SpringBoot整合rabbitMQ的基本配置:
     2 # rabbitmq地址
     3 spring.rabbitmq.addresses=10.1.195.196:5672
     4 # rabbitmq用户名
     5 spring.rabbitmq.username=guest
     6 # rabbitmq密码
     7 spring.rabbitmq.password=guest
     8 # rabbitmq默认虚拟主机地址
     9 spring.rabbitmq.virtual-host=/
    10 # rabbitmq超时时间为15秒
    11 spring.rabbitmq.connection-timeout=15000
    12 
    13 #字符集
    14 #spring.http.encoding.charset=UTF-8
    15 #格式化
    16 #spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    17 #spring.jackson.time-zone=GMT+8
    18 #spring.jackson.default-property-inclusion=NON_NULL
    19 
    20 spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
    21 spring.datasource.username=root
    22 spring.datasource.password=123456
    23 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    24 spring.datasource.type= com.alibaba.druid.pool.DruidDataSource
    25 
    26 # SpringBoot整合rabbitMQ 消费端配置:
    27 # 基本并发:5
    28 spring.rabbitmq.listener.simple.concurrency=5
    29 # 签收模式:手动签收
    30 spring.rabbitmq.listener.simple.acknowledge-mode=manual
    31 # 最大并发:10
    32 spring.rabbitmq.listener.simple.max-concurrency=10
    33 # 限流策略:同一时间只有1条消息发送过来消费
    34 spring.rabbitmq.listener.simple.prefetch=1
    35 
    36 
    37 # Server配置:
    38 # 项目路径
    39 server.servlet.context-path=/
    40 # 服务端口号
    41 server.port=8082

    2、编写OrderReceiver类

     1 package com.bfxy.springboot.consumer;
     2 
     3 import com.bfxy.springboot.entity.Order;
     4 import com.rabbitmq.client.Channel;
     5 import org.springframework.amqp.rabbit.annotation.*;
     6 import org.springframework.amqp.support.AmqpHeaders;
     7 import org.springframework.messaging.handler.annotation.Headers;
     8 import org.springframework.messaging.handler.annotation.Payload;
     9 import org.springframework.stereotype.Component;
    10 
    11 import java.util.Map;
    12 
    13 
    14 /**
    15  * 订单接收者
    16  *
    17  * @author wangc
    18  */
    19 @Component
    20 public class OrderReceiver {
    21     /**
    22      * 接收消息
    23      *
    24      * @RabbitListener 绑定监听
    25      *
    26      * @param order   消息体内容
    27      * @param headers 消息头内容
    28      * @param channel 网络信道
    29      * @throws Exception 异常
    30      */
    31     @RabbitListener(bindings = @QueueBinding(
    32             value = @Queue(value = "order-queue", durable = "true"),
    33             exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
    34             key = "order.*"
    35     )
    36 
    37     )
    38     @RabbitHandler
    39     public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
    40         // 消费者操作
    41         System.out.println("------------收到消息,开始收费-------------------");
    42         System.out.println("订单ID" + order.getId());
    43         Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    44         // 手动签收消息
    45 //        channel.basicAck(deliveryTag, false);
    46     }
    47 
    48 }
  • 相关阅读:
    centos7安装gitlab
    jenkins 部署k8s-jar包项目
    jenkins部署k8s项目-CICD
    pipeline
    jenkins打包
    jenkins 按角色设置管理权限
    1 jenkins的介绍和安装
    PyTables的下载和安装
    解决python报错:ImportError: No module named shutil_get_terminal_size 的方法
    nodejs安装失败
  • 原文地址:https://www.cnblogs.com/w13248223001/p/10349786.html
Copyright © 2011-2022 走看看