zoukankan      html  css  js  c++  java
  • Kafka API使用

    旧版代码示意:

    public class TestDemo {

    @Test

    public void producer(){

    Properties props=new Properties();

    props.put("serializer.class","kafka.serializer.StringEncoder");

    props.put("metadata.broker.list","192.168.234.11:9092");

    Producer<Integer,String> producer=new Producer<>(new ProducerConfig(props));

       

    producer.send(new KeyedMessage<Integer, String>("enbook","Think in java"));

    }

       

    }

       

    新版代码示意:

    import java.util.Properties;

    import java.util.concurrent.ExecutionException;

       

    import org.apache.kafka.clients.producer.KafkaProducer;

    import org.apache.kafka.clients.producer.Producer;

    import org.apache.kafka.clients.producer.ProducerConfig;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import org.junit.Test;

       

       

    public class TestDemo {

    @Test

    public void producer() throws InterruptedException, ExecutionException{

    Properties props=new Properties();

    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.234.11:9092");

       

    Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);

    for(int i=0;i<100;i++){

    ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("enbook",""+i);

    kafkaProducer.send(message);

    }

       

    while(true);

    }

       

    }

       

    创建Topic代码:

    /**
     * Creates topic "t1" with one partition and one replica through the ZooKeeper ensemble
     * listed below. The ZooKeeper client is closed even when topic creation fails.
     */
    @Test
    public void create_topic() {
        ZkUtils zkUtils = ZkUtils.apply(
                "192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181",
                30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            // Create a topic named "t1" with a single partition and a single replica.
            AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Do not leak the ZooKeeper connection if createTopic throws.
            zkUtils.close();
        }
    }

       

    删除Topic代码:

    /**
     * Requests deletion of topic "t1" through the ZooKeeper ensemble listed below.
     * The ZooKeeper client is closed even when the delete call fails.
     */
    @Test
    public void delete_topic() {
        ZkUtils zkUtils = ZkUtils.apply(
                "192.168.234.11:2181,192.168.234.210:2181,192.168.234.211:2181",
                30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            // Delete topic "t1".
            AdminUtils.deleteTopic(zkUtils, "t1");
        } finally {
            // Do not leak the ZooKeeper connection if deleteTopic throws.
            zkUtils.close();
        }
    }

       

       

    创建消费者线程并指定消费者组:

    @Test

    public void consumer_1(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c1消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

       

    @Test

    public void consumer_2(){

    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.234.11:9092");

    props.put("group.id", "consumer-tutorial");

    props.put("key.deserializer", StringDeserializer.class.getName());

    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

       

    consumer.subscribe(Arrays.asList("enbook", "t2"));

       

    try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

    for (ConsumerRecord<String, String> record : records)

    System.out.println("c2消费:"+record.offset() + ":" + record.value());

    }

    } catch (Exception e) {

    } finally {

    consumer.close();

    }

    }

    }

  • 相关阅读:
    Poj2516 最小费用最大流
    spss研究高等院校人文社会研究课题受什么因素影响
    使用bs4实现将诗词名句网站中三国演义小说章节内容爬取
    python爬虫1 爬虫概要
    解析出所有城市名称
    xpath爬取58二手房的房源信息
    Po两段小代码,说几个小细节____关于九九乘法表&国际象棋棋盘
    小爬虫demo——爬取“妹子”等网站链接____使用requests库
    爬取京东历史图书图片并下载到本地____requests库
    python基础:复习整理笔记(一)____关于 工具、程序执行原理、python风格规范
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11043610.html
Copyright © 2011-2022 走看看