  • Spring Cloud (7.2): Configuring the Producer Server

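    We start by creating a producer service. The example is a product-price service: a microservice that supports create, read, update, and delete operations on product-price records. Whenever a record is updated or deleted, the service publishes a message telling the systems that call it that the record has changed.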

    Configuring pom.xml

    First, add the two dependencies spring-cloud-stream and spring-cloud-starter-stream-kafka to pom.xml:

    <!-- Spring cloud: stream -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <!-- Spring cloud starter: kafka -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

    The service is also a Eureka Client and a Config Client; see the earlier chapters for how to configure both.

    Configuring the source, channel, binder, and Application

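    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    // Declares the custom output channel that is bound to the message broker.
    public interface ProductPriceSource {
        @Output("productPriceOutput")
        MessageChannel productPriceOutput();
    }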

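    [Note] This defines a custom output channel named "productPriceOutput". If you do not need a custom channel, you can use the org.springframework.cloud.stream.messaging.Source interface directly, together with its output channel named output (the yml file below shows how the channel is configured).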

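    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    // Assumption: Spring's messaging MessageBuilder; the original does not show its imports.
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class ProductPriceMessageSender {

        @Autowired
        private ProductPriceSource source;

        private static final Logger logger = LoggerFactory.getLogger(ProductPriceMessageSender.class);

        /**
         * Publishes a Kafka message when a product has changed, so that the
         * consumer service can remove the product from its cache (Redis).
         *
         * @param productId ID of the product that was updated or deleted
         */
        public void sendMessage(Long productId) {
            logger.info(">>>>> Sending Kafka message: [productId = {}].", productId);
            source.productPriceOutput().send(MessageBuilder.withPayload(productId).build());
        }
    }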

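    [Note] This defines the sender bean. Whenever a product-price record is updated or deleted, the service calls this bean to publish a message to the message queue.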

    @SpringBootApplication
    @EnableBinding({ ProductPriceSource.class })
    public class MyApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }
    }

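    [Note] Add the @EnableBinding annotation to the Application class to bind the defined output channels or input channels to the service. More than one channel interface can be bound.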

    Configuring application.yml

    ## Spring info
    spring:
      # Stream/Kafka info
      cloud:
        stream:
          bindings:
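            # output -> productPriceOutput (custom channel)
            productPriceOutput:
              # Name of the message queue (topic) to write messages to
              destination: productPriceTopic
              # Content type of the messages sent and received
              content-type: application/json
          # Use Kafka as the message bus
          kafka:
            binder:
              # Network address of the running Kafka server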
              brokers: www.mytools.com

    API and other business logic

    @Controller
    @RequestMapping("pp")
    public class ProductPriceController {
    
        @Autowired
        private ProductPriceService productPriceService;
    
        @GetMapping(value = "find/all")
        @ResponseBody
        public List<ProductPriceEntity> findAll() {
            return productPriceService.findAll();
        }
    
        @GetMapping(value = "find/productId/{productId}")
        @ResponseBody
        public ProductPriceEntity find(@PathVariable String productId) {
            return productPriceService.findById(Long.valueOf(productId));
        }
    
        @GetMapping(value = "add/productId/{productId}/product/{product}/price/{price}")
        @ResponseBody
        public String save(@PathVariable String productId, @PathVariable String product, @PathVariable String price) {
            return productPriceService.save(Long.valueOf(productId), product, new BigDecimal(price));
        }
    
        @GetMapping(value = "update/productId/{productId}/product/{product}/price/{price}")
        @ResponseBody
        public String update(@PathVariable String productId, @PathVariable String product, @PathVariable String price) {
            return productPriceService.update(Long.valueOf(productId), product, new BigDecimal(price));
        }
    
        @GetMapping(value = "delete/productId/{productId}")
        @ResponseBody
        public String delete(@PathVariable String productId) {
            return productPriceService.delete(Long.valueOf(productId));
        }
    }

    [Note] These are ordinary CRUD endpoints: find, add, update, and delete.
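    To exercise these endpoints from Java, the requests can be built with the JDK's java.net.http API. The following is only a sketch: the class name ProductPriceRequests is hypothetical, and the base URL http://localhost:8080 is an assumption, since the article does not state the host or port of the service. The requests are built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) requests for the
// endpoints exposed by ProductPriceController.
class ProductPriceRequests {

    // Assumption: the producer service listens on localhost:8080.
    static final String BASE_URL = "http://localhost:8080/pp";

    static HttpRequest findAll() {
        return get("/find/all");
    }

    // The product name and price must already be URL-safe (no spaces etc.).
    static HttpRequest update(long productId, String product, String price) {
        return get("/update/productId/" + productId + "/product/" + product + "/price/" + price);
    }

    static HttpRequest delete(long productId) {
        return get("/delete/productId/" + productId);
    }

    // Every endpoint in the controller is mapped with @GetMapping,
    // so all requests use the GET method.
    private static HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + path)).GET().build();
    }
}
```

    A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Calling update or delete for an existing productId should then cause the service to publish a Kafka message.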

    @Service
    public class ProductPriceService {
    
        private static final Map<Long, ProductPriceEntity> db = new ConcurrentHashMap<>();
    
        static {
            ProductPriceEntity row1 = new ProductPriceEntity(1L, "Apple", new BigDecimal("8.5"));
            ProductPriceEntity row2 = new ProductPriceEntity(2L, "Watermelon", new BigDecimal("2.2"));
            ProductPriceEntity row3 = new ProductPriceEntity(3L, "Grape", new BigDecimal("6.8"));
            db.put(1L, row1);
            db.put(2L, row2);
            db.put(3L, row3);
        }
    
        @Autowired
        private ProductPriceMessageSender sender;
    
        public List<ProductPriceEntity> findAll() {
    
            List<ProductPriceEntity> results = new ArrayList<>();
            results.addAll(db.values());
    
            return results;
        }
    
        public ProductPriceEntity findById(Long productId) {
            return db.get(productId);
        }
    
        public String save(Long productId, String product, BigDecimal price) {
            if (db.containsKey(productId)) {
                return String.format("[WARN] Product which productId = %s already exists in DB.", productId.toString());
            } else {
                ProductPriceEntity param = new ProductPriceEntity(productId, product, price);
                db.put(productId, param);
                return String.format("Save %s completed.", param);
            }
        }
    
        public String update(Long productId, String product, BigDecimal price) {
            if (db.containsKey(productId)) {
                ProductPriceEntity param = new ProductPriceEntity(productId, product, price);
                db.put(productId, param);
                // [UPDATE] send to kafka
                sender.sendMessage(productId);
                return String.format("Update %s completed.", param);
            } else {
                return String.format("[WARN] No product which productId = %s in DB.", productId.toString());
            }
        }
    
        public String delete(Long productId) {
            if (db.containsKey(productId)) {
                ProductPriceEntity result = db.remove(productId);
                // [DELETE] send to kafka
                sender.sendMessage(productId);
                return String.format("Delete %s completed.", result.toString());
            } else {
                return String.format("[WARN] No product which productId = %s in DB.", productId.toString());
            }
        }
    }

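    [Note] A Map stands in for the database here. ProductPriceMessageSender is called to send a message only when a product-price record is updated or deleted. The code for ProductPriceEntity is as follows: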

    public class ProductPriceEntity implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private Long productId;
    
        private String product;
    
        private BigDecimal price;
    
        public ProductPriceEntity() {
        }
    
        public ProductPriceEntity(Long productId, String product, BigDecimal price) {
            super();
            this.productId = productId;
            this.product = product;
            this.price = price;
        }
    
        public Long getProductId() {
            return productId;
        }
    
        public void setProductId(Long productId) {
            this.productId = productId;
        }
    
        public String getProduct() {
            return product;
        }
    
        public void setProduct(String product) {
            this.product = product;
        }
    
        public BigDecimal getPrice() {
            return price;
        }
    
        public void setPrice(BigDecimal price) {
            this.price = price;
        }
    
        @Override
        public String toString() {
            return "ProductPriceEntity [productId=" + productId + ", product=" + product + ", price=" + price + "]";
        }
    }
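
    The rule that ProductPriceService follows (publish on update and delete, stay silent on save) can be checked in isolation, without Kafka or Spring. The following is a minimal sketch, not the article's code: PriceStore and its Consumer<Long> publisher hook are hypothetical stand-ins for ProductPriceService and ProductPriceMessageSender, and the map stores plain BigDecimal prices instead of ProductPriceEntity objects.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for ProductPriceService: an in-memory "DB" plus a
// publisher hook that plays the role of ProductPriceMessageSender.
class PriceStore {

    private final Map<Long, BigDecimal> db = new ConcurrentHashMap<>();
    private final Consumer<Long> publisher;

    PriceStore(Consumer<Long> publisher) {
        this.publisher = publisher;
    }

    // Insert only: a new row cannot be stale anywhere yet, so nothing is published.
    boolean save(Long productId, BigDecimal price) {
        return db.putIfAbsent(productId, price) == null;
    }

    // Replace an existing row, then notify consumers that it changed.
    boolean update(Long productId, BigDecimal price) {
        if (db.replace(productId, price) == null) {
            return false; // unknown id: nothing changed, nothing published
        }
        publisher.accept(productId);
        return true;
    }

    // Remove an existing row, then notify consumers that it is gone.
    boolean delete(Long productId) {
        if (db.remove(productId) == null) {
            return false; // unknown id: nothing removed, nothing published
        }
        publisher.accept(productId);
        return true;
    }
}

class PriceStoreDemo {

    // Runs a fixed sequence of operations and returns the ids that were published.
    static List<Long> run() {
        List<Long> published = new ArrayList<>();
        PriceStore store = new PriceStore(published::add);

        store.save(4L, new BigDecimal("3.0"));   // no message
        store.update(4L, new BigDecimal("3.5")); // message for id 4
        store.update(9L, new BigDecimal("1.0")); // unknown id, no message
        store.delete(4L);                        // message for id 4
        return published;
    }

    public static void main(String[] args) {
        System.out.println(PriceStoreDemo.run()); // prints [4, 4]
    }
}
```

    One design choice in this sketch differs from the service shown earlier: it uses the atomic putIfAbsent, replace, and remove methods of the map, so the existence check and the modification are a single step instead of a containsKey call followed by put or remove.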


  • Original article: https://www.cnblogs.com/storml/p/11288548.html