zoukankan      html  css  js  c++  java
  • Kafka

    Kafka添加依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>0.11.0.0</version>
            </dependency>

    创建生产者(向Kafka写入数据):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Component;
    
    import java.util.Properties;
    import java.util.ResourceBundle;
    import java.util.concurrent.Future;
    
    /**
     * Kafka生产者
     */
    @Component
    public class CustomProducer {
        private final static Logger logger = Logger.getLogger(CustomProducer.class);
    
        private static ResourceBundle resource = ResourceBundle.getBundle("application");
        private static KafkaProducer<String, String> producer;
        public final static String TOPIC = "XXX_JSON_TOPIC";
    
        public void InitCustomProducer() {
            try {
                logger.debug("开始初始化KafKa生产者");
                Properties props = new Properties();
                // Kafka服务端的主机名和端口号
                props.put("bootstrap.servers", resource.getString("kafka.ip")+":"+resource.getString("kafka.port"));
                // 等待所有副本节点的应答
                props.put("acks", "all");
                // 消息发送最大尝试次数e'AAZ
                props.put("retries", 0);
                // 一批消息处理大小
                props.put("batch.size", 16384);
                // 请求延时
                props.put("linger.ms", 1);
                // 发送缓存区内存大小
                props.put("buffer.memory", 33554432);
                // key序列化
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                // value序列化
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
                producer = new KafkaProducer<>(props);
                logger.debug("初始化KafKa生产者成功");
            } catch (Exception e) {
                logger.error("Kafka生产者初始化异常:"+e.toString()+"|||"+e.getCause());
            }
        }
    
        /**
         * 发生数据
         * @param data
         */
        public void SendData(String data){
            if (producer == null){
                InitCustomProducer();
            }
            try {
                logger.debug("入Kafka数据:"+data);
                Future future = producer.send(new ProducerRecord<>(TOPIC,data));
            } catch (Exception e) {
                logger.error("send to kafka is error:"+e.toString()+"|||"+e.getStackTrace());
            }
        }
    }

    创建Kafka消费者(读取数据):import org.apache.cxf.endpoint.Client;import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.xml.namespace.QName;
    import java.util.*;
    
    /**
     * Kafka消费者
     */
    @Component
    public class CustomConsumer {
        private final static Logger logger = Logger.getLogger(CustomConsumer.class);
        private static ResourceBundle resource = ResourceBundle.getBundle("application");
    
        @Autowired
        private AsyncTask asyncTask;
    
        public void InitCustomConsumer(){
            logger.debug("开始初始化KafKa消费者");
            Properties props = new Properties();
            // 定义kakfa 服务的地址,不需要将所有broker指定上
            props.put("bootstrap.servers", resource.getString("kafka.ip")+":"+resource.getString("kafka.port"));
            // 制定consumer group
    // 由于同一group.id会导致每次重新启动都会重复读取数据,这里暂时随机分配group.id(原因不明)
    props.put("group.id", "test-consumer-group"+System.currentTimeMillis()); //消费规则 //props.put("auto.offset.reset","latest"); // 是否自动确认offset
    //自动提交偏移量
    props.put("enable.auto.commit", "true"); // 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 定义consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Collections.singletonList("XXX_JSON_TOPIC")); logger.debug("初始化KafKa消费者成功"); try { Client client = asyncTask.getClient(); QName qName = asyncTask.getQName(); while (true) { // 读取数据,读取超时时间为100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ logger.debug(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())); logger.debug("消费数据:"+record.value()); } /*
    手动提交偏移量
    try { consumer.commitSync(); } catch (Exception e) { logger.error("commit failed", e); }*/ } }catch (Exception e) { logger.error("consumer异常:"+e); }finally { consumer.close(); } } }
  • 相关阅读:
    牛客小白月赛16E
    洛谷P1309 瑞士轮
    洛谷P1781 宇宙总统
    洛谷P1068 分数线划定
    洛谷P1059 明明的随机数(桶排思想)
    洛谷P1177 【模板】快速排序 (归并排序)
    Python基础-----sys模块
    Python基础-----模块导入注意事项
    Python基础-----os模块
    Python基础-----random随机模块(验证码)
  • 原文地址:https://www.cnblogs.com/lijianda/p/11857545.html
Copyright © 2011-2022 走看看