zoukankan      html  css  js  c++  java
  • spring mvc + kafka实战

    1. 下载zk
      • 地址: http://mirrors.hust.edu.cn/apache/zookeeper/stable/
      • 解压  tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz 
      • cd apache-zookeeper-3.5.8-bin/
      •  mv conf/ zoo_sample.cfg  conf/zoo.cfg

      •  vim zoo.cfg, 配置日志:
        1. tickTime: zookeeper中使用的基本时间单位, 毫秒
        2. dataDir: 内存数据快照的保存目录;如果没有自定义Log也使用该目录
        3. clientPort: 监听Cli连接的端口号
    2. 下载 kafa : http://kafka.apache.org/downloads.html

           a) 解压:tar -zxvf kafka_2.12-2.5.0.tgz  

           b)   修改 kafka_2.12-2.5.0/config/server.properties

             

          c) 修改 zookeeper.properties

             

         d) 启动kafka: sh bin/kafka-server-start.sh ./config/server.properties 

           

         e) 启动zk ,启动server : ./zkServer.sh start 

          

         d)创建kafak topic : sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

          

          检验是否创建好了topic: 

    ./kafka-topics.sh --list --zookeeper localhost:2181

           

    3.  sping mvc 项目

        a) 配置pom.xml文件

      

     <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.1.0.RELEASE</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-clients</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.12</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo</artifactId>
                <version>2.5.3</version>
                <exclusions>
                    <exclusion>
                        <artifactId>spring</artifactId>
                        <groupId>org.springframework</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>com.101tec</groupId>
    
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>

        b) 创建kafka-producer.xml文件

      

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
        <!--基本配置 -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!-- kafka服务地址,可能是集群-->
                    <entry key="bootstrap.servers" value="localhost:9092,localhost:9093,localhost:9094" />
                    <!-- 有可能导致broker接收到重复的消息,默认值为3-->
                    <entry key="retries" value="10" />
                    <!-- 每次批量发送消息的数量-->
                    <entry key="batch.size" value="1638" />
                    <!-- 默认0ms,在异步IO线程被触发后(任何一个topic,partition满都可以触发)-->
                    <entry key="linger.ms" value="1" />
                    <!--producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常 -->
                    <entry key="buffer.memory" value="33554432 " />
                    <!-- producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号-->
                    <entry key="acks" value="all" />
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!-- 创建kafkatemplate需要使用的producerfactory bean -->
        <bean id="producerFactory"
              class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties" />
            </constructor-arg>
        </bean>
    
        <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
        <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <!--设置对应topic-->
            <property name="defaultTopic" value="test" />
        </bean>
    </beans>

      c) 创建 kafka-consumer.xml 文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!--Kafka服务地址 -->
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <!--Consumer的组ID,相同goup.id的consumer属于同一个组。 -->
                    <entry key="group.id" value="order-beta" />
                    <!--如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。 -->
                    <entry key="enable.auto.commit" value="true" />
                    <!--网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定 -->
                    <entry key="session.timeout.ms" value="15000 " />
                    <entry key="key.deserializer"
                           value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                           value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!--指定具体监听类的bean -->
        <bean id="messageListernerConsumerService" class="test.ke.kafka.KafkaConsumerListener" />
    
        <!-- 创建consumerFactory bean -->
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties"/>
            </constructor-arg>
        </bean>
    
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="test"/>
            <property name="messageListener" ref="messageListernerConsumerService"/>
        </bean>
    
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties"/>
        </bean>
    </beans>

       d) 配置spring-mvc文件

       

      <!-- 引入kafka配置文件,根据个人文件位置-->
        <import resource="classpath:kafka-producer.xml"/>
        <import resource="classpath:kafka-consumer.xml"/>
    <!-- 使用zookeeper注册中心暴露服务地址 -->
        <dubbo:application name="test-provider"/>
        <dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181"/>

    d) 写接口

     @Resource
        private KafkaTemplate<Integer, String> kafkaTemplate;
    
    @RequestMapping(value = "/hello.do")
        @ResponseBody
        public void hello(){
    
            kafkaTemplate.sendDefault("test it");
    
        }

    消息消费

    package test.ke.kafka;
    
    import lombok.SneakyThrows;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.MessageListener;
    
    public class KafkaConsumerListener implements MessageListener<Integer, String> {
        @SneakyThrows
        @Override
        public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
    
            Object o = consumerRecord.value();
            System.out.println(String.valueOf(o));
        }
    }

    此时,起服务在浏览器调用 http://localhost:18080/hello.do 则在控制台会打印出

    ---------------------

    要简单验证kafka是否运行正常,则可以在kafka的bin目录下执行:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test 

    然后新起一个terminal, 执行下面的语句,producer发送文案,则上面的consumer就会拿到对应的结果

     ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

     同时由于我们刚刚起的spring服务监听的端口号也是9092, 因此服务也会获取到该消息:

  • 相关阅读:
    记账本开发记录——第十三天(2020.1.31)
    《构建之法——现代软件工程》读书笔记(二)
    记账本开发记录——第十二天(2020.1.30)
    记账本开发记录——第十一天(2020.1.29)
    记账本开发记录——第十天(2020.1.28)
    记账本开发记录——第九天(2020.1.27)
    记账本开发记录——第八天(2020.1.26)
    记账本开发记录——第七天(2020.1.24)
    记账本开发记录——第六天(2020.1.23)
    记账本开发记录——第五天(2020.1.22)
  • 原文地址:https://www.cnblogs.com/leavescy/p/13274944.html
Copyright © 2011-2022 走看看