zoukankan      html  css  js  c++  java
  • kafka例子程序

    //生产端 产生数据

    /**
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package kafka.examples;

    import java.util.Properties;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    /**
     * Example thread that publishes a fixed series of string messages to a
     * Kafka topic through the legacy {@code kafka.javaapi.producer.Producer} API.
     *
     * <p>The producer is created in the constructor and closed when
     * {@link #run()} finishes, so an instance is meant to be started once.
     */
    public class Producer extends Thread {
        private final kafka.javaapi.producer.Producer<Integer, String> producer;
        private final String topic;
        private final Properties props = new Properties();

        /**
         * Builds a producer that sends string messages to the given topic.
         *
         * @param topic name of the topic every message is sent to
         */
        public Producer(String topic) {
            // Messages are plain strings.
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("metadata.broker.list",
                    "192.168.1.155:9092,192.168.1.156:9092");
            // Use random partitioner. Don't need the key type. Just set it to
            // Integer.
            // The message is of type String.
            producer = new kafka.javaapi.producer.Producer<Integer, String>(
                    new ProducerConfig(props));
            this.topic = topic;
        }

        /**
         * Sends 2000 messages named {@code Message_0} .. {@code Message_1999},
         * echoing each one to standard output, then closes the producer.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 2000; i++) {
                    String messageStr = "Message_" + i;
                    System.out.println("product:" + messageStr);
                    producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
                }
            } finally {
                // Release the producer's resources even if a send fails.
                producer.close();
            }
        }

        /** Starts a single producer thread for {@code KafkaProperties.topic}. */
        public static void main(String[] args) {
            Producer producerThread = new Producer(KafkaProperties.topic);
            producerThread.start();
        }
    }

    //消费端 消费数据

    /**
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package kafka.examples;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    /**
     * Example thread that reads messages from a Kafka topic through the legacy
     * high-level consumer API ({@code ConsumerConnector}) and prints each one
     * to standard output.
     */
    public class Consumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;

        /**
         * Creates the consumer connector from {@link #createConsumerConfig()}.
         *
         * @param topic name of the topic to read from
         */
        public Consumer(String topic) {
            consumer = kafka.consumer.Consumer
                    .createJavaConsumerConnector(createConsumerConfig());
            this.topic = topic;
        }

        /**
         * Builds the consumer configuration from {@code KafkaProperties} and a
         * few fixed ZooKeeper / auto-commit settings.
         *
         * @return the configuration passed to the consumer connector
         */
        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.zkConnect);
            // Consumer group this consumer joins.
            props.put("group.id", KafkaProperties.groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "60000");

            return new ConsumerConfig(props);
        }

        /**
         * Opens one message stream for the topic and prints every message it
         * yields. The connector is shut down when the loop ends or fails.
         */
        @Override
        public void run() {
            try {
                // Request a single stream for the topic.
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                        .createMessageStreams(topicCountMap);
                // Exactly one stream was requested, so take the first entry.
                KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<byte[], byte[]> it = stream.iterator();

                while (it.hasNext()) {
                    // Decode explicitly as UTF-8 instead of relying on the
                    // platform default charset.
                    System.out.println(new String(it.next().message(),
                            java.nio.charset.StandardCharsets.UTF_8));
                }
            } finally {
                // Release the connector's resources.
                consumer.shutdown();
            }
        }

        /** Starts a single consumer thread for {@code KafkaProperties.topic}. */
        public static void main(String[] args) {
            Consumer consumerThread = new Consumer(KafkaProperties.topic);
            consumerThread.start();
        }
    }

  • 相关阅读:
    【博弈论】【SG函数】【找规律】Divide by Zero 2017 and Codeforces Round #399 (Div. 1 + Div. 2, combined) E. Game of Stones
    【概率dp】Divide by Zero 2017 and Codeforces Round #399 (Div. 1 + Div. 2, combined) D. Jon and Orbs
    【基数排序】Divide by Zero 2017 and Codeforces Round #399 (Div. 1 + Div. 2, combined) C. Jon Snow and his Favourite Number
    【找规律】Divide by Zero 2017 and Codeforces Round #399 (Div. 1 + Div. 2, combined) B. Code For 1
    【kmp算法】poj2185 Milking Grid
    【kmp算法】poj2406 Power Strings
    【DFS】Codeforces Round #398 (Div. 2) C. Garland
    【枚举】【贪心】 Codeforces Round #398 (Div. 2) B. The Queue
    【暴力】Codeforces Round #398 (Div. 2) A. Snacktower
    【kmp算法】uva11475 Extend to Palindrome
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/5542524.html
Copyright © 2011-2022 走看看