zoukankan      html  css  js  c++  java
  • 040 RabbitMq及数据同步02

    1.Spring AMQP

    (1)简介

    Spring有很多不同的项目,其中就有对AMQP的支持:

    Spring AMQP的页面:http://spring.io/projects/spring-amqp

    注意:Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

    (2)依赖和配置

    添加AMQP的启动器:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    application.yml中添加RabbitMQ地址:

    spring:
      rabbitmq:
        host: 127.0.0.1
        username: guest
        password: guest
        virtual-host: /

    (3)监听者

    在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

    @Component
    public class Listener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "spring.test.queue", durable = "true"),
                exchange = @Exchange(
                        value = "spring.test.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"#.#"}))
        public void listen(String msg){
            System.out.println("接收到消息:" + msg);
        }
    }
    • @Componet:类上的注解,注册到Spring容器

    • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

      • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

        • value:这个消费者关联的队列。值是@Queue,代表一个队列

        • exchange:队列所绑定的交换机,值是@Exchange类型

        • key:队列和交换机绑定的RoutingKey

    类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

    (4)AmqpTemplate

    Spring最擅长的事情就是封装,把他人的框架进行封装和整合。

    Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

     

    红框圈起来的是比较常用的3个方法,分别是:

    • 指定交换机、RoutingKey和消息体

    • 指定消息

    • 指定RoutingKey和消息,会向默认的交换机发送消息

    (5)测试代码

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class MqDemo {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void testSend() throws InterruptedException {
            String msg = "hello, Spring boot amqp";
            this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
            // 等待10秒后再结束
            Thread.sleep(10000);
        }
    }

    运行后查看日志:

    2.搜索服务、商品静态页的数据同步

    (1)思路分析

    <1>发送方:商品微服务

    • 什么时候发?

      当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。

    • 发送什么内容?

      对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

    <2>接收方:搜索微服务、静态页微服务

    接收消息后如何处理?

    • 搜索微服务:

      • 增/改:添加新的数据到索引库

      • 删:删除索引库数据

    • 静态页微服务:

      • 增/改:创建新的静态页

      • 删:删除原来的静态页

    (2)商品服务发送消息

     我们先在商品微服务leyou-item-service中实现发送消息

    <1>引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <2>配置文件

    我们在application.yml中添加一些有关RabbitMQ的配置:

    spring:
      rabbitmq:
          host: 127.0.0.1
          username: guest
          password: guest
          virtual-host: /
          template:
            exchange: leyou.item.exchange
          publisher-confirms: true
    • template:有关AmqpTemplate的配置

      • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个

    • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    <3>改造GoodsService

    在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)

    
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    /**
         * 利用rabbitmq发送消息
         * @param id
         * @param type
         */
        private void sendMessage(Long id, String type){
            // 发送消息
            try {
                this.amqpTemplate.convertAndSend("item." + type, id);
            } catch (Exception e) {
                //logger.error("{}商品消息发送异常,商品id:{}", type, id, e);
            }
        }

    这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange

    注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑

    然后在新增的时候调用:

    /**
         * 新增商品
         * @param spuBo
         */
        @Override
        @Transactional  //添加事务
        public void saveGoods(SpuBo spuBo) {
            // 01 新增spu
            // 设置默认字段
            spuBo.setId(null);
            spuBo.setSaleable(true);  //设置是否可售
            spuBo.setValid(true);
            spuBo.setCreateTime(new Date());  //设置创建时间
            spuBo.setLastUpdateTime(spuBo.getCreateTime()); //设置更新时间
            this.spuMapper.insertSelective(spuBo);
    
            // 02 新增spuDetail
            SpuDetail spuDetail = spuBo.getSpuDetail();
            spuDetail.setSpuId(spuBo.getId());
            this.spuDetailMapper.insertSelective(spuDetail);
    
            saveSkuAndStock(spuBo);
    
            //发送rabbitmq消息
            sendMessage(spuBo.getId(),"insert");
        }

    修改的时候调用:

    /**
         * 更新商品
         * @param spu
         */
        @Override
        @Transactional
        public void updateGoods(SpuBo spu) {
            // 查询以前sku
            Sku sku=new Sku();
            sku.setSpuId(spu.getId());
            List<Sku> skus = this.skuMapper.select(sku);
    
            // 如果以前存在,则删除
            if(!CollectionUtils.isEmpty(skus)) {
                List<Long> ids = skus.stream().map(s -> s.getId()).collect(Collectors.toList());
                // 删除以前库存
                Example example = new Example(Stock.class);
                example.createCriteria().andIn("skuId", ids);
                this.stockMapper.deleteByExample(example);
    
                // 删除以前的sku
                Sku record = new Sku();
                record.setSpuId(spu.getId());
                this.skuMapper.delete(record);
    
            }
            // 新增sku和库存
            saveSkuAndStock(spu);
    
            // 更新spu
            spu.setLastUpdateTime(new Date());
            spu.setCreateTime(null);   //不能更新的内容,设置为null
            spu.setValid(null);
            spu.setSaleable(null);
            this.spuMapper.updateByPrimaryKeySelective(spu);
    
            // 更新spu详情
            this.spuDetailMapper.updateByPrimaryKeySelective(spu.getSpuDetail());
    
            //发送rabbitmq消息
            sendMessage(spu.getId(),"insert");
    
        }

    (3)搜索服务接收消息

    搜索服务接收到消息后要做的事情:

    • 增:添加新的数据到索引库

    • 删:删除索引库数据

    • 改:修改索引库数据

    因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。

    <1>引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <2>添加配置

    spring:
      rabbitmq:
          host: 127.0.0.1
          username: guest
          password: guest
          virtual-host: /

    这里只是接收消息而不发送,所以不用配置template相关内容。

    <3>编写监听器

    package lucky.leyou.listener;
    
    import lucky.leyou.service.SearchService;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class GoodsListener {
    
        @Autowired
        private SearchService searchService;
    
        /**
         * 处理insert和update的消息
         *
         * @param id
         * @throws Exception
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "leyou.create.index.queue", durable = "true"),
                exchange = @Exchange(
                        value = "leyou.item.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC),
                key = {"item.insert", "item.update"}))
        public void listenCreate(Long id) throws Exception {
            if (id == null) {
                return;
            }
            // 创建或更新索引
            this.searchService.createIndex(id);
        }
    
        /**
         * 处理delete的消息
         *
         * @param id
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "leyou.delete.index.queue", durable = "true"),
                exchange = @Exchange(
                        value = "leyou.item.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC),
                key = "item.delete"))
        public void listenDelete(Long id) {
            if (id == null) {
                return;
            }
            // 删除索引
            this.searchService.deleteIndex(id);
        }
    }

    <4>编写创建和删除索引方法

    这里因为要创建和删除索引,我们需要在SearchService中拓展两个方法,创建和删除索引:

    public void createIndex(Long id) throws IOException {
    
        Spu spu = this.goodsClient.querySpuById(id);
        // 构建商品
        Goods goods = this.buildGoods(spu);
    
        // 保存数据到索引库
        this.goodsRepository.save(goods);
    }
    
    public void deleteIndex(Long id) {
        this.goodsRepository.deleteById(id);
    }

    创建索引的方法可以从之前导入数据的测试类中拷贝和改造。

  • 相关阅读:
    不重复随机数生成
    centos 输入密码正确进不去系统
    程序退出异常_DebugHeapDelete和std::numpunct
    iptables导致数据包过多时连接失败
    linux服务器并发与tcmalloc
    Windows server 2008 被ntlmssp安装攻击 解决
    转载 html div三列布局占满全屏(左右两列定宽或者百分比、中间自动适应,div在父div中居底)
    WIN2003使用IP安全策略只允许指定IP远程桌面连接
    如何让电脑公司Win7系统自动关闭停止响应的程序
    win7 64的系统安装。net4.0总是提示安装未成功
  • 原文地址:https://www.cnblogs.com/luckyplj/p/11624751.html
Copyright © 2011-2022 走看看