zoukankan      html  css  js  c++  java
  • spring整合kafak(一)

    一.环境要求

    1.操作系统:win10

    2.项目情况:spring+struts1+mybatis (老项目,非maven项目)

    3.kafka_2.12-2.7.0 【新版本的kafka已经内置了zookeeper,因此不需要单独下载zookeeper】

    下载kafka_2.12-2.7.0已经在云盘分享,也可以去官网自行下载(http://kafka.apache.org/downloads)

    链接:https://pan.baidu.com/s/1atZaQ8kkAZrnsJJnlc5IWQ
    提取码:c57f
    复制这段内容后打开百度网盘手机App,操作更方便哦

    二.整合操作

    1.导入依赖,因为是非maven项目,所以要导入jar包(kafka_2.12-2.7.0.jar,kafka-clients-1.0.1.jar,kafka-raft-2.7.0.jar)

    jar包下载地址已在云盘分享

    链接:https://pan.baidu.com/s/15WU3d9wpjURfy5JtSo8JwQ
    提取码:orz2
    复制这段内容后打开百度网盘手机App,操作更方便哦

    如果是maven项目,可以直接在pom.xml中添加依赖

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.7.0</version>
    </dependency>

    注意:导入的依赖或jar包版本与你下载的kafka版本保持一致

    2.创建生产者

    package org.kafka;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.Properties;
    
    /**
     * kafka 创建生产者
     * fjt
     * 20210325
     */
    public class KafkaProduce {
        private static Properties properties;
        private static Log log = LogFactory.getLog(KafkaProduce.class);
        public KafkaProduce(String url){
            properties = new Properties();
            properties.put("bootstrap.servers", url);
            properties.put("producer.type", "sync");
            properties.put("request.required.acks", "1");
            properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("bak.partitioner.class", "kafka.producer.DefaultPartitioner");
            properties.put("bak.key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("bak.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }
    
        /**
         * @Title: sendMessage
         * @Description: 生产消息
         */
        public void sendMessage(String topic, String key, String value) {
            // 实例化produce
            KafkaProducer<String, String> kp = new KafkaProducer<String, String>(properties);
    
            // 消息封装
            ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, value);
    
            // 发送数据
            kp.send(pr, new Callback() {
                // 回调函数
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (null != exception) {
                        log.error("记录的offset在:" + metadata.offset() + exception.getMessage());
                    }
                }
            });
    
            // 关闭produce
            kp.close();
        }
    }

    3.创建消费者

    package org.kafka;
    
    import com.gexin.fastjson.JSONObject;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * 创建消费者
     * fjt
     * 20210325
     */
    public class KafkaConsume {
        private static Properties properties;
        private long SIZE = 100;
        KafkaConsumer<String, String> consumer;
        private static Log log = LogFactory.getLog(KafkaConsume.class);
    
        public KafkaConsume(String url){
            properties = new Properties();
            properties.put("bootstrap.servers", url);
            properties.put("zookeeper.connect", url);
            properties.put("group.id", "kafkaDemo");
            properties.put("zookeeper.session.timeout.ms", "4000");
            properties.put("zookeeper.sync.time.ms", "200");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("auto.offset.reset", "earliest");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
    
        /**
         * @Title: getMessage
         * @Description: 消费一个消息
         */
        public void getMessage(String topic) {
            consumer = new KafkaConsumer<String, String>(properties);
            // 订阅主题
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(SIZE);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        System.out.println("消费者接收数据如下:");
                        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                        //消息数据为JSON格式的字符串,解析数据
                        JSONObject msgInfo = JSONObject.parseObject(record.value());
                        System.out.println(msgInfo.getString("deveui"));
                        System.out.println(msgInfo.getString("type"));
                        System.out.println(msgInfo.getString("happendtime"));
                        System.out.println(msgInfo.getString("msg"));
                    }catch ( Exception e){
                        log.error(e.getMessage());
                    }
                }
            }
        }
    
        public void closeConsumer() {
            consumer.close();
        }
    
    
    }

    4.调用生产者/消费者示例

    package org.kafka;
    
    /**
     * 生产者/消费者调用示例
     * fjt
     * 2021.03.25
     */
    public class KafkaDemo {
        public static void main( String[] args )
        {
            String url = "127.0.0.1:9092";
            String topic = "topic";
    
            //生产者
            KafkaProduce kp = new KafkaProduce(url);
            //生产一条消息
            kp.sendMessage(topic, "key", "{"deveui":"13579","type":"fault","happendtime":"2013-03-25 13:24:19","msg":"XXX位置XX设备发生故障,请及时处理"}");
    
            //消费者
            KafkaConsume kc = new KafkaConsume(url);
            //消费指定topic的数据
            kc.getMessage(topic);
        }
    }

    三.整合测试

    1.下载后的kafka_2.12-2.7.0.tgz解压到指定目录(建议不要放在中文目录下)

    2.kafka根目录下新建data和kafka-logs文件夹,后面要用到,作为kafka快照和日志的存储文件夹

     3.修改配置文件

    进入到config目录,

    修改server.properties里面log.dirs路径未log.dirs=D:\MyStudy\Kafka\kafka_2.12-2.7.0\kafka-logs

    修改zookeeper.properties里面dataDir路径为dataDir=D:\MyStudy\Kafka\kafka_2.12-2.7.0\data

    注意:文件夹分割符一定要是”\”

    4.启动kafka内置的zookeeper

    在D:MyStudyKafkakafka_2.12-2.7.0目录下 shift+右击 打开Powershell窗口 或 在目录中输入cmd回车 打开cmd窗口  

    输入命令:.inwindowszookeeper-server-start.bat  .configzookeeper.properties

    启动zookeeper后,不要关闭窗口,否则关闭zookeeper关闭

    5.启动kafka服务

    在D:MyStudyKafkakafka_2.12-2.7.0目录下 shift+右击 打开Powershell窗口 或 在目录中输入cmd回车 打开cmd窗口  

    输入命令:.inwindowskafka-server-start.bat .configserver.properties

    启动kafkar后,不要关闭窗口,否则关闭kafkar关闭

    6.kafka创建消息主题 myTopic

    在D:MyStudyKafkakafka_2.12-2.7.0目录下 shift+右击 打开Powershell窗口 或 在目录中输入cmd回车 打开cmd窗口  

    输入命令:.inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTopic

    创建kafka主题后,不要关闭窗口

    7.kafka创建生产者产生消息

    在D:MyStudyKafkakafka_2.12-2.7.0目录下 shift+右击 打开Powershell窗口 或 在目录中输入cmd回车 打开cmd窗口  

    输入命令:.inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic myTopic

    创建kafka生产者后,不要关闭窗口

    8.kafka创建消费者接收消息

    在D:MyStudyKafkakafka_2.12-2.7.0目录下 shift+右击 打开Powershell窗口 或 在目录中输入cmd回车 打开cmd窗口  

    输入命令:.inwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myTopic --from-beginning

    创建kafka消费者后,不要关闭窗口

    9.生产者生产数据,消费者自动接收

    10.通过以上步骤 ,可以测试本地的zookeeper及kafka服务启动没有问题,我们可以关闭掉主题窗口、生产者窗口、消费者窗口,保留zookeeper及kafka服务运行

    通过运行项目代码,测试代码的运行效果

    四.参考博文

    https://blog.csdn.net/u014088839/article/details/83150406

    https://blog.csdn.net/github_38482082/article/details/82112641

  • 相关阅读:
    WQS二分
    题解 洛谷 P4696 【[CEOI2011]Matching】
    原根
    单位根反演
    题解 洛谷 P4218 【[CTSC2010]珠宝商】
    题解 洛谷 P5434 【有标号荒漠计数】
    题解 洛谷 P5406 【[THUPC2019]找树】
    题解 洛谷 P3563 【[POI2013]POL-Polarization】
    题解 洛谷 P6078 【[CEOI2004]糖果】
    拉格朗日插值法
  • 原文地址:https://www.cnblogs.com/fujingtao5470/p/14581315.html
Copyright © 2011-2022 走看看