zoukankan      html  css  js  c++  java
  • kafka demo

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
    <exclusions>
    <exclusion>
    <artifactId>jmxtools</artifactId>
    <groupId>com.sun.jdmk</groupId>
    </exclusion>
    <exclusion>
    <artifactId>jmxri</artifactId>
    <groupId>com.sun.jmx</groupId>
    </exclusion>
    <exclusion>
    <artifactId>jms</artifactId>
    <groupId>javax.jms</groupId>
    </exclusion>
    <!--<exclusion>-->
    <!--<groupId>org.apache.zookeeper</groupId>-->
    <!--<artifactId>zookeeper</artifactId>-->
    <!--</exclusion>-->
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    </exclusion>
    </exclusions>
    </dependency>




    package kafka.dynamic.consumer;

    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class JavaKafkaConsumerHighAPI implements Runnable {

    private ConsumerConnector consumer;

    private String topic;

    private String groupId;

    private Integer consumerCount;

    private ExecutorService executorPool;

    public JavaKafkaConsumerHighAPI(String topic, String groupId, Integer consumerCount) {
    this.topic = topic;
    this.groupId = groupId;
    this.consumerCount = consumerCount;
    this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig("127.0.0.1:2181", groupId));
    }

    @Override
    public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(this.topic, consumerCount);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

    this.executorPool = Executors.newFixedThreadPool(consumerCount);
    for (final KafkaStream<String, String> stream : streams) {
    this.executorPool.submit(new ConsumerKafkaStreamProcesser(stream,topic,groupId));
    }
    }

    public void shutdown() {
    if (this.consumer != null) {
    this.consumer.shutdown();
    }

    if (this.executorPool != null) {
    this.executorPool.shutdown();
    try {
    if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
    }
    } catch (InterruptedException e) {
    System.out.println("Interrupted during shutdown, exiting uncleanly!!");
    }
    }
    }

    private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
    Properties prop = new Properties();
    prop.put("group.id", groupId);
    prop.put("zookeeper.connect", zookeeper);
    prop.put("zookeeper.session.timeout.ms", "400");
    prop.put("zookeeper.sync.time.ms", "200");
    prop.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(prop);
    }

    public static class ConsumerKafkaStreamProcesser implements Runnable {
    private KafkaStream<String, String> stream;
    private String topic;
    private String groupId;
    public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream,String topic,String groupId) {
    this.stream = stream;
    this.topic = topic;
    this.groupId = groupId;
    }
    @Override
    public void run() {
    try {
    ConsumerIterator<String, String> iter = this.stream.iterator();
    while (iter.hasNext()) {
    MessageAndMetadata value = iter.next();
    System.out.println("____" + value.offset() +"____topic:"+ topic + "____" +",groupId="+groupId +" ,value:"+value.message());
    }
    System.out.println("sd");
    }catch (Exception e){
    System.out.println("000"+e.getMessage());
    }
    System.out.println("sdfsad");
    }
    }
    }


    @KafkaListener(groupId = "Main", topics = "test",containerFactory = "kafkaListenerContainerFactory")
    public void listener(ConsumerRecord<?, ?> record) throws Exception{
    Bar bar;
    try {
    bar = new ObjectMapper().readValue(record.value().toString(), Bar.class);
    log.info("Received notification: {}", notification);
    } catch (JsonProcessingException e) {
    return;
    }
    if(map.size() < consumerCount) {
    if (!map.containsKey(bar.getName())) {
    JavaKafkaConsumerHighAPI example1 = new JavaKafkaConsumerHighAPI(record.topic(), bar.getName(), consumerCount);
    new Thread(example1).start();
    }
    }else {
    log.error("消费者配置数量不足!导致数据丢失");
    }
    }


    public static Map<String,JavaKafkaConsumerHighAPI> map = new HashMap<>();

    @Value("${app.pusher.consumerCount}")
    public Integer consumerCount;
  • 相关阅读:
    2021 6 3
    2021 5月 读书笔记
    2021 6 1
    第十三周 2021.05.30
    spring security permitAll不生效
    springboot配置jpa提示Unable to resolve name [mysql] as strategy
    element el-form-item el-input宽度设置
    vue+element表格中使用render函数(if判断处理)
    elementui移除tab报Avoided redundant navigation to current location: ***
    el-dropdown-item添加@click不生效
  • 原文地址:https://www.cnblogs.com/yunger/p/15020316.html
Copyright © 2011-2022 走看看