zoukankan      html  css  js  c++  java
  • kafka-java消费者与生产者代码示例

    引入依赖

            <!-- Apache Kafka dependency used by the Java examples below.
                 No <version> element is given here; presumably the version is
                 managed by a parent POM or a dependencyManagement section. TODO confirm. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
            </dependency>

    消费者示例ConsumerMain.java

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    /**
     * Minimal Kafka consumer example: subscribes to {@link #TOPIC} and prints the
     * topic name of every record it receives.
     */
    public class ConsumerMain {

        /** Name of the topic this example consumes from. */
        public final static String TOPIC = "product_logs_test";

        /**
         * Builds the consumer configuration, subscribes to {@link #TOPIC} and polls
         * in an endless loop, printing the topic of each received record.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker addresses of the cluster; separate multiple addresses with ","
            props.put("bootstrap.servers", "masterx1:9092,masterx2:9092,masterx3:9092");
            // Consumer group id
            props.put("group.id", "group_test");
            // When true, consumed offsets are committed automatically in the background,
            // so another consumer of the same group can take over if this one dies.
            // NOTE: this client connects to the brokers in bootstrap.servers; offsets
            // are committed to Kafka, not to ZooKeeper.
            props.put("enable.auto.commit", "true");
            // How often (in milliseconds) offsets are auto-committed
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // Deserializers for record keys and values
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // try-with-resources: the poll loop never ends normally, but if subscribe()
            // or poll() throws, the consumer is still closed and leaves the group cleanly.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<String> topics = new ArrayList<>();
                topics.add(TOPIC);
                consumer.subscribe(topics);
                while (true) {
                    // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+ in favor of
                    // poll(java.time.Duration); kept here because the client version is not visible.
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        try {
                            String topic = consumerRecord.topic();
                            System.out.println(topic);
                        } catch (Exception e) {
                            // A failure on one record must not stop the poll loop.
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    生产者示例ProviderMain.java

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import net.sf.json.JSONObject;
    
    /**
     * Minimal Kafka producer example: sends one JSON-encoded message to
     * {@link #TOPIC} and prints the metadata returned for the sent record.
     */
    public class ProviderMain {

        /** Name of the topic this example publishes to. */
        public final static String TOPIC = "product_logs_test";

        /**
         * Builds the producer configuration, sends a single record synchronously
         * and prints the resulting {@code RecordMetadata}.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker addresses of the cluster; separate multiple addresses with ","
            props.put("bootstrap.servers", "masterx1:9092,masterx2:9092,masterx3:9092");
            // "session.timeout.ms" was removed: it is a consumer setting, not a producer one.
            // Serializers for record keys and values
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // try-with-resources guarantees the producer is closed even when
            // send() or future.get() throws.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                JSONObject data = new JSONObject();
                data.put("name", "TheoryDance");
                data.put("fromWhere", "ProductLogTest");
                data.put("time", "2020-04-16 16:12:00");

                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, data.toString());
                Future<RecordMetadata> future = producer.send(record);
                // Block until the send has completed so the result can be printed.
                System.out.println(future.get());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

     如果需要通过logstash消费Kafka中的数据,可参考下面的示例logstash-kafka.conf;更多写法请参考logstash的官方文档。

    # Pipeline: read JSON events from a Kafka topic and index them into Elasticsearch.
    input{
        kafka {
            topics => ["product_logs_test"]
            bootstrap_servers => "192.168.1.xxx:9092"
            codec => "json"
            group_id => "group_test"
        }
    }
    
    # No filtering is applied.
    filter {
    
    }
    
    output {
        elasticsearch {
            hosts => ["https://192.168.1.xx1:9200","https://192.168.1.xx2:9200","https://192.168.1.xx3:9200"]
            index => "product_logs_test"
            user => "your_passport"
            password => "your_pwd"
            # Use the certificate of Huawei Cloud CSS
            cacert => "/usr/xxxx.../CloudSearchService.cer"
        }
    }

    示例logstash-simple.conf

    # Pipeline: read JSON events from files and index them into Elasticsearch.
    input{
        file {
            path => "/data/tmp/*"
            type => "file"
            codec => "json"
            start_position => "beginning"
        }
    }
    
    # No filtering is applied.
    filter {
    
    }
    
    output {
        elasticsearch {
            hosts => ["https://192.168.1.xx1:9200","https://192.168.1.xx2:9200","https://192.168.1.xx3:9200"]
            index => "product_logs_file"
            user => "your_passport"
            password => "your_pwd"
            # Use the certificate of Huawei Cloud CSS
            cacert => "/usr/xxxx.../CloudSearchService.cer"
        }
    }
  • 相关阅读:
    Netty实现Http客户端
    Netty实现Http服务端
    Netty实现Tcp客户端
    Netty实现Tcp服务端
    MySQL进阶系列:一文详解explain
    spring boot 获取运行时的yml里的active配置
    eureka 注册中心添加认证
    zuul 负载
    jenkins spring cloud
    秒杀系统如何设计?
  • 原文地址:https://www.cnblogs.com/TheoryDance/p/12742718.html
Copyright © 2011-2022 走看看