zoukankan      html  css  js  c++  java
  • Kafka入门教程(二)

    转自:https://blog.csdn.net/yuan_xw/article/details/79188061


    Kafka集群环境安装

    相关下载

    JDK要求1.8版本以上。

    JDK安装教程:http://blog.csdn.net/yuan_xw/article/details/49948285

    Zookeeper安装教程:http://blog.csdn.net/yuan_xw/article/details/47148401

    Kafka下载地址:http://mirrors.shu.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
    这里写图片描述

    Kafka集群规划

    主机名IP安装软件
    Kafka1 192.168.1.221 Jdk、Zookeeper、Kafka
    Kafka2 192.168.1.222 Jdk、Zookeeper、Kafka
    Kafka3 192.168.1.223 Jdk、Zookeeper、Kafka

     配置ssh免密码登录:

    产生密钥,执行命令:ssh-keygen -t rsa,按4回车,密钥文件位于~/.ssh文件

    在192.168.1.221上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

    ssh-copy-id 192.168.1.221

    ssh-copy-id 192.168.1.222

    ssh-copy-id 192.168.1.223

    在192.168.1.222上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

    ssh-copy-id 192.168.1.221

    ssh-copy-id 192.168.1.222

    ssh-copy-id 192.168.1.223

    在192.168.1.223上生产一对钥匙,将公钥拷贝到其他节点,包括自己,执行命令:

    ssh-copy-id 192.168.1.221

    ssh-copy-id 192.168.1.222

    ssh-copy-id 192.168.1.223

    在所有的服务器上设置环境变量:

    export JAVA_HOME=/usr/local/software/jdk1.8.0_66
    export CLASSPATH=.:$JAVA_HOME</span>/lib/dt.<span class="hljs-symbol">jar:</span><span class="hljs-variable">$JAVA_HOME/lib/tools.jar
    export KAFKA_HOME=/usr/local/software/kafka_2.11-1.0.0
    export ZOOKEEPER_HOME=/usr/local/software/zookeeper-3.4.11
    export PATH=.:$JAVA_HOME</span>/<span class="hljs-symbol">bin:</span><span class="hljs-variable">$KAFKA_HOME/bin:$ZOOKEEPER_HOME</span>/<span class="hljs-symbol">bin:</span><span class="hljs-variable">$PATH

       这里写图片描述

    刷新环境变量:source /etc/profile

    关闭所有服务器上的防火墙:

    systemctl stop firewalld.service

    systemctl disable firewalld.service

    安装Kafka

    1.分别在[Kafka1、Kafka2、Kafka3]服务器下载:

    tar -zxvf kafka_2.11-1.0.0.tgz
    这里写图片描述
    2. 修改配置文件:

    Kafka1服务器配置文件

    broker.id=0

    listeners=PLAINTEXT://:9092

    log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

    Kafka2服务器配置文件

    broker.id=1

    listeners=PLAINTEXT://:9092

    log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

    Kafka3服务器配置文件

    broker.id=2

    listeners=PLAINTEXT://:9092

    log.dir= /usr/local/software/kafka_2.11-1.0.0/kafka-logs

    1. 分别启动[Kafka1、Kafka2、Kafka3]服务器的kafka服务

    启动命令:./bin/kafka-server-start.sh -daemon config/server.properties

    查看启动日志:tail -100f logs/server.log

    查看启动端口:lsof -i:9092
    这里写图片描述

    创建主题Topic

           创建主题topic

    执行命令:bin/kafka-topics.sh - -create - -zookeeper 192.168.1.221:2181
    - -replication-factor 2 - -partitions 2 - -topic test

          查看主题topic描述

    执行命令:bin/kafka-topics.sh –list –zookeeper 192.168.1.223:2181
    这里写图片描述
    查看创建的所有主题

    执行命令:bin/kafka-topics.sh –list –zookeeper 192.168.1.223:2181

    生产者生产消息

    执行命令:bin/kafka-console-producer.sh –broker-list 192.168.1.221:9092 –topic test

    消费者消费消息

    执行命令:bin/kafka-console-consumer.sh –bootstrap-server 192.168.1.223:9092 –topic test –from-beginning

    JAVA客户端

    新建一个maven项目,项目名称:kafka-demo:

    代码结构如下:
    这里写图片描述 

    Maven的pom.xml文件:

    <
    projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.kafka</groupId> <artifactId>kafka-demo</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <packaging>jar</packaging> <name>kafka-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> </dependencies> </project>
    1. 生产者代码实现:
    package com.kafka;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    /**
     * 生产者代码
     */
    public classDemoProducer{
        public static void main( String[] args ){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for(int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", Integer.toString(i), "message_value=====>" + i));
            }
            producer.close();
        }
    }


    1. 消费者代码实现:
    package com.kafka;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    /**
     * 消费者代码
     */
    public classDemoConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.1.221:9092,192.168.1.222:9092,192.168.1.223:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test"));
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for(ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s
    ", record.offset(), record.key(), record.value());
                }
            }
        }
    }

    消息结果:

    message_value=====>1
    message_value=====>5
    message_value=====>7
    message_value=====>8
    message_value=====>4
    message_value=====>6
    message_value=====>0
    message_value=====>2
    message_value=====>3
    message_value=====>9

    源代码下载地址:http://download.csdn.net/download/yuan_xw/10228246

    学习Kafka推荐书籍:

    1. 《Kafka入门与实践》

    2. 《Kafka技术内幕 图文详解Kafka源码设计与实现》

    3. 《流式架构:Kafka与MapR Streams数据流处理》

    4. 《Scala语言基础与开发实战》

    5. 《Kafka权威指南》

    6. 《Kafka源码解析与实战》

    –以上为《Kafka集群环境安装》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。——厚积薄发(yuanxw)

    希望在知识中书写人生的代码
  • 相关阅读:
    java+selenium+new——同一个标签窗口里 ,访问多个网页的后退driver.navigate().back()、前进driver.navigate().forward()、刷新driver.navigate().refresh()等功能 。以及获取当前页面的title属性driver.getTitle()和获取当前页面的url地址driver.getCurrentUrl()
    SoapUI接口测试——关联——参数化
    SoapUI接口测试——添加测试套件——new TestSuite——(类似于postman里面的集合)——添加测试步骤——teststeps(测试步骤)
    java+selenium+new——获取网页源代码driver.getPageSource()
    g++命令行详解
    hdoj_1503Advanced Fruits
    指针遍历vector向量
    最长公共子序列
    hdoj_1087Super Jumping! Jumping! Jumping!
    pcc32应用1
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10259542.html
Copyright © 2011-2022 走看看