zoukankan      html  css  js  c++  java
  • Kafka编程实例

     编程

        Producer是一个应用程序,它创建消息并将它们发送到Kafka broker中。这些producer在本质上各不相同,比如前端应用程序、后端服务、代理服务、遗留系统的适配器,以及用于Hadoop的Producer。这些不同的Producer可以使用不同的语言实现,比如Java、C和Python。

    下面的图表解释了消息producer的Kafka API。


    以下将具体介绍如何编写一个简单的Producer和Consumer应用程序。

    发送简单消息给Kafka broker。Producer端编写类ClusterProducer。

    public classClusterProducer extends Thread {
        private static final Log log =LogFactory.getLog(ClusterProducer.class);
     
        public void sendData() {
            Random rnd = new Random();
            Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
            if (props == null) {
                log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
               return;
            }
            //set the producer configurationproperties
            ProducerConfig config = newProducerConfig(props);
            Producer<String, String> producer= new Producer<String, String>(config);
     
            //Send the data
            int count = 1;
            KeyedMessage<String, String>data;
            while (count < 100) {
                String sign = "*";
                String ip = "192.168.2."+ rnd.nextInt(255);
                StringBuffer sb = newStringBuffer();
                for (int i = 0; i < count; i++){
                    sb.append(sign);
                }
                log.info("set data:" +sb);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
                producer.send(data);
                count++;
            }
            producer.close();
        }
     
        public void run() {
            sendData();
        }
     
        public static void main(String[] args) {
            new ClusterProducer().sendData();
        }
    }


    定义Consumer获取端,获取相应topic的数据:

    public class Consumerextends Thread {
        private static final Log log =LogFactory.getLog(Consumer.class);
        private final ConsumerConnector consumer;
        private final String topic;
     
        public Consumer(String topic) {
            consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig());
            this.topic = topic;
        }
     
        private static ConsumerConfigcreateConsumerConfig() {
            Properties props = new Properties();
           props.put("zookeeper.connect", KafkaProperties.zkConnect);
            props.put("group.id",KafkaProperties.groupId);
           props.put("zookeeper.session.timeout.ms", "400");
           props.put("zookeeper.sync.time.ms", "200");
           props.put("auto.commit.interval.ms", "1000");
     
            return new ConsumerConfig(props);
     
        }
     
        public void run() {
            Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, newInteger(1));
            Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]>it = stream.iterator();
            while (it.hasNext()) {
                log.info("+message: " +new String(it.next().message()));
            }
        }
     
        public static void main(String[] args) {
            Consumer client = new Consumer("cluster_statistics_topic");
            client.

         辅助类:

    /**
     * Shared configuration-file and topic names used by the example classes.
     *
     * <p>Fields declared in an interface are implicitly {@code public static final},
     * so the modifiers are omitted.
     */
    public interface PropertiesSettings {

        /** Classpath resource holding the consumer configuration. */
        String CONSUMER_FILE_NAME = "consumer.properties";

        /** Classpath resource holding the producer configuration. */
        String PRODUCER_FILE_NAME = "producer.properties";

        /** Topic name used when publishing the example messages. */
        String TOPIC_NAME = "cluster_statistics_topic";

        /** Additional topic name. */
        String TOPIC_A = "cluster_statistics_topic_A";
    }
    


    package com.kafka.utils;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * @author JohnLiu
     * @version 0.1.0
     * @date 2014/8/27
     */
    /**
     * Helper for reading {@link Properties}: trimmed lookups on a wrapped instance and
     * static loading of a properties file from the classpath.
     */
    public class PropertiesParser {

        private static final Log log = LogFactory.getLog(PropertiesParser.class);
        /* properties wrapped by this parser */
        Properties props = null;

        /**
         * Wraps the given properties.
         *
         * @param props properties queried by the {@code getStringProperty} methods
         */
        public PropertiesParser(Properties props) {
            this.props = props;
        }

        /**
         * Get the trimmed String value of the property with the given
         * <code>name</code>.  If the value is missing or is the empty String
         * (after trimming), then it returns null.
         */
        public String getStringProperty(String name) {
            return getStringProperty(name, null);
        }

        /**
         * Get the trimmed String value of the property with the given
         * <code>name</code> or the given default value if the value is
         * null or empty after trimming.
         */
        public String getStringProperty(String name, String def) {
            String val = props.getProperty(name, def);
            if (val == null) {
                return def;
            }

            val = val.trim();

            return (val.length() == 0) ? def : val;
        }

        /**
         * Loads {@link PropertiesSettings#CONSUMER_FILE_NAME} from the classpath.
         *
         * @return the loaded properties; empty when no class loader or resource is available
         */
        private Properties loadPropertiesFile() {
            ClassLoader cl = getClass().getClassLoader();
            if (cl == null) {
                cl = findClassloader();
            }
            if (cl == null) {
                // Previously an exception was thrown and caught on the spot, after which
                // the null loader was dereferenced anyway; report and bail out instead.
                log.error("Unable to find a class loader on the current thread or class.");
                return new Properties();
            }
            Properties loaded = readResource(cl, PropertiesSettings.CONSUMER_FILE_NAME);
            return (loaded != null) ? loaded : new Properties();
        }

        /** Returns the thread context class loader, setting it from this class if unset. */
        private ClassLoader findClassloader() {
            // work-around set context loader for windows-service started jvms (QUARTZ-748)
            if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) {
                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            }
            return Thread.currentThread().getContextClassLoader();
        }

        /**
         * Loads a properties file from the classpath.
         *
         * @param fileName resource name resolved against the thread context class loader,
         *                 falling back to this class's loader when none is set
         * @return the loaded properties, or {@code null} when the name is null, no class
         *         loader is available, or the resource does not exist; if reading fails
         *         midway the error is logged and whatever was read so far is returned
         */
        public static Properties getProperties(final String fileName) {
            if (fileName == null) {
                log.error("can't load properties: file name is null");
                return null;
            }
            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            if (cl == null) {
                cl = PropertiesParser.class.getClassLoader();
            }
            if (cl == null) {
                log.error("Unable to find a class loader on the current thread or class.");
                return null;
            }
            return readResource(cl, fileName);
        }

        /** Reads one classpath resource into a Properties object; null if it does not exist. */
        private static Properties readResource(ClassLoader cl, String fileName) {
            InputStream in = cl.getResourceAsStream(fileName);
            if (in == null) {
                // getResourceAsStream signals "not found" with null; Properties.load(null) would NPE.
                log.error("can't find " + fileName + " on the classpath");
                return null;
            }
            Properties loaded = new Properties();
            try {
                loaded.load(in);
            } catch (IOException ioe) {
                log.error("can't load " + fileName, ioe);
            } finally {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // nothing useful can be done if closing a read-only stream fails
                }
            }
            return loaded;
        }
    }
    

          配置参数文件consumer.properties:

    zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181
    group.id=cluster_group
    zookeeper.session.timeout.ms=400
    zookeeper.sync.time.ms=200
    auto.commit.interval.ms=1000



          配置参数文件producer.properties:

    metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092
    serializer.class=kafka.serializer.StringEncoder
    #partitioner.class=com.kafka.producer.SimplePartitioner
    request.required.acks=1


         分别运行上面的代码,能够发送或者得到相应topic信息。

         Enjoy yourself!(*^__^*) ……

  • 相关阅读:
    OSG中的示例程序简介(转)
    空间点到直线垂足坐标的解算方法 (转)
    OpenscenGraph中控制swapbuffer的方法(用于多机大屏幕同步显示机制)
    吏治 ? 官治 ?
    C++中使用union的几点思考(转)
    一个穷人移民美国三年的生活经历(转)
    展望99股市:谁是重组大黑马?(转)
    mysql 在一个实例运行情况下再搭建一个实例
    在CentOS下安装crontab服务
    Zabbix监控之迁移zabbix server
  • 原文地址:https://www.cnblogs.com/wzjhoutai/p/6912570.html
Copyright © 2011-2022 走看看