  • Kafka Installation, Part 3: spring-kafka in Practice

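    1. spring-kafka configuration in detail

     1.1 To use spring-kafka, first add the spring-kafka dependency to pom.xml (no versions are listed here, which assumes the project inherits Spring Boot's dependency management)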

        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
        </dependencies>

    1.2 Configure Kafka in application.properties

    spring.kafka.bootstrap-servers=172.21.13.26:9092
    spring.kafka.consumer.auto-offset-reset=latest
    spring.kafka.consumer.group-id=local_test
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
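
Spring Boot passes these `spring.kafka.*` entries on to the Kafka client under its native property names. As a rough sketch, the consumer half of the file above corresponds to the map below. The class name `ConsumerConfigSketch` is made up for illustration; if you configure the consumer by hand rather than through Boot, a map like this could be passed to spring-kafka's `DefaultKafkaConsumerFactory`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original project.
final class ConsumerConfigSketch {

  // Builds the native Kafka consumer settings that mirror application.properties.
  static Map<String, Object> consumerProps() {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("bootstrap.servers", "172.21.13.26:9092");
    props.put("group.id", "local_test");
    // "latest": a group with no committed offset starts from newly arriving records
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
}
```

The producer entries map the same way, to `key.serializer` and `value.serializer`.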

    2. spring-kafka producer source code

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by wuxiaowei on 2018/1/10 0010.
     */
    @Component
    public class KafkaSender {
    
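      // String keys and values match the StringSerializer settings in application.properties
      @Autowired
      private KafkaTemplate<String, String> kafkaTemplate;
    
      // Sends the message to the given topic without a key; the send itself is asynchronous
      public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
      }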
    }

    3. spring-kafka consumer source code

    Use @KafkaListener to receive messages and push them to the page over WebSocket

    import com.alibaba.fastjson.JSONObject; // assumed: fastjson, which provides parseObject(String, Class)
    import com.saas.mq.controller.WebSocketMessage;
    import com.saas.mq.model.Body;
    import com.saas.mq.model.MessageBody;
    import java.io.IOException;
    
    /**
     * Created by wuxiaowei on 2018/1/11 0011.
     */
    public class SendMessage extends Thread {
    
      private final String message;
    
      public SendMessage(String message) {
        this.message = message;
      }
    
      @Override
      public void run() {
        // Parse the raw Kafka record value into the message model
        MessageBody messageBody = JSONObject.parseObject(this.message, MessageBody.class);
    
        // Note: get() throws NoSuchElementException if results is empty
        Body body = messageBody.getResults().stream().findFirst().get();
        String result = body.getV();
        // Broadcast to every open WebSocket session
        for (WebSocketMessage item : WebSocketMessage.webSocketSet) {
          try {
            item.sendMessage("receive:" + result);
          } catch (IOException e) {
            // A failed session should not stop delivery to the others
            e.printStackTrace();
          }
        }
      }
    }

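
`findFirst().get()` throws `NoSuchElementException` when `results` is empty, which would end the worker thread for that message. A defensive variant might look like the sketch below. The `Body` and `MessageBody` classes here are minimal stand-ins that only mirror the `getResults()` and `getV()` accessors used in `SendMessage`; the real `com.saas.mq.model` classes are not shown in this article, so their shape is an assumption, as is the helper name `FirstResult`.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Stand-in for com.saas.mq.model.Body (assumed shape).
final class Body {
  private final String v;
  Body(String v) { this.v = v; }
  String getV() { return v; }
}

// Stand-in for com.saas.mq.model.MessageBody (assumed shape).
final class MessageBody {
  private final List<Body> results;
  MessageBody(List<Body> results) { this.results = results; }
  List<Body> getResults() { return results; }
}

// Hypothetical helper: extracts the value without risking an exception.
final class FirstResult {

  // Returns the value of the first non-null result, or empty if there is none.
  static Optional<String> firstValue(MessageBody messageBody) {
    if (messageBody == null || messageBody.getResults() == null) {
      return Optional.empty();
    }
    return messageBody.getResults().stream()
        .filter(Objects::nonNull)
        .findFirst()
        .map(Body::getV); // a null v also yields an empty Optional
  }
}
```

Inside `run()`, a call such as `FirstResult.firstValue(messageBody).ifPresent(...)` would then skip messages that carry no results instead of failing.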
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Created by wuxiaowei on 2017/10/25 0025.
     */
    @Component
    public class KafkaConsumer {
    
      @KafkaListener(topics = "test")
      public void processMessage(String content) {
        SendMessage message = new SendMessage(content);
        message.start();
      }
    }
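
`processMessage` starts a new `Thread` for every record, so a burst of messages creates a burst of threads. One possible alternative, sketched below with only the JDK, is to hand each message to an `ExecutorService` with a fixed number of threads. `MessageDispatcher`, its pool size, and the `Consumer<String>` handler are illustrative names and choices, not part of the original project; in the listener, the handler would do the work that `SendMessage.run()` does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: runs a handler for each message on a fixed-size pool.
final class MessageDispatcher {
  private final ExecutorService pool;
  private final Consumer<String> handler;

  MessageDispatcher(int threads, Consumer<String> handler) {
    this.pool = Executors.newFixedThreadPool(threads);
    this.handler = handler;
  }

  // Queues the message and returns immediately, so the listener thread is not blocked.
  void dispatch(String content) {
    pool.execute(() -> handler.accept(content));
  }

  // Stops accepting work and waits for queued messages; returns true if all finished in time.
  boolean shutdown(long timeoutSeconds) {
    pool.shutdown();
    try {
      return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

With this in place, the listener body would reduce to a single `dispatcher.dispatch(content)` call on a shared `MessageDispatcher` instance. Note that the pool's queue is unbounded, so this limits the number of threads, not the number of pending messages.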
  • Original article: https://www.cnblogs.com/wuwei928/p/9046584.html