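We start by creating a producer service. The example is a product price service: a microservice that supports create, read, update, and delete operations on product-price records. Whenever a record is updated or deleted, the service publishes a message telling the systems that call it that the record has changed.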
Configure pom.xml
First, add the two dependencies spring-cloud-stream and spring-cloud-starter-stream-kafka to pom.xml:
```xml
<!-- Spring cloud: stream -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Spring cloud starter: kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
```
The service is also a Eureka Client and a Config Client; see the earlier chapters for how to configure both.
Configure the source, channel, binder, and Application
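```java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface ProductPriceSource {

    // Custom output channel; the name must match the binding key in application.yml
    @Output("productPriceOutput")
    MessageChannel productPriceOutput();
}
```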
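[Note] This defines a custom output channel named "productPriceOutput". If you do not need a custom channel, you can use the org.springframework.cloud.stream.messaging.Source interface directly, together with its output channel named output (the yml file below shows how a channel is configured).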
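As a rough illustration of the alternative mentioned in the note, the sketch below sends the same payload through the built-in Source interface instead of a custom one. The class name DefaultChannelMessageSender is hypothetical and not part of the original project. The sketch also assumes that the application class declares @EnableBinding(Source.class) and that the binding key in application.yml is output rather than productPriceOutput.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

// Hypothetical sender that relies on the built-in Source interface
// instead of the custom ProductPriceSource.
@Component
public class DefaultChannelMessageSender {

    @Autowired
    private Source source;

    public void sendMessage(Long productId) {
        // Source.output() returns the MessageChannel bound under the name "output".
        source.output().send(MessageBuilder.withPayload(productId).build());
    }
}
```

The custom interface used in this article is mainly worth the extra code when a service has several output channels, or when a descriptive channel name makes the yml configuration easier to read.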
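```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ProductPriceMessageSender {

    private static final Logger logger = LoggerFactory.getLogger(ProductPriceMessageSender.class);

    @Autowired
    private ProductPriceSource source;

    /**
     * The product data is out of date, so send a Kafka message telling the
     * consumer service to remove it from its cache (Redis).
     *
     * @param productId ID of the product that was updated or deleted
     */
    public void sendMessage(Long productId) {
        logger.info(">>>>> Sending Kafka message: [productId = {}].", productId);
        source.productPriceOutput().send(MessageBuilder.withPayload(productId).build());
    }
}
```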
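[Note] This defines a sender bean. Whenever a product-price record is updated or deleted, the service calls this bean to publish a message to the message queue.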
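To make the purpose of the message concrete, here is a minimal plain-Java sketch of the flow the sender takes part in: the producer publishes only the productId, and a consumer evicts the matching cache entry. None of this is Spring or Kafka code. InMemoryTopic and CachingConsumer are hypothetical stand-ins, assumed here for illustration, for the Kafka topic and for a consumer service with a cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the Kafka topic "productPriceTopic".
class InMemoryTopic {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the payload to every registered subscriber.
    void publish(Long productId) {
        for (Consumer<Long> subscriber : subscribers) {
            subscriber.accept(productId);
        }
    }
}

// Hypothetical consumer that caches product data and evicts an entry when notified.
class CachingConsumer {
    final Map<Long, String> cache = new HashMap<>();

    void onProductChanged(Long productId) {
        cache.remove(productId);
    }
}

class InMemoryTopicDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        CachingConsumer consumer = new CachingConsumer();
        topic.subscribe(consumer::onProductChanged);

        consumer.cache.put(1L, "Apple @ 8.5");
        consumer.cache.put(2L, "Watermelon @ 2.2");

        // Product 1 was updated or deleted on the producer side.
        topic.publish(1L);

        System.out.println(consumer.cache.keySet()); // prints [2]
    }
}
```

Sending only the ID keeps the message small and leaves it to the consumer to decide whether to reload the record or simply drop it from the cache.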
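```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({ ProductPriceSource.class })
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
```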
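[Note] Add the @EnableBinding annotation to the Application class and pass it the output and/or input channel interfaces you have defined; this binds them to the service. More than one interface can be bound.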
Configure application.yml
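```yaml
## Spring info
spring:
  # Stream/Kafka info
  cloud:
    stream:
      bindings:
        # output -> productPriceOutput (custom channel)
        productPriceOutput:
          # Name of the message queue (topic) that messages are written to
          destination: productPriceTopic
          # Content type of the messages sent and received
          content-type: application/json
      # Use Kafka as the message bus
      kafka:
        binder:
          # Network address of the running Kafka server
          brokers: www.mytools.com
```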
API and other business logic
```java
import java.math.BigDecimal;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("pp")
public class ProductPriceController {

    @Autowired
    private ProductPriceService productPriceService;

    @GetMapping(value = "find/all")
    @ResponseBody
    public List<ProductPriceEntity> findAll() {
        return productPriceService.findAll();
    }

    @GetMapping(value = "find/productId/{productId}")
    @ResponseBody
    public ProductPriceEntity find(@PathVariable String productId) {
        return productPriceService.findById(Long.valueOf(productId));
    }

    @GetMapping(value = "add/productId/{productId}/product/{product}/price/{price}")
    @ResponseBody
    public String save(@PathVariable String productId, @PathVariable String product,
            @PathVariable String price) {
        return productPriceService.save(Long.valueOf(productId), product, new BigDecimal(price));
    }

    @GetMapping(value = "update/productId/{productId}/product/{product}/price/{price}")
    @ResponseBody
    public String update(@PathVariable String productId, @PathVariable String product,
            @PathVariable String price) {
        return productPriceService.update(Long.valueOf(productId), product, new BigDecimal(price));
    }

    @GetMapping(value = "delete/productId/{productId}")
    @ResponseBody
    public String delete(@PathVariable String productId) {
        return productPriceService.delete(Long.valueOf(productId));
    }
}
```
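[Note] These are ordinary CRUD endpoints for creating, reading, updating, and deleting records. All of them are mapped with @GetMapping, including the ones that modify data.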
```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProductPriceService {

    // In-memory map that simulates the database
    private static final Map<Long, ProductPriceEntity> db = new ConcurrentHashMap<>();

    static {
        ProductPriceEntity row1 = new ProductPriceEntity(1L, "Apple", new BigDecimal("8.5"));
        ProductPriceEntity row2 = new ProductPriceEntity(2L, "Watermelon", new BigDecimal("2.2"));
        ProductPriceEntity row3 = new ProductPriceEntity(3L, "Grape", new BigDecimal("6.8"));
        db.put(1L, row1);
        db.put(2L, row2);
        db.put(3L, row3);
    }

    @Autowired
    private ProductPriceMessageSender sender;

    public List<ProductPriceEntity> findAll() {
        return new ArrayList<>(db.values());
    }

    public ProductPriceEntity findById(Long productId) {
        return db.get(productId);
    }

    public String save(Long productId, String product, BigDecimal price) {
        if (db.containsKey(productId)) {
            return String.format("[WARN] Product which productId = %s already exists in DB.", productId.toString());
        } else {
            ProductPriceEntity param = new ProductPriceEntity(productId, product, price);
            db.put(productId, param);
            return String.format("Save %s completed.", param);
        }
    }

    public String update(Long productId, String product, BigDecimal price) {
        if (db.containsKey(productId)) {
            ProductPriceEntity param = new ProductPriceEntity(productId, product, price);
            db.put(productId, param);
            // [UPDATE] send to kafka
            sender.sendMessage(productId);
            return String.format("Update %s completed.", param);
        } else {
            return String.format("[WARN] No product which productId = %s in DB.", productId.toString());
        }
    }

    public String delete(Long productId) {
        if (db.containsKey(productId)) {
            ProductPriceEntity result = db.remove(productId);
            // [DELETE] send to kafka
            sender.sendMessage(productId);
            return String.format("Delete %s completed.", result.toString());
        } else {
            return String.format("[WARN] No product which productId = %s in DB.", productId.toString());
        }
    }
}
```
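[Note] A Map stands in for the database here. ProductPriceMessageSender is called to send a message only when a product-price record is updated or deleted. The code for ProductPriceEntity is as follows: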
```java
import java.io.Serializable;
import java.math.BigDecimal;

public class ProductPriceEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long productId;
    private String product;
    private BigDecimal price;

    public ProductPriceEntity() {
    }

    public ProductPriceEntity(Long productId, String product, BigDecimal price) {
        super();
        this.productId = productId;
        this.product = product;
        this.price = price;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String product) {
        this.product = product;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "ProductPriceEntity [productId=" + productId + ", product=" + product + ", price=" + price + "]";
    }
}
```
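Returning to ProductPriceService: it checks containsKey and then calls put or remove as a separate step. Even on a ConcurrentHashMap that pair of calls is not atomic, so two concurrent requests could both pass the check. The sketch below shows one possible way to tighten this for the demo, using the single-call operations putIfAbsent, replace, and remove. AtomicPriceStore is a hypothetical, simplified stand-in, not the article's service: a bare BigDecimal replaces ProductPriceEntity, and a Consumer<Long> plays the role of ProductPriceMessageSender.sendMessage.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for ProductPriceService.
class AtomicPriceStore {
    private final Map<Long, BigDecimal> db = new ConcurrentHashMap<>();
    // Stand-in for ProductPriceMessageSender.sendMessage(Long).
    private final Consumer<Long> notifier;

    AtomicPriceStore(Consumer<Long> notifier) {
        this.notifier = notifier;
    }

    // Insert only if absent; putIfAbsent returns null when the key was new.
    boolean save(Long productId, BigDecimal price) {
        return db.putIfAbsent(productId, price) == null;
    }

    // Replace only if present; replace returns the previous value, or null if absent.
    boolean update(Long productId, BigDecimal price) {
        boolean updated = db.replace(productId, price) != null;
        if (updated) {
            notifier.accept(productId);
        }
        return updated;
    }

    // remove returns the removed value, or null if the key was absent.
    boolean delete(Long productId) {
        boolean deleted = db.remove(productId) != null;
        if (deleted) {
            notifier.accept(productId);
        }
        return deleted;
    }

    BigDecimal findById(Long productId) {
        return db.get(productId);
    }
}

class AtomicPriceStoreDemo {
    public static void main(String[] args) {
        List<Long> sent = new ArrayList<>();
        AtomicPriceStore store = new AtomicPriceStore(sent::add);

        store.save(1L, new BigDecimal("8.5"));   // new record: no notification
        store.update(1L, new BigDecimal("9.0")); // existing record: notifies 1
        store.update(2L, new BigDecimal("1.0")); // unknown id: no notification
        store.delete(1L);                        // existing record: notifies 1

        System.out.println(sent); // prints [1, 1]
    }
}
```

Passing the notifier in through the constructor also makes the "send only on update or delete" rule easy to verify without a running Kafka broker, as the demo does by collecting the notified IDs in a list.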