zoukankan      html  css  js  c++  java
  • Spring Cloud(7.3):配置Consumer Server

    接下来我们创建一个消费者服务。消费者服务从生产者服务拿取商品-价格信息,并保存在Redis中。同时,接收消息队列中生产者服务的更新提示,如果某个商品-价格被修改,则删除Redis中的缓存数据,并重新从生产者服务中取。

    配置pom.xml

    首先,在pom.xml中添加spring-cloud-stream,spring-cloud-starter-stream-kafka,spring-data-redis及Redis的客户端jedis依赖。

    <!-- Spring cloud: stream -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <!-- Spring cloud starter: kafka -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <!--Spring Data: Redis -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
    </dependency>
    <!--Redis Client -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>

    配置通道(channel),绑定器(binder)接收器(sink)

    public interface ProductPriceSource {
        @Input("productPriceInput")
        SubscribableChannel productPriceInput();
    }

    [注] 这里创建了一个叫“productPriceInput”的自定义接收通道,如果不使用自定义,可以直接使用org.springframework.cloud.stream.messaging.Sink接口及叫input的接收通道(下面的yml文件会讲如何配置)。

    @Component
    public class ProductPriceMessageReceiver {
    
        private static final Logger logger = LoggerFactory.getLogger(ProductPriceMessageReceiver.class);
    
        @Autowired
        private ProductPriceRedisRepository productPriceRedisRepository;
    
        @StreamListener("productPriceInput")
        public void receiveMessage(Long productId) {
    
            logger.info(String.format(">>>>> Received Kafka message productId=%s.", productId.toString()));
            if (productPriceRedisRepository.contains(productId)) {
                productPriceRedisRepository.delete(productId);
                logger.info(
                        String.format(">>>>>  Delete ProductPrice data productId=%s from cache.", productId.toString()));
            } else {
                logger.info(
                        String.format(">>>>>  No ProductPrice data productId=%s in cache. Skip.", productId.toString()));
            }
        }
    }

    [注] 这里配置了一个接收器bean,当消息队列有消息时,接受该消息并清除相应product的缓存。

    @SpringBootApplication
    @EnableBinding({ ProductPriceSource.class })
    public class MyApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }
    }

    [注] Application中加入@EnableBinding注解,并把定义好的发射通道(output)或接收通道(input)绑定到该服务中。可以绑定多个。

    配置RedisRepository

    这里我们额外用到了Redis,当我们引入jar包后,系统会自动给我们创建一个RedisTemplate<Object, Object>。我们可以用这个RedisTemplate来对Redis进行增删改查操作。

    public interface RedisRepository<HK, HV> {
    
        boolean contains(HK id);
    
        HV find(HK id);
    
        void save(HV value);
    
        void delete(HK id);
    }
    @Repository
    public class ProductPriceRedisRepository implements RedisRepository<Long, ProductPriceEntity> {
    
        private static final String HASH_NAME = "product_price";
    
        /**
         * 自动注入的Bean有:<br>
         * RedisTemplate<Object, Object>(in RedisAutoConfiguration)<br>
         * StringRedisTemplate(in RedisAutoConfiguration)<br>
         * RedisConnectionFactory(JedisConnectionFactory)<br>
         * 
         * 所以只能定义成RedisTemplate<Object, Object>的形式<br>
         */
        @Autowired
        private RedisTemplate<Object, Object> redisTemplate;
    
        private HashOperations<Object, Long, ProductPriceEntity> hashOperations;
    
        @PostConstruct
        private void init() {
            hashOperations = redisTemplate.opsForHash();
        }
    
        /* (non-Javadoc)
         * @see com.mytools.repository.RedisRepository#contains(java.lang.Object)
         */
        @Override
        public boolean contains(Long id) {
            return hashOperations.keys(HASH_NAME).contains(id);
        }
    
        /* (non-Javadoc)
         * @see com.mytools.repository.RedisRepository#find(java.lang.Object)
         */
        @Override
        public ProductPriceEntity find(Long productId) {
            return hashOperations.get(HASH_NAME, productId);
        }
    
        /* (non-Javadoc)
         * @see com.mytools.repository.RedisRepository#save(java.lang.Object)
         */
        @Override
        public void save(ProductPriceEntity entity) {
            hashOperations.put(HASH_NAME, entity.getProductId(), entity);
    
        }
    
        /* (non-Javadoc)
         * @see com.mytools.repository.RedisRepository#delete(java.lang.Object)
         */
        @Override
        public void delete(Long productId) {
            hashOperations.delete(HASH_NAME, productId);
        }
    }

    配置application.yml

    spring:
      # Stream/Kafka info
      cloud:
        stream:
          bindings:
            # input -> productPriceInput (自定义通道)
            productPriceInput:
              # 要读到消息的消息队列的名称
              destination: productPriceTopic
              # 发送和接收消息类型
              content-type: application/json
              # 消费者组:保证消息只会被一组服务实例处理一次
              group: productPriceGroup
          # 使用kafka作为消息总线
          kafka:
            binder:
              # 运行着kafka服务器的网络地址
              brokers: www.mtools.com
      # Redis/pool info (RedisProperties)
      redis:
        database: 0
        host: www.mytools.com
        port: 6379
        password:
        timeout: 8000
        jedis:
          pool:
            # 最大连接数,0为没有限制
            max-active: 8
            # 最大空闲连接,0为没有限制
            max-idle: 8
            #最大建立连接等待时间,如果超过此时间将接到异常,设为-1表示无限制
            max-wait: -1
            #最小空闲连接,0为没有限制
            min-idle: 0

    API及其他业务逻辑

    @Controller
    @RequestMapping("pp")
    public class ProductPriceController {
    
        @Autowired
        private ProductPriceServiceImpl productPriceService;
    
        @GetMapping(value = "find/productId/{productId}")
        @ResponseBody
        public ProductPriceEntity find(@PathVariable String productId) {
            return productPriceService.find(Long.valueOf(productId));
        }
    }

    [注] 这里创建了一个API,用于查询商品价格。

    @Service
    @Transactional
    public class ProductPriceServiceImpl {
    
        private static final Logger logger = LoggerFactory.getLogger(ProductPriceServiceImpl.class);
    
        @Autowired
        private ProductPriceRedisRepository productPriceRedisRepository;
    
        @Autowired
        private LoadBalancerClient loadBalancer;
    
        public ProductPriceEntity find(Long productId) {
    
            ProductPriceEntity result = null;
    
            if (productPriceRedisRepository.contains(productId)) {
                result = productPriceRedisRepository.find(productId);
                logger.info(">>>>> Get data from cache. {}", result.toString());
                return result;
            }
    
            ServiceInstance instance = loadBalancer.choose("app-db");
            String path = String.format("http://%s:%s/app-db/pp/find/productId/%s", instance.getHost(), instance.getPort(),
                    productId.toString());
            logger.info(path);
    
            RestTemplate restTemplate = new RestTemplate();
            ResponseEntity<ProductPriceEntity> response = restTemplate.exchange(path, HttpMethod.GET, null,
                    ProductPriceEntity.class);
            result = response.getBody();
            logger.info(">>>>> Get data from downstream. {}", result == null ? "null" : result.toString());
    
            if (result != null) {
                productPriceRedisRepository.save(result);
            }
    
            return result;
        }
    }

    [注] 具体逻辑如下:

    (1)如果缓存中有该数据,则从缓存中获取数据。

    (2)如果缓存中没有该数据,则调用生产者服务获取数据。拿到数据后再存入缓存中。

  • 相关阅读:
    C#编程利器之二:结构与枚举(Structure and enumeration)
    解读设计模式模板方法模式(Template Method),电脑就是这样造出来的
    清空mysql一个库中的所有表
    在执行并行程序工程中,突然弹出 connection closed 窗口,随后 ssh 与服务器的连接断开,并行程序也中断
    菜鸟求救 myeclipse安装flex3插件的问题
    linux 下 将 shell script 与 一个桌面图标联系在一起 (2)
    MYSQL EXPLAIN语句的extended 选项学习体会
    MySQL 性能跟踪语句
    Flex Flash
    Flex Builder 3 正式版
  • 原文地址:https://www.cnblogs.com/storml/p/11288551.html
Copyright © 2011-2022 走看看