zoukankan      html  css  js  c++  java
  • kafka java客户端编程


    kafka_2.10-0.8.1.1


    maven

    <!-- Dependencies for the Kafka Java client example: the Kafka 0.8.1.1 client
         (built for Scala 2.10), log4j for logging, and JUnit for tests. -->
    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
        <!-- Exclude the JMX/JMS artifacts from Kafka's transitive dependencies.
             NOTE(review): presumably because they cannot be resolved from the
             public Maven repository; confirm before removing. -->
        <exclusions>
          <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
          </exclusion>
          <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
          </exclusion>
          <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
          </exclusion>
        </exclusions>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.15</version>
        <!-- Same JMX/JMS exclusions as for Kafka above, plus javax.mail.
             NOTE(review): presumably these are optional log4j 1.2.15
             dependencies that are not resolvable by default; confirm. -->
        <exclusions>
          <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
          </exclusion>
          <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
          </exclusion>
          <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
          </exclusion>
          <exclusion>
            <artifactId>mail</artifactId>
            <groupId>javax.mail</groupId>
          </exclusion>
         </exclusions>
      </dependency>
      <!-- JUnit is only needed on the test classpath. -->
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
      </dependency>
    </dependencies>


    producer

     1 package org.admln.kafka.test;
     2 
     3 import java.util.Properties;
     4 
     5 import kafka.javaapi.producer.Producer;
     6 import kafka.producer.KeyedMessage;
     7 import kafka.producer.ProducerConfig;
     8 
     9 public class Producertest {
    10      
    11      public static void main(String[] args) {
    12          Properties props = new Properties();
    13          //props.put("zk.connect", "192.168.1.110:2181");
    14          // serializer.class为消息的序列化类
    15          props.put("serializer.class", "kafka.serializer.StringEncoder");
    16          // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
    17          props.put("metadata.broker.list", "192.168.1.113:9092");
    18          // 设置Partition类, 对队列进行合理的划分
    19          //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
    20          // ACK机制, 消息发送需要kafka服务端确认
    21          props.put("request.required.acks", "1");
    22 
    23          props.put("num.partitions", "2");
    24          ProducerConfig config = new ProducerConfig(props);
    25          Producer<String, String> producer = new Producer<String, String>(config);
    26          for (int i = 0; i < 10; i++)
    27          {
    28            String msg = "hello" + i;
    29            producer.send(new KeyedMessage<String, String>("test",msg));
    30            System.out.println("i:"+i+" msg:"+msg);
    31          }
    32        }
    33 }


     consumer

     运行consumer一直接收不到消息,还没找到原因


    欲为大树,何与草争;心若不动,风又奈何。
  • 相关阅读:
    kingso_sort
    kingso_module
    KINGSO介绍
    kingso
    铁饭碗的含义不是在一个地方永远有饭吃,而是在任何地方都有饭
    立威廉_百度百科
    甜蜜间谍_百度百科
    贝克曼
    报喜鸟集团有限公司_百度百科
    浙江乔治白服饰股份有限公司
  • 原文地址:https://www.cnblogs.com/admln/p/kafka-JavaClient-program.html
Copyright © 2011-2022 走看看