zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件极速入门与实战

    1:初识RabbitMQ

    RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

    2:AMQP协议模型:

    3:RabbitMQ的整体架构

    4:RabbitMQ核心概念

      • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
      • Connection:连接,应用程序与Broker的网络连接
      • Channel:网络信道

        几乎所有的操作都在Channel中进行
        Channel是进行消息读写的通道
        客户端可建立多个Channel
        每个Channel代表一个会话任务
      • Message:消息

        服务器和应用程序之间传送的数据,由Properties和Body组成
        Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性
        Body则就是消息体内容
      • Virtual host:虚拟机

        用于进行逻辑隔离,最上层的消息路由
        一个Virtual host里面可以有若干个Exchange和Queue
        同一个Virtual host里面不能有相同名称的Exchange或Queue
      • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
      • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
      • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
      • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

    5:RabbitMQ消息的流转过程

    6:引入依赖 以及生产端的相关配置

     <!--RabbitMQ依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    # RabbitMQ配置
    spring.rabbitmq.addresses=192.168.0.105:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000

    # Server配置
    server.servlet.context-path=/
    server.port=8080

    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL

    7: 订单对象

    public class Order implements Serializable{
    
        private static final long serialVersionUID = 6771608755338249746L;
    
        private String id;
    
        private String name;
        /**
         * 存储消息发送的唯一标识
         */
        private String messageId;
    }

    8:生产端向消息队列发送消息

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderSender(
    RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    }
    public void send(Order order) throws Exception {

    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(order.getMessageId());

    // exchange:交换机
    // routingKey:路由键
    // message:消息体内容
    // correlationData:消息唯一ID
    this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);
    }

    Order order = new Order();
    order.setId("201809062009010001");
    order.setName("测试订单1");
    order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-",""));
    this.orderSender.send(order);

    9:消费端相关配置

    # RabbitMQ连接配置
    spring.rabbitmq.addresses=192.168.0.105:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    # RabbitMQ消费配置
    # 基本并发:5
    spring.rabbitmq.listener.simple.concurrency=5
    # 最大并发:10
    spring.rabbitmq.listener.simple.max-concurrency=10
    # 签收模式:手动签收
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 限流策略:同一时间只有1条消息发送过来消费
    spring.rabbitmq.listener.simple.prefetch=1

    # Server配置
    server.servlet.context-path=/
    server.port=8082

    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL

    10:消费端消费消息

    @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理

    @Component
    public class OrderReceiver {

    /**
    * 接收消息
    *
    * @param order 消息体内容
    * @param headers 消息头内容
    * @param channel 网络信道
    * @throws Exception 异常
    */
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "order-queue",durable = "true"),
    exchange = @Exchange(name = "order-exchange",type = "topic"),
    key = "order.*"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
    // 消费者操作
    System.out.println("收到消息:");
    System.out.println("订单信息:" + order.toString());

    // 手动签收消息
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
    }
    }

    11:保证100%消息投递

     12:mybatis相关依赖

    <!--MyBatis依赖-->
    <dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.3.2</version>
    </dependency>
    <dependency>
    <groupId>tk.mybatis</groupId>
    <artifactId>mapper-spring-boot-starter</artifactId>
    <version>1.1.0</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.24</version>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
    <groupId>com.github.miemiedev</groupId>
    <artifactId>mybatis-paginator</artifactId>
    <version>1.2.17</version>
    <exclusions>
    <exclusion>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    </exclusion>
    </exclusions>
    </dependency>

    13:rabbitmq采用消息确认模式
    # 采用消息确认模式
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

    14:mysql相关配置
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false
    spring.datasource.username=root
    spring.datasource.password=root

    mybatis.type-aliases-package=com.myimooc.rabbitmq.ha.dao.mapper
    mybatis.mapper-locations=classpath:com/  /mapper/*.xml


    数据源druid.properties配置
    ##下面为连接池的补充设置,应用到上面所有数据源中
    # 初始化大小、最小、最大
    druid.initialSize=1
    druid.minIdle=1
    druid.maxActive=300
    # 获取连接等待超时时间(单位:毫秒)
    druid.maxWait=60000
    # 间隔多久进行一次检测,检测需要关闭的空闲连接(单位:毫秒)
    druid.timeBetweenEvictionRunsMillis=60000
    # 一个连接在池中最小生存的时间(单位:毫秒)
    druid.minEvictableIdleTimeMillis=300000
    druid.validationQuery=SELECT 1 FROM DUAL
    druid.testWhileIdle=true
    druid.testOnBorrow=false
    druid.testOnReturn=false
    # 打开PSCache,并且指定每个连接上PSCache的大小
    druid.poolPreparedStatements=true
    druid.maxOpenPreparedStatements=20
    # 监控统计拦截的filters,去掉后监控界面sql无法统计,wall用于防火墙
    druid.filters=stat,wall,log4j
    # 通过connectionProperties属性来打开mergeSQL功能:慢SQL记录
    druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    # 合并多个DruidDataSource的监控数据
    druid.useGlobalDataSourceStat=true


    DruidDataSourceSettings 类
    @Component
    @PropertySource("classpath:druid.properties")
    public class DruidDataSourceSettings {

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;
    @Value("${spring.datasource.url}")
    private String url;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;

    @Value("${druid.initialSize}")
    private Integer initialSize;
    @Value("${druid.minIdle}")
    private Integer minIdle;
    @Value("${druid.maxActive}")
    private Integer maxActive;
    @Value("${druid.maxWait}")
    private Long maxWait;

    @Value("${druid.timeBetweenEvictionRunsMillis}")
    private Long timeBetweenEvictionRunsMillis;

    @Value("${druid.minEvictableIdleTimeMillis}")
    private Long minEvictableIdleTimeMillis;
    @Value("${druid.validationQuery}")
    private String validationQuery;
    @Value("${druid.testWhileIdle}")
    private Boolean testWhileIdle;
    @Value("${druid.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${druid.testOnReturn}")
    private Boolean testOnReturn;

    @Value("${druid.poolPreparedStatements}")
    private Boolean poolPreparedStatements;
    @Value("${druid.maxOpenPreparedStatements}")
    private Integer maxOpenPreparedStatements;

    @Value("${druid.filters}")
    private String filters;
    @Value("${druid.connectionProperties}")
    private String connectionProperties;
    @Value("${druid.useGlobalDataSourceStat}")
    private Boolean useGlobalDataSourceStat;

    public String getDriverClassName() {
    return driverClassName;
    }

    public String getUrl() {
    return url;
    }

    public String getUsername() {
    return username;
    }

    public String getPassword() {
    return password;
    }

    public Integer getInitialSize() {
    return initialSize;
    }

    public Integer getMinIdle() {
    return minIdle;
    }

    public Integer getMaxActive() {
    return maxActive;
    }

    public Long getMaxWait() {
    return maxWait;
    }

    public Long getTimeBetweenEvictionRunsMillis() {
    return timeBetweenEvictionRunsMillis;
    }

    public Long getMinEvictableIdleTimeMillis() {
    return minEvictableIdleTimeMillis;
    }

    public String getValidationQuery() {
    return validationQuery;
    }

    public Boolean getTestWhileIdle() {
    return testWhileIdle;
    }

    public Boolean getTestOnBorrow() {
    return testOnBorrow;
    }

    public Boolean getTestOnReturn() {
    return testOnReturn;
    }

    public Boolean getPoolPreparedStatements() {
    return poolPreparedStatements;
    }

    public Integer getMaxOpenPreparedStatements() {
    return maxOpenPreparedStatements;
    }

    public String getFilters() {
    return filters;
    }

    public Properties getConnectionProperties() {
    Properties properties = new Properties();
    String[] entrys = connectionProperties.split(";");
    for (String entry : entrys) {
    String[] split = entry.split("=");
    properties.setProperty(split[0],split[1]);
    }
    return properties;
    }

    public Boolean getUseGlobalDataSourceStat() {
    return useGlobalDataSourceStat;
    }
    }

    DruidDataSourceConfig 类
    @Configuration
    @EnableTransactionManagement
    public class DruidDataSourceConfig {

    @Bean
    public DataSource dataSource(DruidDataSourceSettings druidSettings) throws SQLException {
    DruidDataSource dataSource = new DruidDataSource();
    dataSource.setDriverClassName(druidSettings.getDriverClassName());
    dataSource.setUrl(druidSettings.getUrl());
    dataSource.setUsername(druidSettings.getUsername());
    dataSource.setPassword(druidSettings.getPassword());

    dataSource.setInitialSize(druidSettings.getInitialSize());
    dataSource.setMinIdle(druidSettings.getMinIdle());
    dataSource.setMaxActive(druidSettings.getMaxActive());
    dataSource.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
    dataSource.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
    dataSource.setValidationQuery(druidSettings.getValidationQuery());

    dataSource.setTestWhileIdle(druidSettings.getTestWhileIdle());
    dataSource.setTestOnBorrow(druidSettings.getTestOnBorrow());
    dataSource.setTestOnReturn(druidSettings.getTestOnReturn());
    dataSource.setPoolPreparedStatements(druidSettings.getPoolPreparedStatements());
    dataSource.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxOpenPreparedStatements());

    dataSource.setFilters(druidSettings.getFilters());
    dataSource.setConnectProperties(druidSettings.getConnectionProperties());
    return dataSource;
    }

    @Bean
    public PlatformTransactionManager transactionManager(DruidDataSourceSettings druidSettings) throws Exception {
    DataSourceTransactionManager manager = new DataSourceTransactionManager();
    manager.setDataSource(this.dataSource(druidSettings));
    return manager;
    }
    }
    MybatisDataSourceConfig 类
    @Configuration
    public class MybatisDataSourceConfig {

    @Autowired
    private DataSource dataSource;

    @Bean(name="sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean() {
    SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
    bean.setDataSource(dataSource);
    // 添加XML目录
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
    try {
    bean.setMapperLocations(resolver.getResources("classpath:com/bfxy/springboot/mapping/*.xml"));
    SqlSessionFactory sqlSessionFactory = bean.getObject();
    sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);

    return sqlSessionFactory;
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
    return new SqlSessionTemplate(sqlSessionFactory);
    }

    }
    MybatisMapperScanerConfig 类
    @Configuration
    @AutoConfigureAfter(MybatisDataSourceConfig.class)
    public class MybatisMapperScanerConfig {

    @Bean
    public MapperScannerConfigurer mapperScannerConfigurer() {
    MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
    mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
    mapperScannerConfigurer.setBasePackage("com.bfxy.springboot.mapper");
    return mapperScannerConfigurer;
    }

    }



    15:消息发送者确认机制
    @Component
    public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;
    /**
    * 回调方法:confirm确认
    */
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("correlationData:" + correlationData);
    String messageId = correlationData.getId();
    if (ack) {
    // 如果confirm返回成功,则进行更新
    BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();
    messageLogPO.setMessageId(messageId);
    messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS);
    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);
    } else {
    // 失败则进行具体的后续操作:重试或者补偿等
    System.out.println("异常处理...");
    }
    }
    };

    /**
    * 发送订单
    *
    * @param order 订单
    */
    public void send(Order order) {
    // 设置回调方法
    this.rabbitTemplate.setConfirmCallback(confirmCallback);
    // 消息ID
    CorrelationData correlationData = new CorrelationData(order.getMessageId());
    // 发送消息
    this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData);
    }


    }
    16:FastJson工具类
    需要引入
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.36</version>
    </dependency>
    public class FastJsonConvertUtils {

    private static final Logger logger = LoggerFactory.getLogger(FastJsonConvertUtils.class);

    /**
    * 将对象转为JSON字符串
    *
    * @param obj 任意对象
    * @return JSON字符串
    */
    public static String convertObjectToJson(Object obj) {
    try {
    return JSON.toJSONString(obj);
    } catch (Exception ex) {
    logger.warn("将对象转为JSON字符串异常:" + ex);
    throw new RuntimeException("将对象转为JSON字符串异常:" + ex.getMessage(), ex);
    }
    }

    /**
    * 将JSON字符串转为对象
    *
    * @param message JSON字符串
    * @param type 对象
    * @param <T> 对象
    * @return 对象实例
    */
    public static <T> T convertJsonToObject(String message, Class<T> type) {
    try {
    return JSONObject.parseObject(message, type);
    } catch (Exception ex) {
    logger.warn("将JSON字符串转为对象异常:" + ex);
    throw new RuntimeException("将JSON字符串转为对象异常:" + ex.getMessage(), ex);
    }
    }

    }
    17:
    <!--工具类依赖-->
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    </dependency>
    DateUtils.addMinutes(orderTime, 1)

    18:定时任务
    启用任务
    @SpringBootApplication
    @MapperScan("com.myimooc.rabbitmq.ha.dao.mapper")
    @EnableScheduling
    public class Application {
    public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
    }

    }

    配置定时任务
    @Configuration
    @EnableScheduling
    public class TaskSchedulerConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    taskRegistrar.setScheduler(taskScheduler());
    }

    @Bean(destroyMethod="shutdown")
    public Executor taskScheduler(){
    return Executors.newScheduledThreadPool(100);
    }

    }

    定时任务相关执行代码
    @Component
    public class RetryMessageTask {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private OrderSender orderSender;
    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
    * 启动完成3秒后开始执行,每隔10秒执行一次
    */
    @Scheduled(initialDelay = 3000, fixedDelay = 10000)
    public void retrySend() {
    logger.debug("重发消息定时任务开始");
    // 查询 status = 0 和 timeout 的消息日志
    List<BrokerMessageLogPO> pos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage();
    for (BrokerMessageLogPO po : pos) {
    logger.debug("处理消息日志:{}",po);
    if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) {
    // 更新状态为失败
    BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO();
    messageLogPO.setMessageId(po.getMessageId());
    messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE);
    this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO);
    } else {
    // 进行重试,重试次数+1
    this.brokerMessageLogMapper.updateRetryCount(po);
    Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class);
    try {
    this.orderSender.send(reSendOrder);
    } catch (Exception ex) {
    // 异常处理
    logger.error("消息发送异常:{}", ex);
    }
    }
    }
    logger.debug("重发消息定时任务结束");
    }
    }
  • 相关阅读:
    Vue 监听子组件事件
    延时队列
    AES加密
    centos7.9 iftop 工具源码安装
    angular pass get paragrams by router
    Android chrome console in PC
    powershell 运行带路径的exe
    win下 nrm ls报错
    windows10 安装 node 16 解决node-sass node-gyp报错
    位图和布隆过滤器
  • 原文地址:https://www.cnblogs.com/zyy1688/p/10298448.html
Copyright © 2011-2022 走看看