zoukankan      html  css  js  c++  java
  • Kafka安装之三 spring-kafka实践

    一、spring-kafka配置详解

     1.1 要是用spring-kafka 我们首先要在pom要。xml中引入spring-kafka包

        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
        </dependencies>

    1.2  配置文件application.properties 中对kafka进行配置

    spring.kafka.bootstrap-servers=172.21.13.26:9092
    spring.kafka.consumer.auto-offset-reset=latest
    spring.kafka.consumer.group-id=local_test
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

    二、spring-kafka生产者源码

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by wuxiaowei on 2018/1/10 0010.
     */
    @Component
    public class KafkaSender {
    
      public KafkaSender() {
      }
    
      @Autowired
      private KafkaTemplate kafkaTemplate;
    
      public void sendMessage(String topic, String message) {
    
        kafkaTemplate.send(topic, message);
      }
    }

    三、spring-kafka消费者源码

    使用监听@KafkaListener监听消息,并通过WebSocket发送消息到页面

    import com.saas.mq.controller.WebSocketMessage;
    import com.saas.mq.model.Body;
    import com.saas.mq.model.MessageBody;
    import java.io.IOException;
    
    /**
     * Created by wuxiaowei on 2018/1/11 0011.
     */
    public class SendMessage extends Thread {
    
      private String message;
    
      public SendMessage(String message) {
        this.message = message;
      }
    
      @Override
      public void run() {
        MessageBody messageBody = JSONObject.parseObject(this.message, MessageBody.class);
    
        Body body = messageBody.getResults().stream().findFirst().get();
        String resut = body.getV();
        for (WebSocketMessage item : WebSocketMessage.webSocketSet) {
          try {
            item.sendMessage("receive:" + resut);
          } catch (IOException e) {
            e.printStackTrace();
            continue;
          }
        }
      }
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Created by wuxiaowei on 2017/10/25 0025.
     */
    @Component
    public class KafkaConsumer {
    
      @KafkaListener(topics = "test")
      public void processMessage(String content) {
        SendMessage message = new SendMessage(content);
        message.start();
      }
    }
  • 相关阅读:
    Range
    cache
    从头到尾彻底解析Hash 表算法
    教你如何迅速秒杀掉:99%的海量数据处理面试题
    秒杀抢购思路以及高并发下数据安全
    Nginx+Tomcat负载均衡
    强大的Spring缓存技术(上)
    强大的Spring缓存技术(中)
    强大的Spring缓存技术(下)
    (转)C#中的 break 与continue 的使用和注意
  • 原文地址:https://www.cnblogs.com/wuwei928/p/9046584.html
Copyright © 2011-2022 走看看