zoukankan      html  css  js  c++  java
  • Kafka 生产者与消费者 API 接口

    生产者

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;
    public class KafkaProducer  extends Thread{
    private String topic;

    //构造方法
    public KafkaProducer(String topic) {
    this.topic = topic;
    }


    @Override
    public void run() {
    Producer<Integer, String> producer = createProducer();
    for(int i =1 ;i<=10;i++){
    String message = "message"+i; 
    producer.send(new KeyedMessage<Integer, String>(topic, message));
    System.out.println("生产者生成---------->"+message);

    try {
    //睡眠 1S
    sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }


    public Producer createProducer() {

    Properties properties = new Properties();
    properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");
    // properties.setProperty("zookeeper.connect", "cloud1:2181,cloud2:2181,cloud3:2181");
    properties.setProperty("serializer.class", StringEncoder.class.getName());
    //properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");
    properties.setProperty("metadata.broker.list", "192.168.80.20:9092");
    return new Producer<Integer, String>(new ProducerConfig(properties));
    }
    public static void main(String[] args) {
    new KafkaProducer("test").start();
    }
    }




    消费者

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    /**
     * Demo consumer thread for the legacy high-level Kafka consumer API.
     *
     * <p>When started, the thread opens a single message stream for the configured
     * topic and prints every received payload to standard output.
     */
    public class KafkaComsumer extends Thread {

        /**
         * Charset used to decode payloads. Fixed to UTF-8 rather than the platform
         * default so decoding does not vary between hosts; this matches the default
         * encoding of Kafka's StringEncoder on the producing side.
         */
        private static final java.nio.charset.Charset PAYLOAD_CHARSET =
                java.nio.charset.Charset.forName("UTF-8");

        /** Topic this consumer reads from. */
        private final String topic;

        /**
         * Creates a consumer thread bound to one topic.
         *
         * @param topic name of the Kafka topic to consume
         */
        public KafkaComsumer(String topic) {
            this.topic = topic;
        }

        /**
         * Consumes messages from the topic and prints each one.
         *
         * <p>The connector is shut down when the loop ends or an exception escapes.
         * NOTE(review): no consumer.timeout.ms is configured, so the iterator is
         * expected to block while waiting for messages - confirm against the Kafka
         * version in use.
         */
        @Override
        public void run() {
            ConsumerConnector consumer = createConsumer();
            try {
                // Request exactly one stream for the topic.
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);

                Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                        consumer.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
                ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();

                while (iterator.hasNext()) {
                    String message = new String(iterator.next().message(), PAYLOAD_CHARSET);
                    System.out.println("接受到的信息------------》" + message);
                }
            } finally {
                // Release the connector's resources on any exit path.
                consumer.shutdown();
            }
        }

        /**
         * Builds a consumer connector for the hard-coded demo cluster.
         *
         * @return a new connector in consumer group {@code group1}; the caller is
         *         responsible for shutting it down
         */
        public ConsumerConnector createConsumer() {
            Properties properties = new Properties();
            properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");
            properties.setProperty("group.id", "group1");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        }

        /**
         * Starts one consumer thread reading from the {@code test} topic.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            new KafkaComsumer("test").start();
        }
    }

  • 相关阅读:
    Kali Linux渗透基础知识整理(二)漏洞扫描
    Elasticsearch为记录添加时间戳timestamp
    手把手带你使用JS-SDK自定义微信分享效果
    SpringBoot学习(3)-SpringBoot添加支持CORS跨域访问
    Java 骚操作--生成二维码
    清除微信内置浏览器缓存
    使用python脚本Telnet 华为交换机备份配置
    如何备份思科、锐捷、Juniper的配置文件
    微信公众平台开发教程Java版(六) 事件处理(菜单点击/关注/取消关注)
    How do you build a database?
  • 原文地址:https://www.cnblogs.com/tlnshuju/p/6984238.html
Copyright © 2011-2022 走看看