  • [Kafka Learning 6] Kafka message production and consumption example

    Environment
      Virtual machine: VMware 10
      Linux version: CentOS-6.5-x86_64
      SSH client: Xshell 4
      FTP client: Xftp 4
      JDK 1.8
      kafka_2.11-0.11.0.0
      zookeeper-3.4.6
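
    Both programs below assume the topic REC-CBBO-MSG-TOPIC already exists. As a minimal sketch (not from the original post), it could be created with the AdminClient that ships with kafka 0.11; the broker list is reused from the producer example, and the partition and replication counts are assumptions:

    package com.qyg.test;

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TopicSetup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // broker list taken from the producer example below
            props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 3 partitions / replication factor 3 are assumed values
                admin.createTopics(Collections.singleton(
                        new NewTopic("REC-CBBO-MSG-TOPIC", 3, (short) 3))).all().get();
            }
        }
    }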

    Producer:

    package com.qyg.test;
    
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    /**
     * Example of a Kafka producer implemented in Java (old Scala-client API).
     * Sends data partitioned by channel (message key).
     */
    public class KafkaClusterProTest {
        private static final String topic = "REC-CBBO-MSG-TOPIC";
    
        public static void main(String[] args) {
            String brokerList = "node1:9092,node2:9092,node3:9092";
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            // -1: wait for acknowledgement from all in-sync replicas
            props.put("request.required.acks", "-1");
            // send synchronously rather than batching in the background
            props.put("producer.type", "sync");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // custom partitioning rule (see the sketch after this class)
            props.put("partitioner.class", "com.qyg.test.SimplePartitioner");
            props.put("message.send.max.retries", "3");
            // batch size only applies in async mode
            props.put("batch.num.messages", "200");
            // socket send buffer size
            props.put("send.buffer.bytes", "102400");
            // encode message strings as GBK
            props.put("serializer.encoding", "gbk");
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
            
            for (int i = 0; i < 1000; i++) {
                System.out.println("msg" + i);
                // the fixed key "0531" routes every message to the same channel
                KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "0531", "msg" + i);
                producer.send(msg);
            }
            
            producer.close();
        }
    
    }
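
    The "partitioner.class" setting above points at com.qyg.test.SimplePartitioner, which the post does not show. A minimal sketch of what such a class could look like against the old kafka.producer.Partitioner interface (the hash-modulo routing is an assumption):

    package com.qyg.test;

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    /**
     * Hypothetical partitioner matching the "partitioner.class" setting above.
     * The old producer instantiates it reflectively, so the constructor taking
     * VerifiableProperties is required.
     */
    public class SimplePartitioner implements Partitioner {

        public SimplePartitioner(VerifiableProperties props) {
            // no configuration needed for this sketch
        }

        // route a message to a partition based on its key (e.g. "0531")
        @Override
        public int partition(Object key, int numPartitions) {
            // mask the sign bit so the result is always non-negative
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    With the fixed key "0531" used in the loop above, every message hashes to the same partition, which preserves message ordering.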

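    For comparison, kafka_2.11-0.11.0.0 also ships the newer Java producer under org.apache.kafka.clients.producer. A sketch of the same loop against that API (configuration values mirror the old ones where a counterpart exists; "acks=all" is the new spelling of request.required.acks=-1):

    package com.qyg.test;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            // wait for all in-sync replicas, like request.required.acks=-1
            props.put("acks", "all");
            props.put("retries", "3");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
                for (int i = 0; i < 1000; i++) {
                    // same fixed key as above, so all messages share one partition
                    producer.send(new ProducerRecord<String, String>("REC-CBBO-MSG-TOPIC", "0531", "msg" + i));
                }
            }
        }
    }
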
    Consumer:

    package com.qyg.test;
    
    import java.util.HashMap;  
    import java.util.List;  
    import java.util.Map;  
    import java.util.Properties;  
      
    import kafka.consumer.ConsumerConfig;  
    import kafka.consumer.ConsumerIterator;  
    import kafka.consumer.KafkaStream;  
    import kafka.javaapi.consumer.ConsumerConnector;  
    import kafka.serializer.StringDecoder;  
    import kafka.utils.VerifiableProperties;
    
    public class KafkaConsumer {
          
        private final ConsumerConnector consumer;  
      
        private KafkaConsumer() {  
            Properties props = new Properties();  
              
            // ZooKeeper quorum used by the old high-level consumer
            props.put("zookeeper.connect", "node3:2181,node4:2181,node5:2181");
      
            // consumer group this consumer belongs to
            props.put("group.id", "MyGroup1");
      
            // ZooKeeper session timeout
            props.put("zookeeper.session.timeout.ms", "4000");
            // how far a ZK follower may lag behind its leader
            props.put("zookeeper.sync.time.ms", "200");
            // how often consumed offsets are committed to ZooKeeper
            props.put("auto.commit.interval.ms", "1000");
            /**
             * When this group.id has no offset stored in ZooKeeper (e.g. a brand-new
             * group, or the ZK data was wiped), this setting decides where consumption
             * starts: "largest" accepts only the largest (newest) offsets, while
             * "smallest" starts from the smallest offset, i.e. reads the topic from
             * the beginning.
             */
            props.put("auto.offset.reset", "smallest");  
              
            // serializer class (not actually used by the old consumer; carried over from the producer config)
            props.put("serializer.class", "kafka.serializer.StringEncoder");
      
            ConsumerConfig config = new ConsumerConfig(props);  
      
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
        }  
      
        void consume() {  
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            // request one stream (consumer thread) for the topic
            topicCountMap.put("REC-CBBO-MSG-TOPIC", Integer.valueOf(1));
      
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);  
            KafkaStream<String, String> stream = consumerMap.get("REC-CBBO-MSG-TOPIC").get(0);  
            ConsumerIterator<String, String> it = stream.iterator();  
              
            while (it.hasNext()) { // hasNext() blocks until a message arrives
                System.out.println(it.next().message());  
            }  
        }  
      
        public static void main(String[] args) {  
            new KafkaConsumer().consume();  
        }  
    }  
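
    The ZooKeeper-based high-level consumer used above is deprecated in the 0.11 release from the environment list. For comparison, a sketch of the same loop against the new consumer API (fully qualified because the class name clashes with the class above; the broker list is taken from the producer example, and "earliest" is the new spelling of "smallest"):

    package com.qyg.test;

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class NewApiConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // the new consumer talks to the brokers directly, not to ZooKeeper
            props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            props.put("group.id", "MyGroup1");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                    new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props)) {
                consumer.subscribe(Collections.singletonList("REC-CBBO-MSG-TOPIC"));
                while (true) {
                    // poll blocks for up to 100 ms waiting for records
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }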
  • Original article: https://www.cnblogs.com/cac2020/p/10766536.html