zoukankan      html  css  js  c++  java
  • Kafka的安装与部署

    一、硬件环境

    假设有4台机,IP及主机名如下:

    192.168.100.105 c1
    192.168.100.110 c2
    192.168.100.115 c3
    192.168.100.120 c4

    二、软件环境

    1.安装JDK

    https://www.cnblogs.com/live41/p/14235891.html

    2.安装ZooKeeper

    https://www.cnblogs.com/live41/p/15522363.html

    三、搭建分布式Kafka

    1.下载安装包

    http://kafka.apache.org/downloads

    这里下载的是kafka_2.12-3.0.0.tgz。

    * 以下步骤在每台机都要执行

    2.上传安装包到服务器

    假设安装在home目录

    cd /home
    rz

    3.解压

    tar -xvf kafka_2.12-3.0.0.tgz
    mv kafka_2.12-3.0.0 kafka

    4.配置系统环境变量

    vim ~/.bashrc

    添加以下内容:

    export PATH=$PATH:/home/kafka/bin

    保存退出后,更新环境变量:

    source ~/.bashrc

    5.编辑Kafka配置文件

    cd /home/kafka/config
    vim server.properties

    添加以下内容:

    vim server.properties
    broker.id=0
    listeners=PLAINTEXT://0.0.0.0:9092
    zookeeper.connect=c1:2181,c2:2181,c3:2181,c4:2181

    * 其中0.0.0.0是同时监听localhost(127.0.0.1)和内网IP(例如c1或192.168.100.105),也可以改为localhost或c1或192.168.100.105。

    6.启动

    zkServer.sh start
    kafka-server-start.sh -daemon home/kafka/config/server.properties

    7.检查

    jps

    会看到jps、QuorumPeerMain、Kafka

    8.Kafka命令测试

    #创建topic
    kafka-topics.sh --create --bootstrap-server c1:9092 --topic topic1 --partitions 8 --replication-factor 2
    
    #列出所有topic
    kafka-topics.sh --list --bootstrap-server c1:9092
    
    #列出所有topic的信息
    kafka-topics.sh --bootstrap-server c1:9092 --describe
    
    #列出指定topic的信息
    kafka-topics.sh --bootstrap-server c1:9092 --describe --topic topic1
    
    #生产者(消息发送程序)
    kafka-console-producer.sh --broker-list c1:9092 --topic topic1
    
    #消费者(消息接收程序)
    kafka-console-consumer.sh --bootstrap-server c1:9092 --topic topic1

    其中,topic1是topic名,可自定义。

    * 由于Apache开发团队的版本升级原因,不同版本的命令会有所区别。

    https://www.cnblogs.com/live41/p/15522207.html

    9.Java代码测试

    (1) 配置maven

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>

    (2) 调用代码

    public class KafkaHandler
    {
        public static void main(String[] args)
        {
            try
            {
                // 先监听,再发送消息
                consume();
                produce();
            }
            catch (Exception e)
            {
                System.out.println(e);
            }
        }
    
        private static void produce() throws Exception
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "c1:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
            try
            {
                kafkaProducer.send(new ProducerRecord<String, String>("topic1", "这是测试文本"));
            }
            finally
            {
                kafkaProducer.close();
            }
        }
    
        private static void consume() throws Exception
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "c1:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
    
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            try
            {
                consumer.subscribe(Arrays.asList("topic1"));
                while (true)
                {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records)
                    {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    }
                }
            }
            finally
            {
                consumer.close();
            }
        }
    }

    10.停止

    kafka-server-stop.sh

    附录

    1.acks参数

    acks = 1    只保证leader保存成功,如果刚好leader挂了,数据丢失
    acks = 0    使用异步模式,该模式下kafka无法保证消息,可能会丢失
    acks = all  所有副本都写入成功并确认

    2.数据丢失问题的相关参数

    acks = all   所有副本都写入成功并确认
    retries = n  重试次数,设置为3或以上
    min.insync.replicas = 2  消息至少要被写入到2个副本才算成功
    unclean.leader.election.enable = false  关闭ubclean leader选举,不允许非ISR中的副本被选举为leader,防止数据不一致的情况

    unclean.leader.election.enable参数的资料:

    https://honeypps.com/mq/kafka-params-analysis-of-unclean-leader-election-enable/

  • 相关阅读:
    redis简单使用及连接池
    初始Redis
    redis安装教程
    .whel文件的打开方式
    xpath选择器的使用,selenium使用
    爬虫之bs4的使用,之爬取汽车之家新闻,之代理池的搭建
    爬虫基础 之 爬取梨视频 模拟登陆 爬取妹子图
    git基本使用

    re模块正则表达式
  • 原文地址:https://www.cnblogs.com/live41/p/15522443.html
Copyright © 2011-2022 走看看