zoukankan      html  css  js  c++  java
  • kafka2.12_1.0.1生产者示范代码

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerDemo {
    //主題
    private final static String TOPIC="test02";
    //要发送的数据
    private final static String CONNECT="this is a message";
    public static void main(String[] args) {
    // 声明连接属性
    Properties properties = new Properties();
    properties.put("zookeeper.connect", "192.168.157.131:2181");// 声明zk
    properties.put("group.id", "g");// 必须要使用别的组名称,
    // 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
    properties.put("auto.offset.reset", "smallest");
    properties.put("metadata.broker.list", "192.168.157.131:9092");
    properties.put("serializer.class", "kafka.serializer.StringEncoder"); //根据自己存储数据类型选择不同的编码器

    ProducerConfig conf=new ProducerConfig(properties);
    //创建Kafka的生产者, key是消息的key的类型, value是消息的类型
    Producer<String,String> producer=new Producer<String,String>(conf);
    KeyedMessage<String,String> message=new KeyedMessage<String,String>(TOPIC,CONNECT);
    producer.send(message);
    }
    }

    producer包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息。位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。 
    用于建立消费者的相关参数说明及其默认值参见producerconfigs,此处对代码中用到的几个参数进行解释: 
    bootstrap.servers:用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2; 
    acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。 
    retries:生产者发送失败后,重试的次数 
    batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。 
    linger.ms:默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。 
    batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。 
    buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。 
    key.serializer,value.serializer说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

    注意:代码运行IP必须要配置host映射关系,zookeeper里记录的是host名,运行代码连接kafka里是利用zookeeper里记录的host名来访问kafka,若没有配置host映射,则会提示“kafka Failed to send messages after 3 tries”异常

  • 相关阅读:
    关于Maven项目build时出现No compiler is provided in this environment的处理
    freemaker的函数使用
    FTP在docker容器中上传失败解决,改为被动模式
    linux重定向及nohup不输出的方法
    手动抠下的wordpress登录页面样式
    使用后端生成图片验证码流文件(不推荐)
    部署到docker容器后图片验证码显示不出来
    Linux修改profile文件改错了,恢复的方法
    DotNETCore 学习笔记 异常处理
    DotNETCore 学习笔记 路由
  • 原文地址:https://www.cnblogs.com/runnerjack/p/8629029.html
Copyright © 2011-2022 走看看