zoukankan      html  css  js  c++  java
  • kafka单机搭建,并测试api

    所用环境:
      kafka_2.12-2.0.0.gz
       centos 6.9 nat动态ip
    准备工作:
                (1).将防火墙关闭
                                        service iptables stop 临时关闭
                                        chkconfig iptables off 永久关闭
        
                 (2).修改C:WindowsSystem32driversetc 下的hosts文件
                    增加映射

    启动zookeeper服务(采用kafka内置的zk)

    /root/kafka_2.12-2.0.0/bin
    在这个目录下启动  zookeeper-server-start.sh
    
    命令 :bin/zookeeper-server-start.sh config/zookeeper.properties

    当最后一行显示 INFO binding to port 0.0.0.0/0.0.0.0:2181 证明成功

    启动kafka服务

    进入到kafka目录下
    bin/kafka-server-start.sh config/server.properties

    创建一个topic

     bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
    
    我这里是   192.168.15.140 test 
    localhost改为test也可以运行成功
    (当出现副本什么的larger than n,就要关闭防火墙)

    kafka生产者生产消息

    bin/kafka-console-producer.sh --broker-list test:9092 --topic testTopic

    消费者消费消息

    bin/kafka-console-consumer.sh --bootstrap-server test:9092 --topic testTopic --from-beginning

    代码测试:

        这里用了idea

    produce

     1 package com.xuliugen.kafka.demo;
     2 
     3 import org.apache.kafka.clients.producer.KafkaProducer;
     4 import org.apache.kafka.clients.producer.ProducerRecord;
     5 
     6 import java.util.Properties;
     7 
     8 public class ProducerDemo {
     9 
    10     // Topic
    11     private static final String topic = "testTopic";
    12 
    13     public static void main(String[] args) throws Exception {
    14 
    15         Properties props = new Properties();
    16         props.put("bootstrap.servers", "192.168.15.140:9092");
    17         props.put("acks", "0");
    18         props.put("group.id", "1111");
    19         props.put("retries", "0");
    20         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    21         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    22 
    23         //生产者实例
    24         KafkaProducer producer = new KafkaProducer(props);
    25 
    26         int i = 1;
    27 
    28         // 发送业务消息
    29         // 读取文件 读取内存数据库 读socket端口
    30         while (true) {
    31             Thread.sleep(1000);
    32             producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
    33             System.out.println("key:" + i + " " + "value:" + i);
    34             i++;
    35         }
    36     }
    37 }

    comsumer

    package com.xuliugen.kafka.demo;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class ConsumerDemo {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
        private static final String topic = "testTopic";
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.15.140:9092");
            props.put("group.id", "1111");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
            consumer.subscribe(Arrays.asList(topic));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.xuliugen.kafka</groupId>
        <artifactId>kafka.demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.12</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.12</version>
            </dependency>
    
        </dependencies>
    
    </project>
    1 抄袭自
    2 https://blog.csdn.net/xlgen157387/article/details/77312569
    View Code

     代码地址            链接: https://pan.baidu.com/s/1hjJ7IRMTQEFdV-8SCf7VlA 提取码: 286w 复制这段内容后打开百度网盘手机App,操作更方便哦

    RUSH B
  • 相关阅读:
    三级菜单python写法(递归写法)
    webstorm2018.1.6版本安装+破解+汉化
    sourceTree 的使用教程
    nodeppt的使用教程
    堆和栈的区别(转过无数次的文章)
    黎曼滤波在神经计算方面的应用
    深度学习笔记——PCA原理与数学推倒详解
    TCP/IP模型详解
    OSI7层模型详解
    CNN车型分类总结
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/10552359.html
Copyright © 2011-2022 走看看