zoukankan      html  css  js  c++  java
  • 这事没完,继续聊spring cloud stream和kafka的这些小事

    上一篇文章讲了如何用spring cloud stream集成kafka,并且跑起来一个demo,如果这一次宣传spring cloud stream的文章,其实到这里就可以啦。但实际上,工程永远不是简单的技术会还是不会的问题,在实际的开发中,我们会遇到很多的细节问题(简称坑),这篇文章,会把其中一些很小的点说一下,算是用实例告诉大家,工程的复杂性,往往体现在实际的繁琐步骤中。

    1、group的配置

    在发送消息的配置里面,group是不用配置的

    关于这一点的证明,可以在源代码的注释里面看到

    org.springframework.cloud.stream.config.BindingProperties

    2、修改topic的partitions

    配置文件如下

    bindings:
            output:
              binder: kafka
              destination: wph-d-2 #消息发往的目的地,对应topic
              content-type: text/plain #消息的格式
              producer:  
                partitionCount: 7

    partitionCount是用来设置partition的数量,默认是1,如果这个topic已经建了,修改partitionCount无效,会提示错误

    Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7, but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
    	... 14 common frames omitted

    根据错误的提示,添加autoAddPartitions

    kafka: 
            binder:
              brokers: #Kafka的消息中间件服务器地址
              - localhost:9092
              autoAddPartitions: true

    再次启动就可以看到partitions数已经改了

    autoAddPartitions属性对应的类是org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties

    设置partitionCount属性的类是org.springframework.cloud.stream.binder.ProducerProperties

    3、发送json报错

    用postman发送sendMessage/complexType报错

    在服务器端的报错内容是:

    Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]

    原因是数据传输格式传输错误,要改一下postman发送数据的格式

    然后就能happy的发出去了

    4、正确的发送json并转换成对象

    如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json,后面会讲)

    bindings:
            output:
              binder: kafka
              destination: wph-d-2 #消息发往的目的地,对应topic
              content-type: application/json #消息的格式

    然后通过producer发送这个消息

    @RequestMapping(value = "/sendMessage/complexType", method = RequestMethod.POST)
    	public String publishMessageComplextType(@RequestBody ChatMessage payload) {
    		logger.info(payload.toString());
    		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "chatMessage").build());
    		return "success";
    	}

    这里需要注意的一点是ChatMessage的field name必须要有getter和settr方法,两者有一就可以了,否则json转换成对象的时候,field name收不到值。

    在订阅消息的时候,application.yml里面content-type可以不用配置,这个值默认就是“application/json”,这一点可以在org.springframework.cloud.stream.config.BindingProperties类的注释里面看到

    和上面一样,ChatMessage的field name需要有getter或者setter的方法,二者之一就行。

    接收json并转换成类的方法如下:

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='chatMessage'")
    	public void handle(ChatMessage message) {
    		logger.info(message.toString());
    }

    有坑警告:如果我们把发送消息端的content-type设置成text/plain,消息订阅端的content-type设置成application/json,就会在消息订阅端报这个错误

    Caused by: java.lang.IllegalStateException: argument type mismatch
    Endpoint [com.wphmoon.kscsclient.Consumer]

    如果颠倒过来的话,发送消息端的content-type设置成application/json,消息订阅端设置成text/plain,我实际测试过,是可以收到消息,并且能转换成ChatMessage对象,没有问题。

    源代码

  • 相关阅读:
    利用DTrace实时检测MySQl
    改进MySQL Order By Rand()的低效率
    RDS for MySQL查询缓存 (Query Cache) 的设置和使用
    RDS For MySQL 字符集相关说明
    RDS for MySQL 通过 mysqlbinlog 查看 binlog 乱码
    RDS for MySQL Mysqldump 常见问题和处理
    RDS for MySQL Online DDL 使用
    RDS MySQL 表上 Metadata lock 的产生和处理
    RDS for MySQL 如何使用 Percona Toolkit
    北京已成为投融资诈骗重灾区:存好骗子公司黑名单,谨防上当!
  • 原文地址:https://www.cnblogs.com/wphmoon/p/11836523.html
Copyright © 2011-2022 走看看