zoukankan      html  css  js  c++  java
  • SpringBoot实战(十四)之整合KafKa

     本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。

    于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

    一、KafKa的介绍

    1.主要功能

    根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

      a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。

      b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。

      c.可以再消息发布的时候进行处理。

    2.使用场景

    a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。

    b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。

    3.详细介绍

     Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

    消息传输过程:

    Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

    Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

    Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

    二、安装

    安装包下载地址:http://kafka.apache.org/downloads

    找到0.11.0.1版本,如图:

    1.下载

    wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

    2.解压

    tar -xzvf kafka_2.11-0.11.0.1.tgz

    配置说明:

        consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。

        producer.properties 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。

      server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。

           a.broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可。

           b.listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

    例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。

      c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,

    使用默认配置即可,zookeeper.connect=localhost:2181。

    3.运行

    首先运行zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    运行成功,显示如图:

    然后运行kafka

    bin/kafka-server-start.sh config/server.properties

     运行成功,显示如图:

    三、整合KafKa

    1.新建Maven项目导入Maven依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>cn.test</groupId>
      <artifactId>kafka_demo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.1.1.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.2</version>
            </dependency>
    
        </dependencies>
     
        
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
                
       <!-- 指定编译版本 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
       
            
        
                
        
            
            <finalName>${project.artifactId}</finalName>
                
    
        </build>
    
       
    </project>

    2.编写消息实体

    package com.springboot.kafka.bean;
    
    
    import java.util.Date;
    
    import lombok.Data;
     
    
    
    @Data
    public class Message {
        private Long id;    //id
    
        private String msg; //消息
    
        private Date sendTime;  //时间戳
    
    }

     有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。

    3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图)

    package com.springboot.kafka.producer;
    
    import java.util.Date;
    import java.util.UUID;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.springboot.kafka.bean.Message;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Component
    @Slf4j
    public class KafkaSender {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        private Gson gson = new GsonBuilder().create();
    
        //发送消息方法
        public void send() {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            message.setSendTime(new Date());
            log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
            kafkaTemplate.send("zhisheng", gson.toJson(message));
        }
    }

    4.编写消息接收者(可以理解为消费者)

    package com.springboot.kafka.producer;
    
    import java.util.Date;
    import java.util.UUID;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.springboot.kafka.bean.Message;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Component
    @Slf4j
    public class KafkaSender {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        private Gson gson = new GsonBuilder().create();
    
        //发送消息方法
        public void send() {
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            message.setSendTime(new Date());
            log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
            kafkaTemplate.send("zhisheng", gson.toJson(message));
        }
    }

    5.编写启动类

    package com.springboot.kafka;
     
    
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    
    import com.springboot.kafka.producer.KafkaSender;
    
    
    @SpringBootApplication
    public class KafkaApplication {
    
        public static void main(String[] args) {
    
            ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
    
            KafkaSender sender = context.getBean(KafkaSender.class);
    
            for (int i = 0; i < 3; i++) {
                //调用消息发送类中的消息发送方法
                sender.send();
    
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    6.编写application.properties配置文件

    #============== kafka ===================
    # u6307u5B9Akafka u4EE3u7406u5730u5740uFF0Cu53EFu4EE5u591Au4E2A
    spring.kafka.bootstrap-servers=192.168.126.143:9092
    
    #=============== provider  =======================
    
    spring.kafka.producer.retries=0
    # u6BCFu6B21u6279u91CFu53D1u9001u6D88u606Fu7684u6570u91CF
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    
    # u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    #=============== consumer  =======================
    # u6307u5B9Au9ED8u8BA4u6D88u8D39u8005group id
    spring.kafka.consumer.group-id=test-consumer-group
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    
    # u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    7.运行结果

    示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

    如果按照上述流程没有达到预计的效果可以git clone到本地。

  • 相关阅读:
    【杂谈】压行技巧(压代码)
    【UVA】11464 Even Parity(枚举子集)
    【POJ】2373 Dividing the Path(单调队列优化dp)
    【POJ】2329 Nearest number
    【BZOJ】1833: [ZJOI2010] count 数字计数(数位dp)
    【BZOJ】2809: [Apio2012]dispatching(左偏树)
    【BZOJ】2342: [Shoi2011]双倍回文(Manacher)
    【BZOJ】1912: [Apio2010]patrol 巡逻(树的直径)
    【BZOJ】1911: [Apio2010]特别行动队(斜率优化dp)
    【BZOJ】1913: [Apio2010]signaling 信号覆盖(计算几何+计数)
  • 原文地址:https://www.cnblogs.com/youcong/p/10216573.html
Copyright © 2011-2022 走看看