zoukankan      html  css  js  c++  java
  • kafka单机环境搭建及其基本使用

    kafka
    https://blog.csdn.net/dapeng1995/article/details/81536862
    https://blog.csdn.net/dapeng1995/article/details/86582158
    结构图:https://www.processon.com/view/link/5ddf296de4b0b2fab737f8bc

    参考链接:https://www.cnblogs.com/qpf1/p/9161742.html

     

     

      最近在搞kettle整合kafka producer插件,于是自己搭建了一套单机的kafka环境,以便用于测试。现整理如下的笔记,发上来和大家分享。后续还会有kafka的研究笔记,依然会与大家分享!

    1 kafka环境搭建

    1.1 kafka单机环境搭建

    (1)、解压kafka_2.11-1.1.0.tgz,得到“kafka_2.11-1.1.0”文件夹。

    (2)、kafka需要安装zookee使用,但kafka集成zookeeper,在单机搭建时可直接使用。使用需配置kafka_2.11-1.1.0/config 下的“zookeeper.properties”。

    (3)、配置“zookeeper.properties”。修改dataDir和clientPort。前者是快照存放地址(自己随意配置),后者是客户端连接zookeeper服务的端口。

    (4)、配置kafka_2.11-1.1.0/config下的“server.properties”,修改log.dirs和zookeeper.connect。前者是日志存放文件夹,后者是zookeeper连接地址(端口和clientPort保持一致)。

    到此,kafka的单机环境就搭建成功了。

    2 kafka的相关命令

    开启kafka自带zookeeper:

    前台运行:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    后台运行:

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-run.log 2>&1 &

    (关于后台启动命令,可看我的另一篇博客:《Linux后台运行进程》)

    开启kafka:

    前台运行:

    bin/kafka-server-start.sh config/server.properties

    后台运行:

    nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &

    创建kafka主题:

    bin/kafka-topics.sh --create --zookeeper 10.45.xx.xx:2191 --replication-factor 1 --partitions 1 --topic test

    显示kafka所有主题:

    bin/kafka-topics.sh -list -zookeeper 127.0.0.1:2181

    创建kafka生产者:

    bin/kafka-console-producer.sh –broker-list 127.0.0.1:9092 -topic test

    创建kafka消费者:

    bin/kafka-console-consumer.sh -bootstrap-server 127.0.0.1:9092 -topic test -from-beginning

    解释:

    --zookeeper:后面接的是你配置的zookeeper地址

    --broker-list:默认端口为9092.可自行更改

    3 kafka的简单使用

    (1)、进入” kafka_2.11-1.1.0”文件夹,运行”开启kafka自带zookeeper”命令(最好用后台模式)。记得进入日志中,检查下启动是否有错误。

    (2)、确认zookeeper启动成功后,继续在当前目录,运行“开启kafka”命令(依然用后台模式)。记得检查日志。

    (3)、确认两者启动完毕后,运行“创建kafka主题”命令。显示红框中内容,即为创建成功。

    (4)、继续在此目录下,运行“创建kafka生产者”的命令。kafka默认端口是9092。(图片里我自己改成了9192,没改的直接用9092端口)。如图,我发送了”qpf_test”的消息。

    (5)、另起一个终端,仍在相同目录下运行“创建kafka消费者”的命令。消费者会收到生产者发来的消息。如图,我接收到了之前发的” qpf_test”的消息。

    JAVA客户端

    1. 新建一个maven项目,项目名称:kafka-demo:
    2. 1. Maven的pom.xml文件:
    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>1.0.0</version>
        </dependency>

    生产者代码实现:

    /**
     * 生产者代码
     */
    public class DemoProducer{
        public static void main( String[] args ){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", Integer.toString(i), "message_value=====>" + i));
            }
            producer.close();
        }
    }
    /**
     * 消费者代码
     */
    public class DemoConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test"));
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for(ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(), record.key(), record.value());
                }
            }
        }
    }
    消息结果:
    
    message_value=====>1
    message_value=====>5
    message_value=====>7
    message_value=====>8
    message_value=====>4
    message_value=====>6
    message_value=====>0
    message_value=====>2
    message_value=====>3
    message_value=====>9

    Kafka Consumer

    kafka消费这块应该来说是重点,毕竟大部分的时候,我们主要使用的是将数据进行消费。

    kafka消费的配置如下:

      • bootstrap.servers: kafka的地址。
      • group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。
      • enable.auto.commit:是否自动提交,默认为true。
      • auto.commit.interval.ms: 从poll(拉)的回话处理时长。
      • session.timeout.ms:超时时间。
      • max.poll.records:一次最大拉取的条数。
      • auto.offset.reset:消费规则,默认earliest 。
        earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
        latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
        none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
      • key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
      • value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

    kafka教程参考:https://blog.csdn.net/yuan_xw/article/details/79188061

    java 创建及删除 topic

    https://www.cnblogs.com/MIC2016/p/9020562.html

    https://blog.csdn.net/meng984611383/article/details/80500761

  • 相关阅读:
    @Value不能给静态变量直接赋值问题
    jmeter测试http请求
    SqlServer单步调试
    mysql锁住 Lock wait timeout exceeded; try restarting transaction
    django在model中添加字段报错
    django admin下拉列表不显示值,显示为object的处理
    (原创推荐文章)kerberos服务器端与客户端
    kafka安装与测试
    Linux shell判断文件和文件夹是否存在
    df -h 卡死 如何解决
  • 原文地址:https://www.cnblogs.com/jiawen010/p/11951825.html
Copyright © 2011-2022 走看看