zoukankan      html  css  js  c++  java
  • kafka 生产者java编码

    package cn.bigdata.kafka;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KafkaProducerDemo {
    	public static void main(String[] args) throws InterruptedException {
            /*
             Properties props = new Properties();
             * props.put("bootstrap.servers", "localhost:9092");
             * props.put("acks", "all");
             * props.put("retries", 0);
             * props.put("batch.size", 16384);
             * props.put("linger.ms", 1);
             * props.put("buffer.memory", 33554432);
             * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
             */
    		// CommonClientConfigs ProducerConfig 可以配置的参数都在这些类里面
            Map<String, Object> config = new HashMap<String, Object>();
            // kafka对消息是按照key value形式存储,这里指定key和value的序列化方法
            config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // broker的地址
            config.put("bootstrap.servers", "m2:9092");
    		// acks=0 客户端不会等待服务端的确认
    		// acks=1 只会等待leader分区的确认
    		// acks=all或者acks=-1 等待leader分区和follower分区的确认
            config.put("acks", "all");
            Producer<String, String> producer = new KafkaProducer<String, String>(config);
     
            // 发送业务消息
            // 读取文件 读取内存数据库 读socket端口
            for (int i = 1; i <= 100; i++) {
                Thread.sleep(500);
                // 第一个参数是主题,第二个参数是消息
                producer.send(new ProducerRecord<String, String>("test2", i + ""));
            }
        }
    }
    

      

  • 相关阅读:
    C++概念性总结
    友元函数
    C++指针概念
    Linux下多线程(clone()线程)
    Qt5模块化详细总结
    C++有符号与无符号之间的转换问题
    使用C++test工具做Qt代码静态分析
    QT函数带有外部链接但没有在头文件中声明(QT noreturn属性添加)
    Linux之Docker手动创建Docker容器
    我在思考一个很变态的社会趋向
  • 原文地址:https://www.cnblogs.com/heml/p/6074294.html
Copyright © 2011-2022 走看看