zoukankan      html  css  js  c++  java
  • kafka demo

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
    <exclusions>
    <exclusion>
    <artifactId>jmxtools</artifactId>
    <groupId>com.sun.jdmk</groupId>
    </exclusion>
    <exclusion>
    <artifactId>jmxri</artifactId>
    <groupId>com.sun.jmx</groupId>
    </exclusion>
    <exclusion>
    <artifactId>jms</artifactId>
    <groupId>javax.jms</groupId>
    </exclusion>
    <!--<exclusion>-->
    <!--<groupId>org.apache.zookeeper</groupId>-->
    <!--<artifactId>zookeeper</artifactId>-->
    <!--</exclusion>-->
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    </exclusion>
    </exclusions>
    </dependency>




    package kafka.dynamic.consumer;

    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class JavaKafkaConsumerHighAPI implements Runnable {

    private ConsumerConnector consumer;

    private String topic;

    private String groupId;

    private Integer consumerCount;

    private ExecutorService executorPool;

    public JavaKafkaConsumerHighAPI(String topic, String groupId, Integer consumerCount) {
    this.topic = topic;
    this.groupId = groupId;
    this.consumerCount = consumerCount;
    this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig("127.0.0.1:2181", groupId));
    }

    @Override
    public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(this.topic, consumerCount);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

    this.executorPool = Executors.newFixedThreadPool(consumerCount);
    for (final KafkaStream<String, String> stream : streams) {
    this.executorPool.submit(new ConsumerKafkaStreamProcesser(stream,topic,groupId));
    }
    }

    public void shutdown() {
    if (this.consumer != null) {
    this.consumer.shutdown();
    }

    if (this.executorPool != null) {
    this.executorPool.shutdown();
    try {
    if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
    }
    } catch (InterruptedException e) {
    System.out.println("Interrupted during shutdown, exiting uncleanly!!");
    }
    }
    }

    private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
    Properties prop = new Properties();
    prop.put("group.id", groupId);
    prop.put("zookeeper.connect", zookeeper);
    prop.put("zookeeper.session.timeout.ms", "400");
    prop.put("zookeeper.sync.time.ms", "200");
    prop.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(prop);
    }

    public static class ConsumerKafkaStreamProcesser implements Runnable {
    private KafkaStream<String, String> stream;
    private String topic;
    private String groupId;
    public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream,String topic,String groupId) {
    this.stream = stream;
    this.topic = topic;
    this.groupId = groupId;
    }
    @Override
    public void run() {
    try {
    ConsumerIterator<String, String> iter = this.stream.iterator();
    while (iter.hasNext()) {
    MessageAndMetadata value = iter.next();
    System.out.println("____" + value.offset() +"____topic:"+ topic + "____" +",groupId="+groupId +" ,value:"+value.message());
    }
    System.out.println("sd");
    }catch (Exception e){
    System.out.println("000"+e.getMessage());
    }
    System.out.println("sdfsad");
    }
    }
    }


    @KafkaListener(groupId = "Main", topics = "test",containerFactory = "kafkaListenerContainerFactory")
    public void listener(ConsumerRecord<?, ?> record) throws Exception{
    Bar bar;
    try {
    bar = new ObjectMapper().readValue(record.value().toString(), Bar.class);
    log.info("Received notification: {}", notification);
    } catch (JsonProcessingException e) {
    return;
    }
    if(map.size() < consumerCount) {
    if (!map.containsKey(bar.getName())) {
    JavaKafkaConsumerHighAPI example1 = new JavaKafkaConsumerHighAPI(record.topic(), bar.getName(), consumerCount);
    new Thread(example1).start();
    }
    }else {
    log.error("消费者配置数量不足!导致数据丢失");
    }
    }


    public static Map<String,JavaKafkaConsumerHighAPI> map = new HashMap<>();

    @Value("${app.pusher.consumerCount}")
    public Integer consumerCount;
  • 相关阅读:
    luogu P1833 樱花 看成混合背包
    luogu P1077 摆花 基础记数dp
    luogu P1095 守望者的逃离 经典dp
    Even Subset Sum Problem CodeForces
    Maximum White Subtree CodeForces
    Sleeping Schedule CodeForces
    Bombs CodeForces
    病毒侵袭持续中 HDU
    病毒侵袭 HDU
    Educational Codeforces Round 35 (Rated for Div. 2)
  • 原文地址:https://www.cnblogs.com/yunger/p/15020316.html
Copyright © 2011-2022 走看看