  • Kafka: Windows Client with a Linux Server (Repost)

    Original: http://blog.csdn.net/jingshuigg/article/details/25001979

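    I. For setting up the server side, see the previous article, "Kafka Single-Node Environment Setup and Testing".

    Server IP: 10.0.30.221

    The directory layout of the runtime environment is as follows: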

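    Two properties in server.properties, under the config folder, need to be changed.

    Change zookeeper.connect=localhost:2181 to zookeeper.connect=00.00.00.01:2181, where 00.00.00.01 stands for the server's IP address (10.0.30.221 in this setup).

    The second property is commented out by default:

    Change #host.name=localhost to host.name=00.00.00.01 (again, the server's IP address).

    If host.name is left unchanged, the client fails with the error below, most likely because the broker then advertises a host name that the remote client cannot resolve: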

    Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at its.kafka.Producer.run(Producer.java:46)

    Once these steps are done, start zookeeper-server and kafka-server as described in "Kafka Single-Node Environment Setup and Testing".
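
    Before moving on to the client, it can help to confirm from the Windows machine that the two server ports are reachable at all. The following is a minimal sketch using only the JDK; the class name BrokerPortCheck and the 2-second timeout are assumptions made for illustration, while the IP and ports are the ones used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, unresolved host name, or timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        String server = "10.0.30.221"; // server IP used in this article
        System.out.println("ZooKeeper 2181 reachable: " + canConnect(server, 2181, 2000));
        System.out.println("Kafka 9092 reachable:     " + canConnect(server, 9092, 2000));
    }
}
```

    This only checks TCP reachability. A port that answers does not prove that host.name is set correctly, but a port that does not answer points to a firewall or network problem rather than a Kafka configuration problem.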

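    II. Setting up the client

    The client runs on Windows 7 and connects to the server from Eclipse.

    1. Create a new project named kafka_producer in Eclipse, with the following directory layout:

    Note: copy log4j.properties from the config folder into src. Only there is it on the classpath and takes effect, so that log output becomes visible.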
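
    Whether the file really ended up on the classpath can be checked with a few lines of plain Java. This is a sketch under the assumption that Eclipse copies non-Java files from src to the output folder (its default behaviour); the class name Log4jClasspathCheck is made up for this example.

```java
import java.net.URL;

final class Log4jClasspathCheck {
    /** Returns the URL of the named classpath resource, or null if it is not on the classpath. */
    static URL find(String resourceName) {
        return Log4jClasspathCheck.class.getClassLoader().getResource(resourceName);
    }

    public static void main(String[] args) {
        URL url = find("log4j.properties");
        if (url != null) {
            System.out.println("log4j.properties found at " + url);
        } else {
            System.out.println("log4j.properties is NOT on the classpath");
        }
    }
}
```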

    The producer code is as follows:

    import java.util.Properties;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class Producer extends Thread{
        private final kafka.javaapi.producer.Producer<Integer, String> producer;
        private final String topic;
        private final String name;
        private final int numsOfMessage;
        private final Properties props = new Properties();
    
        public Producer(String name,String topic,int numsOfMessage){
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list", "10.0.30.221:9092");
            // Send asynchronously
            //props.put("producer.type", "async");
            // Number of messages per batch (async mode only)
            //props.put("batch.num.messages", "100");
            producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
            this.topic = topic;
            this.name = name;
            this.numsOfMessage = numsOfMessage;
        }
      
        @Override
        public void run() {
            try {
                // Each producer sends numsOfMessage messages.
                for (int messageNo = 1; messageNo <= numsOfMessage; messageNo++) {
                    String message = name + "'s    Message_" + messageNo + "******";
                    KeyedMessage<Integer, String> messageForSend = new KeyedMessage<Integer, String>(topic, message);
                    producer.send(messageForSend);
                }
            } finally {
                // Release the producer's connections even if a send fails.
                producer.close();
            }
        }
    }

    The code that starts the producer is as follows:

    public class KafkaProducerDemo implements KafkaProperties {
        public static void main(String[] args) {
            StartThread(1, "testTopic", 10);
        }

        /**
         * Starts the given number of producer threads.
         *
         * @param numsOfProducer number of producers to start
         * @param topic          topic the messages are sent to
         * @param numsOfMessage  number of messages each producer sends
         */
        public static void StartThread(int numsOfProducer, String topic, int numsOfMessage) {
            for (int i = 1; i <= numsOfProducer; i++) {
                String name = "Producer" + i;
                new Producer(name, topic, numsOfMessage).start();
            }
        }
    }
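
    StartThread returns as soon as the threads have been started. If the caller needs to know when every producer has finished, one option is to keep the thread references and join them. The sketch below illustrates that pattern with a hypothetical stand-in thread (FakeProducer) that only counts messages, so it runs without a Kafka broker; it is not the article's Producer class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class JoinProducersSketch {
    /** Hypothetical stand-in for the Producer thread: it only counts "sent" messages. */
    static final class FakeProducer extends Thread {
        private final int numsOfMessage;
        private final AtomicInteger sent;

        FakeProducer(String name, int numsOfMessage, AtomicInteger sent) {
            super(name);
            this.numsOfMessage = numsOfMessage;
            this.sent = sent;
        }

        @Override
        public void run() {
            for (int i = 0; i < numsOfMessage; i++) {
                sent.incrementAndGet(); // a real producer would call producer.send(...) here
            }
        }
    }

    /** Starts the producers, waits for all of them, and returns the total number of messages sent. */
    static int startAndWait(int numsOfProducer, int numsOfMessage) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 1; i <= numsOfProducer; i++) {
            Thread t = new FakeProducer("Producer" + i, numsOfMessage, sent);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // block until this producer's run() has returned
        }
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("messages sent: " + startAndWait(3, 10)); // prints "messages sent: 30"
    }
}
```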

    2. Create a new project named kafka_consumer in Eclipse, with the following directory layout:

    The consumer code is as follows:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    
    public class Consumer extends Thread {
      private final ConsumerConnector consumer;
      private final String topic;
      private final String name;
      
      public Consumer(String name,String topic){
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
        this.name = name;
      }
    
      private static ConsumerConfig createConsumerConfig(){
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
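        // Minimum number of bytes to return per fetch request; the default is 1
        //props.put("fetch.min.bytes", "1024");
        // Maximum time (ms) the server waits for fetch.min.bytes to accumulate; the default is 100
        //props.put("fetch.wait.max.ms", "600000");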
        return new ConsumerConfig(props);
      }
     
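      @Override
      public void run() {
        // Request a single stream (one consuming thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until a message arrives, so the loop keeps running while the connector is open.
        while (it.hasNext()) {
            // The producer's StringEncoder writes UTF-8 by default, so decode with the same charset
            // rather than the platform default, which differs between Windows and Linux.
            String message = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("************" + name + "    gets    " + message);
        }
      }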
    }
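
    The consumer receives each message as a raw byte[], while the producer's StringEncoder encodes strings as UTF-8 unless configured otherwise. With a Windows client and a Linux server the platform default charsets often differ, so it is safer to name the charset when decoding. A small stand-alone sketch of the effect follows; the class name PayloadDecoding is hypothetical, and ISO-8859-1 is used purely as an example of a mismatched charset.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class PayloadDecoding {
    /** Decodes a payload using UTF-8, the default encoding of the producer's StringEncoder. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u6d88\u606f" is the Chinese word for "message"; escapes keep the source file ASCII-only.
        byte[] payload = "Producer1's \u6d88\u606f_1".getBytes(StandardCharsets.UTF_8);

        System.out.println("platform default charset: " + Charset.defaultCharset());
        System.out.println("decoded as UTF-8:         " + decode(payload));
        // Decoding with a mismatched charset garbles the non-ASCII characters.
        System.out.println("decoded as ISO-8859-1:    " + new String(payload, StandardCharsets.ISO_8859_1));
    }
}
```

    For the pure-ASCII messages sent in this article the difference is invisible; it only shows up once messages contain non-ASCII text.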

    The code that starts the consumer:

    public class KafkaConsumerDemo implements KafkaProperties
    {
      public static void main(String[] args){
        //Consumer1
        Consumer consumerThread1 = new Consumer("Consumer1",KafkaProperties.topic);
    
        consumerThread1.start();
      }
    }

    The properties code (used to pass the property values; they could of course also be supplied from an XML file):

    public interface KafkaProperties{
      final static String zkConnect = "10.0.30.221:2181";  
      final static  String groupId = "group1";
      final static String topic = "testTopic";
      final static String kafkaServerURL = "10.0.30.221";
      final static int kafkaServerPort = 9092;
      final static int kafkaProducerBufferSize = 64*1024;
      final static int connectionTimeOut = 100000;
      final static int reconnectInterval = 10000;
      final static String clientId = "SimpleConsumerDemoClient";
    }
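
    As a sketch of the XML alternative mentioned above, the JDK's java.util.Properties can read its own XML format through loadFromXML. The class name XmlKafkaSettings, the entry keys and the idea of keeping the XML in a string are assumptions for illustration; in a real project the XML would more likely live in a file on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

final class XmlKafkaSettings {
    /** Sample settings in the XML format understood by java.util.Properties. */
    static final String SAMPLE_XML =
          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
        + "<!DOCTYPE properties SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"
        + "<properties>\n"
        + "  <entry key=\"zookeeper.connect\">10.0.30.221:2181</entry>\n"
        + "  <entry key=\"group.id\">group1</entry>\n"
        + "  <entry key=\"topic\">testTopic</entry>\n"
        + "</properties>\n";

    /** Parses the given XML document into a Properties object. */
    static Properties fromXml(String xml) throws IOException {
        Properties props = new Properties();
        props.loadFromXML(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = fromXml(SAMPLE_XML);
        System.out.println("zookeeper.connect = " + props.getProperty("zookeeper.connect"));
        System.out.println("group.id          = " + props.getProperty("group.id"));
        System.out.println("topic             = " + props.getProperty("topic"));
    }
}
```

    Values loaded this way can be passed to the consumer's Properties in place of the KafkaProperties constants, which lets the server address change without recompiling.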
     

    3. Start the consumer first and then the producer; the following output appears in the Eclipse Console window:

  • Original address: https://www.cnblogs.com/davidwang456/p/4201875.html