zoukankan      html  css  js  c++  java
  • Kafka API使用

    旧版代码示意:

    public class TestDemo {

    @Test

    public void producer(){

    Properties props=new Properties();

    props.put("serializer.class","kafka.serializer.StringEncoder");

    props.put("metadata.broker.list","192.168.234.11:9092");

    Producer<Integer,String> producer=new Producer<>(new ProducerConfig(props));

       

    producer.send(new KeyedMessage<Integer, String>("enbook","Think in java"));

    }

       

    }

       

    新版代码示意:

    import java.util.Properties;

    import java.util.concurrent.ExecutionException;

       

    import org.apache.kafka.clients.producer.KafkaProducer;

    import org.apache.kafka.clients.producer.Producer;

    import org.apache.kafka.clients.producer.ProducerConfig;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import org.junit.Test;

       

       

    public class TestDemo {

    @Test

    public void producer() throws InterruptedException, ExecutionException{

    Properties props=new Properties();

    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.234.11:9092");

       

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);

    for(int i=0;i<100;i++){

    ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("enbook",""+i);

    kafkaProducer.send(message);

    }

       

    while(true);

    }

       

    }

       

    创建Topic代码:

    @Test

    public void create_topic(){

    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

    // 创建一个单分区单副本名为t1topic

    AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);

    zkUtils.close();

    }

       

    删除Topic代码:

    @Test

    public void delete_topic(){

    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

    // 删除topic 't1'

    AdminUtils.deleteTopic(zkUtils, "t1");

    zkUtils.close();

    }

       

       

    创建消费者线程并指定消费者组:

    @Test

    public void consumer_1(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c1消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

       

    @Test

    public void consumer_2(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c2消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

    }

  • 相关阅读:
    [Sass学习]Sass的安装和使用
    [CSS学习] padding属性讲解
    [CSS学习] line-height属性讲解
    IOS学习之路——Swift语言(2)——基本类型与函数
    IOS学习之路——Swift语言(1)——基本类型、运算符与逻辑控制语句
    上海 day23 -- 面向对象三大特征---多态 和 内置魔法函数
    上海 day22 -- 面向对象三大特征---- 封装
    上海 day21 -- 面向对象三大特征----继承
    上海 day20 -- 面向对象基础
    上海 day18~19 ATM+购物车(待更新)
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11043610.html
Copyright © 2011-2022 走看看