zoukankan      html  css  js  c++  java
  • kafka集群搭建

    第一步

    先去官网下载 kafka_2.9.2-0.8.1.1.tgz 并解压再进入到安装文件夹(也能够自己配置路径,方法跟配置java、hadoop等路径是一样的).
    > tar -xzf kafka_2.9.2-0.8.1.1.tgz 
    > cd kafka_2.9.2-0.8.1.1


    第二步
    zeekeeper集群搭建(用的是kafka自带的zeekeeper,一共准备了三台机器)
    1、关闭各台机器的防火墙(一定要切记。我搭建的时候以为能ping通就ok了,就没关心防火墙的问题了。最后白白浪费了一天的时间)
    命令 /ect/init.d/iptables stop

    2、进入到打开/ect下的hosts文件
    改动为
    127.0.0.1 localhost
    10.61.5.66 host1
    10.61.5.67 host2
    10.61.5.68 host3
    (ip和机器名依据个人实际情况改动)

    3、改动zeekeeper 配置文件
    进入到kafka安装文件夹下的config文件。打开zookeeper.properties
    改动dataDir={kafka安装文件夹}/zookeeper/logs/
    凝视掉maxClientCnxns=0
    在文件末尾加入例如以下语句
    tickTime=2000
    initLimit=5
    syncLimit=2
    #host1、2、3为主机名。能够依据实际情况更改。port号也能够更改
    server.1=host1:2888:3888
    server.2=host2:2888:3888
    server.3=host3:2888:3888

    4、在dataDir文件夹下的建立一个myid文件
    命令   echo 1 >myid
    另外两台机子分别设置为2、3,依次类推。


    第三步
    启动zookeeper服务(每台机子的zeekeeper都要启)
    > bin/zookeeper-server-start.sh config/zookeeper.properties
    在三台机子的zeekeeper都启动好之前。先启动的机子会有错误日志,这是正常的

     

    第四步
    配置kafka
    1、在kafka安装文件夹下的config文件夹下打开server.properties文件
    改动
    zookeeper.connect=host1:2181,host2:2181,host3:2181    (2181为port号。能够依据自己的实际情况更改)
    其它两台机子的server.properties文件里的broker.id也要改,反正三台机子的broker.id不能有反复
    2、改动producer.properties文件
    改动
    metadata.broker.list=host1:9092,host2:9092,host3:9092
    prodeucer.type=async

    3、改动consumer.properties文件
    改动
    zeekeeper.connect=host1:2181,host2:2181,host3:2181

    4、在每台机子启动kafka服务
    > bin/kafka-server-start.sh config/server.properties

     

    第四步:建立一个主题
    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic my-replicated-test
    factor大小不能超过broker数

    通过下面命令查看主题
    > bin/kafka-topics.sh --list --zookeeper host1:2181 (也能够是host2:2181等)
    my-replicated-test

    通过下述命令能够看到该主题详情
    > bin/kafka-topics.sh --describe --zookeeper host1:2181 --topic my-replicated-test


    第五步:发送消息
    在host2上建立生产者角色。并发送消息(事实上能够是三台机子中的不论什么一台)
    > bin/kafka-console-producer.sh --broker-list host1:9092 --topic my-replicated-test 
    This is a message
    This is another message

    在host3上建立消费者角色(在该终端窗体内能够看到生产者公布这消息)
    > bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic my-replicated-test --from-beginning
    This is a message
    This is another message

    至此。一个kafka集群就搭好了,能够作为kafkaserver了

     

     測试程序(在win系统上)

    切记要去C:Windowssystem32driversetchosts作例如以下配置,否则測试程序无法訪问kafkaserver!

    10.61.5.66 host1
    10.61.5.67 host2
    10.61.5.68 host3

    记得将kafka安装文件夹下libs里的全部包导入项目里去

    //生产者測试程序

    public class ProducerTest {
     public static void main(String[] args) throws FileNotFoundException {  
            Properties props = new Properties();  
            props.put("zookeeper.connect", "slaves7:2182,slaves8:2182,slaves9:2182");  
            props.put("serializer.class", "kafka.serializer.StringEncoder");  
            props.put("metadata.broker.list","slaves7:9092,slaves8:9092,slaves9:9092");
          
            ProducerConfig config = new ProducerConfig(props);  
            Producer<String, String> producer = new Producer<String, String>(config);  
             File file=new File("E:/test","test.txt");
             BufferedReader readtxt=new BufferedReader(new FileReader(file));
              String line=null;
              byte[] item=null;
       try {
          while((line=readtxt.readLine())!=null){
          item=line.getBytes();
          String str = new String(item);
          System.out.println(str);
          producer.send(new KeyedMessage<String, String>("my-replicated-topic",str));
          }
       } catch (IOException e) {
        e.printStackTrace();
       }
           }  
    }
    //消费者測试程序

    public class ConsumerTest extends Thread {
     private final ConsumerConnector consumer;  
        private final String topic;  
          
        public static void main(String[] args) {  
            ConsumerTest consumerThread = new ConsumerTest("my-replicated-topic");  
            consumerThread.start();  
        }  
          
        public ConsumerTest(String topic) {  
         System.out.println(topic);
            consumer = kafka.consumer.Consumer  
                    .createJavaConsumerConnector(createConsumerConfig());  
            this.topic = topic;  
        }  
          
        private static ConsumerConfig createConsumerConfig() {  
            Properties props = new Properties();  
            props.put("zookeeper.connect", "slaves7:2182,slaves8,slaves9:2182");  
            props.put("group.id", "0");  
            props.put("zookeeper.session.timeout.ms", "400000");  
            props.put("zookeeper.sync.time.ms", "200");  
            props.put("auto.commit.interval.ms", "1000");  
          
            return new ConsumerConfig(props);  
          
        }  
          
        public void run() {  
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
            topicCountMap.put(topic, new Integer(1));  
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  
                    .createMessageStreams(topicCountMap);  
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  
            ConsumerIterator<byte[], byte[]> it = stream.iterator();  
            while (it.hasNext())  
                System.out.println(new String(it.next().message()));  
        }  
    }

    当两个測试程序都执行后,生产者程序会从本机读取txt文件的内容,消费者程序会显示出这些内容

  • 相关阅读:
    linux性能调优总结
    mongodb之sharding原理
    Centos6.6搭建mongodb3.2.6副本集分片
    vmstat 命令详解
    SaltStack之Targeting
    saltstack之pillar详解
    saltstack之grains详解
    saltstack之yum简单部署lnmp
    Redis监控工具
    PHP实现选择排序
  • 原文地址:https://www.cnblogs.com/gcczhongduan/p/5321976.html
Copyright © 2011-2022 走看看