zoukankan      html  css  js  c++  java
  • 这事没完,继续聊spring cloud stream和kafka的这些小事

    上一篇文章讲了如何用spring cloud stream集成kafka,并且跑起来一个demo,如果这一次宣传spring cloud stream的文章,其实到这里就可以啦。但实际上,工程永远不是简单的技术会还是不会的问题,在实际的开发中,我们会遇到很多的细节问题(简称坑),这篇文章,会把其中一些很小的点说一下,算是用实例告诉大家,工程的复杂性,往往体现在实际的繁琐步骤中。

    1、group的配置

    在发送消息的配置里面,group是不用配置的

    关于这一点的证明,可以在源代码的注释里面看到

    org.springframework.cloud.stream.config.BindingProperties

    2、修改topic的partitions

    配置文件如下

    bindings:
            output:
              binder: kafka
              destination: wph-d-2 #消息发往的目的地,对应topic
              content-type: text/plain #消息的格式
              producer:  
                partitionCount: 7

    partitionCount是用来设置partition的数量,默认是1,如果这个topic已经建了,修改partitionCount无效,会提示错误

    Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	... 14 common frames omitted

    根据错误的提示,添加autoAddPartitions

    kafka: 
            binder:
              brokers: #Kafka的消息中间件服务器地址
              - localhost:9092
              autoAddPartitions: true

    再次启动就可以看到partitions数已经改了

    autoAddPartitions属性对应的类是org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties

    设置partitionCount属性的类是org.springframework.cloud.stream.binder.ProducerProperties

    3、发送json报错

    用postman发送sendMessage/complexType报错

    在服务器端的报错内容是:

    Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]

    原因是数据传输格式传输错误,要改一下postman发送数据的格式

    然后就能happy的发出去了

    4、正确的发送json并转换成对象

    如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json,后面会讲)

    bindings:
            output:
              binder: kafka
              destination: wph-d-2 #消息发往的目的地,对应topic
              content-type: application/json #消息的格式

    然后通过producer发送这个消息

    @RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
    	public String publishMessageComplextType(@RequestBody ChatMessage payload) {
    		logger.info(payload.toString());
    		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build());
    		return "success";
    	}

    这里需要注意的一点是ChatMessage的field name必须要有getter和settr方法,两者有一就可以了,否则json转换成对象的时候,field name收不到值。

    在订阅消息的时候,application.yml里面content-type可以不用配置,这个值默认就是“application/json”,这一点可以在org.springframework.cloud.stream.config.BindingProperties类的注释里面看到

    和上面一样,ChatMessage的field name需要有getter或者setter的方法,二者之一就行。

    接收json并转换成类的方法如下:

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'")
    	public void handle(ChatMessage message) {
    		logger.info(message.toString());
    }

    有坑警告:如果我们把发送消息端的content-type设置成text/plain,消息订阅端的content-type设置成application/json,就会在消息订阅端报这个错误

    Caused by: java.lang.IllegalStateException: argument type mismatch
    Endpoint [com.wphmoon.kscsclient.Consumer]

    如果颠倒过来的话,发送消息端的content-type设置成application/json,消息订阅端设置成text/plain,我实际测试过,是可以收到消息,并且能转换成ChatMessage对象,没有问题。

    源代码

  • 相关阅读:
    机器学习
    机器学习
    JavaWeb之tomcat安装、配置与使用(一)
    Tomcat安装、配置和部署笔记
    Java配置----JDK开发环境搭建及环境变量配置
    安装SQL2012
    SQLServer 数据库变成单个用户后无法访问问题的解决方法
    临时记录
    SQL Server 动态生成数据库所有表Insert语句
    SQL2000查看表的大小
  • 原文地址:https://www.cnblogs.com/wphmoon/p/11836523.html
Copyright © 2011-2022 走看看