  • Kafka Java client connection

    Use the Java client classes KafkaProducer and KafkaConsumer to connect to Kafka.

    Note: since version 0.10, connecting to Kafka only needs the broker addresses (bootstrap.servers); no ZooKeeper information is required, as the minimal sketch below shows.
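
    As a minimal sketch of that point (the class name BrokerOnlyConnection is invented for illustration; the broker address and group id are taken from the config below), a consumer can be constructed from nothing but the broker list, a group id, and deserializers:

    package com.wenbronk.kafka;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Properties;

    public class BrokerOnlyConnection {
        public static void main(String[] args) {
            Properties props = new Properties();
            // broker list only -- no zookeeper.connect entry is needed since 0.10
            props.put("bootstrap.servers", "10.183.93.127:9093");
            props.put("group.id", "test222");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.close();
        }
    }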

    1. Kafka configuration

    {
        "producer": {
            "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
            "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
            "max.request.size": "10485760",
            "batch.size": "163840",
            "buffer.memory": "536870912",
            "max.block.ms": "500",
            "retries": "3",
            "acks": "1",
        },
        "cosumer": {
            "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
            "group.id": "test222",
            "session.timeout.ms": "30000",
            "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
        }
    }

    2. KafkaUtils, a helper to read the Kafka configuration

    package com.wenbronk.kafka;
    
    
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import org.junit.Test;
    
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.util.Map;
    import java.util.Properties;
    
    public class KafkaUtils {
        @Test
        public void test() throws FileNotFoundException {
            getConfig("producer");
        }
    
        /** Read one named section (e.g. "producer") from src/main/resources/kafka as a JsonObject. */
        public static JsonObject getConfig(String name) throws FileNotFoundException {
            JsonParser parser = new JsonParser();
            JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka"));
            JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name);
            System.out.println(jsonObject);
            return jsonObject;
        }
    
        /** Convert a named config section into a Properties object usable by the Kafka clients. */
        public static Properties getProperties(String sourceName) throws FileNotFoundException {
            JsonObject config = KafkaUtils.getConfig(sourceName);
            Properties properties = new Properties();
    
            for (Map.Entry<String, JsonElement> entry : config.entrySet()) {
                properties.put(entry.getKey(), entry.getValue().getAsString());
            }
            return properties;
        }
    
    
    }

    3. Kafka producer

    package com.wenbronk.kafka;
    
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    import java.io.FileNotFoundException;
    import java.util.*;
    
    /**
     * Message producer
     */
    public class KafkaProducerMain {
    
        @Test
        public void send() throws Exception {
            HashMap<String, String> map = new HashMap<>();
            map.put("http_zhixin", "send message to kafka from producer");
            for (int i = 0; i < 3; i++) {
                sendMessage(map);
            }
        }
    
        /**
         * Send one message per topic in the map
         */
        public void sendMessage(Map<String, String> topicMsg) throws FileNotFoundException {
            Properties properties = KafkaUtils.getProperties("producer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
            for (Map.Entry<String, String> entry : topicMsg.entrySet()) {
                String topic = entry.getKey();
                String message = entry.getValue();
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
                // send asynchronously; the commented line below uses the external callback class instead of a lambda
    //            producer.send(record, new CallBackFunction(topic, message));
                producer.send(record, (recordMetadata, e) -> {
                    if (e != null) {
                        System.err.println(topic + ": " + message + " -- send failed");
                    } else {
                        System.err.println(topic + ": " + message + " -- send succeeded");
                    }
                });
            }
            producer.flush();
            producer.close();
        }
    }

    The callback can be written as an anonymous inner class (or a lambda, as above), or as a separate class instantiated with new:

    package com.wenbronk.kafka;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    /**
     * Callback function
     */
    public class CallBackFunction implements Callback {
    
        private String topic;
        private String message;
    
        public CallBackFunction(String topic, String message) {
            this.topic = topic;
            this.message = message;
        }
    
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println(topic + ": " + message + " -- send failed");
            } else {
                System.out.println(topic + ": " + message + " -- send succeeded");
            }
        }
    }
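
    Usage mirrors the commented-out line in sendMessage above: pass an instance in place of the lambda.

    // inside sendMessage, instead of the lambda:
    producer.send(record, new CallBackFunction(topic, message));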

    4. Kafka consumer

    package com.wenbronk.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.junit.Test;
    
    import java.io.FileNotFoundException;
    import java.util.*;
    
    public class KafkaConsumerMain {
    
        /**
         * Auto-commit offsets
         */
        public void commitAuto(List<String> topics) throws FileNotFoundException {
            Properties props = KafkaUtils.getProperties("consumer");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    
        /**
         * Manually commit offsets after a batch has been processed
         */
        public void commitControl(List<String> topics) throws FileNotFoundException {
            Properties props = KafkaUtils.getProperties("consumer");
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
            final int minBatchSize = 2;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    insertIntoDb(buffer);
                    // blocking synchronous commit
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        }
    
        /**
         * Commit offsets per partition, after processing each partition's records
         */
        public void setOffSet(List<String> topics) throws FileNotFoundException {
            Properties props = KafkaUtils.getProperties("consumer");
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                // after processing each partition's records, commit its offset
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }
    
        /**
         * Manually position the consumer at a given offset (seek)
         */
        public void setSeek(List<String> topics) throws FileNotFoundException {
            Properties props = KafkaUtils.getProperties("consumer");
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
            // with subscribe(), partitions are only assigned during poll(), so poll once
            // before seek(); seeking an unassigned partition throws IllegalStateException
            consumer.poll(0);
            consumer.seek(new TopicPartition("http_zhixin", 0), 797670770);
            ConsumerRecords<String, String> records = consumer.poll(100);
    
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                consumer.commitSync();
            }
    
        }
    
        @Test
        public void test() throws FileNotFoundException {
            ArrayList<String> topics = new ArrayList<>();
            topics.add("http_zhixin");
    
    //        commitAuto(topics);
    //        commitControl(topics);
    //        setOffSet(topics);
            setSeek(topics);
        }
    
        /**
         * Stand-in for real processing, e.g. inserting the records into a database
         */
        private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
            buffer.stream().map(x -> x.value()).forEach(System.err::println);
        }
    
    
    }

    Consumers in the same Kafka consumer group will not read the same message twice. Transaction support was added in version 0.11; a sketch follows.
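
    As a sketch of the 0.11 transactional producer (the transactional.id value here is a made-up example, and acks is overridden because transactions require idempotent sends), everything sent between beginTransaction() and commitTransaction() becomes visible atomically to consumers running with isolation.level=read_committed:

    package com.wenbronk.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class KafkaTxProducerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = KafkaUtils.getProperties("producer");
            props.put("transactional.id", "tx-demo-1"); // hypothetical id; must be stable and unique per producer
            props.put("acks", "all");                   // idempotence (implied by transactions) requires acks=all

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("http_zhixin", "message inside a transaction"));
                producer.commitTransaction(); // read_committed consumers only ever see committed messages
            } catch (Exception e) {
                producer.abortTransaction();  // discard everything sent in this transaction
            } finally {
                producer.close();
            }
        }
    }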
