zoukankan      html  css  js  c++  java
  • Kafka API使用

    旧版代码示意:

    public class TestDemo {

    @Test

    public void producer(){

    Properties props=new Properties();

    props.put("serializer.class","kafka.serializer.StringEncoder");

    props.put("metadata.broker.list","192.168.234.11:9092");

    Producer<Integer,String> producer=new Producer<>(new ProducerConfig(props));

       

    producer.send(new KeyedMessage<Integer, String>("enbook","Think in java"));

    }

       

    }

       

    新版代码示意:

    import java.util.Properties;

    import java.util.concurrent.ExecutionException;

       

    import org.apache.kafka.clients.producer.KafkaProducer;

    import org.apache.kafka.clients.producer.Producer;

    import org.apache.kafka.clients.producer.ProducerConfig;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import org.junit.Test;

       

       

    public class TestDemo {

    @Test

    public void producer() throws InterruptedException, ExecutionException{

    Properties props=new Properties();

    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.234.11:9092");

       

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);

    for(int i=0;i<100;i++){

    ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("enbook",""+i);

    kafkaProducer.send(message);

    }

       

    while(true);

    }

       

    }

       

    创建Topic代码:

    @Test

    public void create_topic(){

    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

    // 创建一个单分区单副本名为t1topic

    AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);

    zkUtils.close();

    }

       

    删除Topic代码:

    @Test

    public void delete_topic(){

    ZkUtils zkUtils = ZkUtils.apply("192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

    // 删除topic 't1'

    AdminUtils.deleteTopic(zkUtils, "t1");

    zkUtils.close();

    }

       

       

    创建消费者线程并指定消费者组:

    @Test

    public void consumer_1(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c1消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

       

    @Test

    public void consumer_2(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c2消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

    }

  • 相关阅读:
    abap 调用Http --“Get”
    SAP-PI接口创建中的ABAP处理(不含PI配置)
    ALV 监听事件
    ABAP动态创建内表并展示--自撸版
    ABAP动态生成内表的三种方法
    SAP BP字段增强--付款条件检查
    SAP RFC上传接口(包含发布Webservice地址)
    C# 时间函数
    配置Excel的DCOM权限
    ORA-12571 : TNS : 包写入程序失败
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11043610.html
Copyright © 2011-2022 走看看