zoukankan      html  css  js  c++  java
  • java实现Kafka生产者示例

    使用java实现Kafka的生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    package com.lisg.kafkatest;
     
    import java.util.Properties;
     
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.Partitioner;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
     
    /**
     * Kafka生产者
     * @author lisg
     *
     */
    public class KafkaProducer {
     
        public static void main(String[] args) {
             
            Properties props = new Properties();
            //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个
            props.put("metadata.broker.list", "vm1:9092,vm2:9092");
            //消息传递到broker时的序列化方式
            props.put("serializer.class", StringEncoder.class.getName());
            //zk集群
            props.put("zookeeper.connect", "vm1:2181");
            //是否获取反馈
            //0是不获取反馈(消息有可能传输失败)
            //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)
            //-1是所有in-sync replicas接受到消息时的反馈
            props.put("request.required.acks", "1");
    //      props.put("partitioner.class", MyPartition.class.getName());
             
            //创建Kafka的生产者, key是消息的key的类型, value是消息的类型
            Producer<Integer, String> producer = new Producer<Integer, String>(
                    new ProducerConfig(props));
             
            int count = 0;
            while(true) {
                String message = "message-" + ++count;
                //消息主题是test
                KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", message);
                //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区
    //          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
                producer.send(keyedMessage);
                System.out.println("send: " + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
             
    //      producer.close();
        }
     
    }
     
    /**
     * 自定义分区类
     *
     */
    class MyPartition implements Partitioner {
     
        public int partition(Object key, int numPartitions) {
            return key.hashCode()%numPartitions;
        }
         
    }






    附件列表

    • 相关阅读:
      动态控件、控件的生存周期和ViewState的运行细节
      PDA开发初级经验
      编译原理知识总结
      A System for Collecting and Analyzing TopicSpecific Web Information
      show tooltip on control
      慧科新闻、慧科搜索
      内存泄漏
      www.sinobankers.com/forum“今日新帖”“最新会员”“论坛热贴”消失问题
      一堆信息抽取的资料文档
      showing tooltip on controls (description on TTN_NEEDTEXT)
    • 原文地址:https://www.cnblogs.com/lishouguang/p/4560559.html
    Copyright © 2011-2022 走看看