zoukankan      html  css  js  c++  java
  • Kafka 2.3 Producer (0.9以后版本适用)

    kafka0.9版本以后用java重新编写了producer,废除了原来scala编写的版本。

    这里直接使用最新2.3版本,0.9以后的版本都适用。

    注意引用的包为:org.apache.kafka.clients.producer

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ProducerDemo {
    
    	public static void main(String[] args) {
    
    		Properties properties = new Properties();
    		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
    		properties.put("acks", "all");
    		properties.put("retries", 0);
    		properties.put("batch.size", 16384);
    		properties.put("linger.ms", 1);
    		properties.put("buffer.memory", 33554432);
    		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    		kafkaProducer.send(new ProducerRecord<>("topic", "value"));
    		kafkaProducer.close();
    
    	}
    	
    }
    
    

    0.11.0以后增加了事务,事务producer的示例代码如下,需要适用于0.11.0以后的版本:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class TransactionsProducerDemo {
    
    	public static void main(String[] args) {
    
    		Properties props = new Properties();
    		props.put("bootstrap.servers", "localhost:9092");
    		props.put("transactional.id", "my-transactional-id");
    		Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    
    		producer.initTransactions();
    
    		try {
    			producer.beginTransaction();
    			for (int i = 0; i < 100; i++)
    				producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    			producer.commitTransaction();
    		} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    			// We can't recover from these exceptions, so our only option is to close the producer and exit.
    			producer.close();
    		} catch (KafkaException e) {
    			// For all other exceptions, just abort the transaction and try again.
    			producer.abortTransaction();
    		}
    		producer.close();
    
    	}
    	
    }
    

    更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算

  • 相关阅读:
    3星|简·雅各布斯《经济的本质》:经济遵循与自然一样的发展规律:分化与组合、多样性、共同发展
    3星|《陈志武金融通识课》:从金融的角度看历史、文化、商业
    2018左其盛差评榜(截至7月6日)
    2018左其盛好书榜(截至7月6日)
    哈佛多是标题党。6本哈佛书点评
    OPENGL画图类库
    Html,Css,Javascript及其他的注释方法详解
    C#画图解决闪烁问题
    C#ShowCursor光标的显示与隐藏
    C#实现GDI+基本图的缩放、拖拽、移动
  • 原文地址:https://www.cnblogs.com/tree1123/p/11386944.html
Copyright © 2011-2022 走看看