zoukankan      html  css  js  c++  java
  • kafka 消费者和生产者测试类

    pom.xml:

    <!-- Maven build descriptor for the Kafka consumer/producer test classes shown in this post. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; the build produces a plain jar. -->
    <groupId>bj.zm</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka</name>
    <url>http://maven.apache.org</url>

    <properties>
    <!-- Source files are read as UTF-8; the Java sources contain non-ASCII string literals. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
    <!-- JUnit, available on the test classpath only. -->
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
    </dependency>

    <!-- Kafka 0.8.2.0 core artifact; the "_2.10" suffix is presumably the Scala build version.
         Expected to supply the kafka.consumer, kafka.producer, kafka.javaapi and
         kafka.serializer packages imported by the Java classes. -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
    </dependency>
    </dependencies>
    </project>

    KafkaConsumer.java

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaConsumer extends Thread{

    private String topic;

    public kafkaConsumer(String topic){
    super();
    this.topic = topic;
    }


    @Override
    public void run() {
    ConsumerConnector consumer = createConsumer();
    System.out.println("消费者对象:"+consumer);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while(iterator.hasNext()){
    String message = new String(iterator.next().message());
    System.out.println("消费者数据 "+topic +":"+ message);
    }
    }

    private ConsumerConnector createConsumer() {
    Properties properties = new Properties();
    // properties.put("zookeeper.connect", "10.202.27.5:2181,10.202.27.6:2181,10.202.27.7:2181/kafka/st");
    properties.put("zookeeper.connect", "10.202.36.28:2182,10.202.36.30:2182,10.202.36.29:2182/kafka/st");
    properties.put("group.id", "sfst");
    return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }


    public static void main(String[] args) {
    //new kafkaConsumer("EXP_IMG_TO_WQS").start();
    //new kafkaConsumer("EXP_IMG_TO_SSS").start();
    //new kafkaConsumer("EXP_IMG_TYPE1").start();
    new kafkaConsumer("EXP_IMAGE_TOPIC").start();

    }
    }

    KafkaProducer.java

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import kafka.serializer.StringEncoder;


    public class kafkaProducer extends Thread{

    private String topic;

    public kafkaProducer(String topic){
    super();
    this.topic = topic;
    }


    @Override
    public void run() {
    Producer producer = createProducer();
    int i=0;
    while(true){
    // producer.send(new KeyedMessage<Integer, String>(topic, "{"logId":"1","waybillId":"10376683317","waybillNo":"785000213695","sourceZoneCode":"755A","destZoneCode":"755","oneselfPickupFlg":"1","consignorCompName":"科技公司","consignorAddr":"软件产业基地","consignorPhone":"10086","consignorContName":"王珂","consignorMobile":"8888888","addresseeCompName":"寄件公司","addresseeAddr":"收件地址","addresseePhone":"11111111","addresseeContName":"收件联系人","addresseeMobile":"55555","meterageWeightQty":"10","realWeightQty":"10","quantity":"1","freeParcelFlg":"0","innerParcelFlg":"0","versionNo":"1","inputTypeCode":"2","modifiedEmpCode":""+i+""}"));
    producer.send(new KeyedMessage<Integer, String>(topic, "{"isImageDto":"1","waybillId":"10376683317","billCode":"785000213695","sourceZoneCode":"755A","destZoneCode":"755","oneselfPickupFlg":"1","consignorCompName":"科技公司","consignorAddr":"软件产业基地","consignorPhone":"10086","consignorContName":"王珂","consignorMobile":"8888888","addresseeCompName":"寄件公司","addresseeAddr":"收件地址","addresseePhone":"11111111","addresseeContName":"收件联系人","addresseeMobile":"55555","meterageWeightQty":"10","realWeightQty":"10","quantity":"1","freeParcelFlg":"0","innerParcelFlg":"0","waybillType":"1","inputTypeCode":"2","modifiedEmpCode":""+i+""}"));
    System.out.println("==============: " + i);
    try {
    TimeUnit.SECONDS.sleep(1);
    i++;
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    private Producer createProducer() {
    Properties properties = new Properties();
    // properties.put("zookeeper.connect", "10.202.27.5:2181,10.202.27.6:2181,10.202.27.7:2181");
    // properties.put("metadata.broker.list", "10.202.27.5:9093,10.202.27.6:9093,10.202.27.7:9093");

    properties.put("zookeeper.connect", "10.202.36.28:2182,10.202.36.30:2182,10.202.36.29:2182");
    properties.put("metadata.broker.list", "10.202.36.29:9093,10.202.36.28:9093,10.202.36.30:9093");
    properties.put("serializer.class", StringEncoder.class.getName());

    return new Producer<Integer, String>(new ProducerConfig(properties));
    }


    public static void main(String[] args) {
    // new kafkaProducer("EXP_WB_TOPIC").start();
    new kafkaProducer("EXP_IMAGE_TOPIC").start();
    // new kafkaProducer("EXP_IMG_TO_SSS").start();

    }

    }

  • 相关阅读:
    idea双击打不开没反应的解决办法
    Golang Learn Log #0
    获取请求header的各种方法
    Linux 指令
    Windows下Anaconda的安装和简单使用
    如何把Python脚本导出为exe程序
    C++星号的含义
    仓库盘点功能-ThinkPHP_学习随笔
    详解html中的元老级元素:“table”
    IE在开发工具启动的情况下(打开F12)时 JS才能执行
  • 原文地址:https://www.cnblogs.com/junwangzhe/p/7374866.html
Copyright © 2011-2022 走看看