  • Integrating Kafka with Spring Boot

    Kafka official site: http://kafka.apache.org/

    1. Generate a project at https://start.spring.io/, selecting a Spring Boot 1.5 version. Download it, then import it into MyEclipse.

    2. Add the required dependencies to pom.xml. The complete file is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.21.RELEASE</version>
            <relativePath /> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>springboot-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot-kafka</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- Web support -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional> <!-- must be true for hot reloading to take effect -->
            </dependency>
            <!-- For deployment to Tomcat; provided scope avoids conflicts -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
                <scope>provided</scope>
            </dependency>
            <!-- Servlet dependencies -->
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>javax.servlet-api</artifactId>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jstl</artifactId>
            </dependency>
            <!-- Tomcat support (JSP engine) -->
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-jasper</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    3. Install ZooKeeper and Kafka, then start both.

    ZooKeeper reference: https://www.jianshu.com/p/5491d16e6abd

      Change into its bin directory, zookeeper-3.4.10/bin

        Run in a terminal: ./zkServer.sh start

    Kafka reference: http://kafka.apache.org/quickstart

    Change into the Kafka directory (kafka_2.12-2.2.0 here) and run the following commands.

    Start Kafka: bin/kafka-server-start.sh config/server.properties

    Create a topic: bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    List topics: bin/kafka-topics.sh --list --bootstrap-server localhost:9092
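
    Before starting the Spring Boot application, it can help to confirm that both services are accepting connections. The following is a minimal sketch using only the JDK. The class name PortCheck and the method isListening are hypothetical, and port 2181 is an assumption (the clientPort in ZooKeeper's sample configuration); 9092 is the broker port used in the commands above. A successful TCP connect only shows that something is listening on the port, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is an assumed default; adjust it if zoo.cfg sets a different clientPort.
        System.out.println("ZooKeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

    If either line prints false, start the corresponding service (ZooKeeper first, then Kafka) before continuing.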
     
    4. Create the producer and consumer programs.
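    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * Test Kafka producer: sends five messages to the "test" topic.
     */
    @RestController
    @RequestMapping("kafka")
    public class TestKafkaProducerController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping("send")
        public String send() {
            for (int i = 0; i < 5; i++) {
                try {
                    // The message value is the loop index as a string
                    kafkaTemplate.send("test", String.valueOf(i));
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    return "interrupted";
                }
            }
            return "success";
        }
    
    }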
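
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    /**
     * Test Kafka consumer: prints each record received from the "test" topic.
     */
    @Component
    public class TestConsumer {
    
        @KafkaListener(topics = "test")
        public void listen(ConsumerRecord<?, ?> record) throws Exception {
            System.out.printf("topic = %s, offset = %d, value = %s%n",
                    record.topic(), record.offset(), record.value());
        }
    }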
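
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class SpringbootKafkaApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringbootKafkaApplication.class, args);
        }
    
    }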
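
    The post does not show an application.properties file. With Spring Boot's Kafka auto-configuration, the @KafkaListener consumer will most likely need a consumer group id, so a configuration along these lines is probably required. This is a sketch under that assumption, not the author's actual file: the group name test-group is hypothetical, and the broker address repeats the one used in the commands above.

```properties
# src/main/resources/application.properties (assumed; not shown in the original post)

# Broker address; matches the --bootstrap-server value used when creating the topic
spring.kafka.bootstrap-servers=localhost:9092

# Consumer group for the @KafkaListener; "test-group" is a placeholder name
spring.kafka.consumer.group-id=test-group

# Optional: start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
```

    Keys and values use String serializers and deserializers by default, which matches the KafkaTemplate<String, String> above.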

    5. Run SpringbootKafkaApplication.

    Then request http://localhost:8080/kafka/send, either in a browser or from a terminal with an HTTP client.

    Console output:

    topic = test, offset = 43, value = 0 
    topic = test, offset = 44, value = 1 
    topic = test, offset = 45, value = 2 
    topic = test, offset = 46, value = 3 
    topic = test, offset = 47, value = 4 
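
    The request in step 5 can also be made from code. The following is a minimal sketch using only the JDK's HttpURLConnection; the class name SendTrigger and the method fetch are hypothetical, and the default URL assumes the application is running locally on port 8080 as above. Because the controller sleeps 500 ms after each of its five sends, the response takes about 2.5 seconds, so the read timeout is set generously.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SendTrigger {

    /** Performs a GET request and returns the response body as a string. */
    public static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(10000); // the endpoint sleeps between sends
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // The default URL is the endpoint from step 5; pass another URL as the first argument.
        String url = args.length > 0 ? args[0] : "http://localhost:8080/kafka/send";
        System.out.println(fetch(url));
    }
}
```

    The response body comes from the controller; the consumed records are printed in the application's own console, not in the client's output.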
  • Original post: https://www.cnblogs.com/luoa/p/10933686.html