zoukankan      html  css  js  c++  java
  • 【Kafka】Windows环境配置测试

    一、配置

    1、Java配置:JAVA_HOME路径不要有空格

    2、下载kafka_2.11-1.1.0,地址是https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz,并解压缩至C:\1000_Study\3000_Java\kafka_2.11-1.1.0

    3、配置kafka的path环境

    二、命令行测试

    cd C:\1000_Study\3000_Java\kafka_2.11-1.1.0


    bin\windows\zookeeper-server-start.bat config\zookeeper.properties


    bin\windows\kafka-server-start.bat config\server.properties


    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181


    bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic demo


    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

    三、开发环境

    1、topic创建

    package kafkatest;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    
    /**
     * Command-line example that creates the {@code demo} topic (1 partition,
     * replication factor 1) on the broker at {@code localhost:9092} using the
     * Kafka {@link AdminClient}.
     */
    public class createtopic {

        /**
         * Connects to the broker, requests creation of the {@code demo} topic and
         * blocks until the request has completed or failed.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Create the topic
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");

            // try-with-resources guarantees the admin client is closed even when
            // topic creation fails; the original never closed it.
            try (AdminClient adminClient = AdminClient.create(props)) {
                ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
                topics.add(new NewTopic("demo", 1, (short) 1));

                CreateTopicsResult result = adminClient.createTopics(topics);
                // Block until the creation request has finished for every requested topic.
                result.all().get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // Creation failed on the broker side; the cause is attached to the exception.
                e.printStackTrace();
            }
        }

    }
    

      

    2、消息生产

    package kafkatest;
    
    import java.util.ArrayList;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    /**
     * Command-line example that publishes 100 string messages to the
     * {@code demo} topic on the broker at {@code localhost:9092}.
     */
    public class msgproducer {

        /**
         * Builds a string-keyed, string-valued producer and sends the numbers
         * 0..99, using each number's decimal text as both key and value.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // try-with-resources closes the producer even if send() throws part-way
            // through the loop; the original skipped close() on any exception.
            try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
                for (int i = 0; i < 100; i++) {
                    String text = Integer.toString(i);
                    // The returned future is ignored, so per-record send failures are not reported here.
                    producer.send(new ProducerRecord<String, String>("demo", text, text));
                }
            }
        }

    }

    3、消息消费

    package kafkatest;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    
    /**
     * Command-line example that subscribes to the {@code demo} topic on the
     * broker at {@code localhost:9092} as consumer group {@code test} and prints
     * every record it receives.
     */
    public class msgconsumer {

        /**
         * Polls the {@code demo} topic forever and prints offset, key and value of
         * each record. Whenever partitions are assigned, the consumer seeks to
         * their beginning, so records are re-read from the start after every assignment.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // The loop below never exits normally, so the consumer can only be
            // released when poll() throws; try-with-resources makes sure it is
            // closed in that case instead of being leaked as in the original.
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
                consumer.subscribe(Arrays.asList("demo"), new ConsumerRebalanceListener() {

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        // Reset the offset of the newly assigned partitions to the very beginning.
                        consumer.seekToBeginning(collection);
                    }

                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> arg0) {
                        // Nothing to do when partitions are revoked.
                    }
                });

                while (true) {
                    // poll(long) is kept on purpose: it is the variant used by the original code.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            }
        }

    }

    四、参考资料

    https://blog.csdn.net/woshixiazaizhe/article/details/80610432

    https://blog.csdn.net/chinagissoft/article/details/50954401

    https://blog.csdn.net/lingbo229/article/details/80761778

  • 相关阅读:
    PostgerSQL 回收表空间,查看占用磁盘大小
    为 Docker 添加阿里云的镜像地址
    Docker 常用命令
    CentOS 7 安装 Docker
    kafka-常用脚本2
    Nginx 端口被占用(0.0.0.0:443 failed (98: Address already in use))
    nginx: [error] open() "/var/run/nginx.pid" failed (2: No such file or directory)
    检查Nginx 配置文件出否有问题
    Python2 安装虚拟环境
    记录 | 程序员技术博客平台推荐和选取
  • 原文地址:https://www.cnblogs.com/defineconst/p/11615282.html
Copyright © 2011-2022 走看看