zoukankan      html  css  js  c++  java
  • kafka中生产者和消费者API

    使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.8.2</artifactId>
                <version>0.8.1</version>
                <exclusions>
                    <exclusion>
                        <artifactId>jmxtools</artifactId>
                        <groupId>com.sun.jdmk</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jmxri</artifactId>
                        <groupId>com.sun.jmx</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>jms</artifactId>
                        <groupId>javax.jms</groupId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

    Kafka生产者API:

     1 package cn.itcast.storm.kafka.simple;
     2 
     3 import kafka.javaapi.producer.Producer;
     4 import kafka.producer.KeyedMessage;
     5 import kafka.producer.ProducerConfig;
     6 
     7 import java.util.Properties;
     8 import java.util.UUID;
     9 
    10 /**
    11  * 这是一个简单的Kafka producer代码
    12  * 包含两个功能:
    13  * 1、数据发送
    14  * 2、数据按照自定义的partition策略进行发送
    15  *
    16  *
    17  * KafkaSpout的类
    18  */
    19 public class KafkaProducerSimple {
    20     public static void main(String[] args) {
    21         /**
    22          * 1、指定当前kafka producer生产的数据的目的地
    23          *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
    24          *  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
    25          */
    26         String TOPIC = "orderMq";
    27         /**
    28          * 2、读取配置文件
    29          */
    30         Properties props = new Properties();
    31         /*
    32          * key.serializer.class默认为serializer.class
    33          */
    34         props.put("serializer.class", "kafka.serializer.StringEncoder");
    35         /*
    36          * kafka broker对应的主机,格式为host1:port1,host2:port2
    37          */
    38         props.put("metadata.broker.list", "kafka01:9092,kafka02:9092,kafka03:9092");
    39         /*
    40          * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
    41          * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
    42          * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
    43          * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
    44          * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
    45          * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
    46          * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
    47          * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
    48          */
    49         props.put("request.required.acks", "1");
    50         /*
    51          * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
    52          * 默认值:kafka.producer.DefaultPartitioner
    53          * 用来把消息分到各个partition中,默认行为是对key进行hash。
    54          */
    55         props.put("partitioner.class", "cn.itcast.storm.kafka.MyLogPartitioner");
    56 //        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
    57         /**
    58          * 3、通过配置文件,创建生产者
    59          */
    60         Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    61         /**
    62          * 4、通过for循环生产数据
    63          */
    64         for (int messageNo = 1; messageNo < 100000; messageNo++) {
    65 //            String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," +
    66 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    67 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    68 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    69 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    70 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    71 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    72 //                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
    73 //                    "用来配合自定义的MyLogPartitioner进行数据分发");
    74 
    75             /**
    76              * 5、调用producer的send方法发送数据
    77              * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
    78              */
    79             producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
    80         }
    81     }
    82 }

    Kafka消费者API:

     1 package cn.itcast.storm.kafka.simple;
     2 
     3 import kafka.consumer.Consumer;
     4 import kafka.consumer.ConsumerConfig;
     5 import kafka.consumer.ConsumerIterator;
     6 import kafka.consumer.KafkaStream;
     7 import kafka.javaapi.consumer.ConsumerConnector;
     8 import kafka.message.MessageAndMetadata;
     9 
    10 import java.util.HashMap;
    11 import java.util.List;
    12 import java.util.Map;
    13 import java.util.Properties;
    14 import java.util.concurrent.ExecutorService;
    15 import java.util.concurrent.Executors;
    16 
    17 public class KafkaConsumerSimple implements Runnable {
    18     public String title;
    19     public KafkaStream<byte[], byte[]> stream;
    20     public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
    21         this.title = title;
    22         this.stream = stream;
    23     }
    24     @Override
    25     public void run() {
    26         System.out.println("开始运行 " + title);
    27         ConsumerIterator<byte[], byte[]> it = stream.iterator();
    28         /**
    29          * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
    30          * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
    31          * */
    32         while (it.hasNext()) {
    33             MessageAndMetadata<byte[], byte[]> data = it.next();
    34             String topic = data.topic();
    35             int partition = data.partition();
    36             long offset = data.offset();
    37             String msg = new String(data.message());
    38             System.out.println(String.format(
    39                     "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
    40                     title, topic, partition, offset, msg));
    41         }
    42         System.out.println(String.format("Consumer: [%s] exiting ...", title));
    43     }
    44 
    45     public static void main(String[] args) throws Exception{
    46         Properties props = new Properties();
    47         props.put("group.id", "dashujujiagoushi");
    48         props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
    49         props.put("auto.offset.reset", "largest");
    50         props.put("auto.commit.interval.ms", "1000");
    51         props.put("partition.assignment.strategy", "roundrobin");
    52         ConsumerConfig config = new ConsumerConfig(props);
    53         String topic1 = "orderMq";
    54         String topic2 = "paymentMq";
    55         //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
    56         ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
    57         //定义一个map
    58         Map<String, Integer> topicCountMap = new HashMap<>();
    59         topicCountMap.put(topic1, 3);
    60         //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
    61         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
    62         //取出 `kafkaTest` 对应的 streams
    63         List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
    64         //创建一个容量为4的线程池
    65         ExecutorService executor = Executors.newFixedThreadPool(3);
    66         //创建20个consumer threads
    67         for (int i = 0; i < streams.size(); i++)
    68             executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
    69     }
    70 }

    kafka自定义patition:

     1 package cn.itcast.storm.kafka;
     2 
     3 import kafka.producer.Partitioner;
     4 import kafka.utils.VerifiableProperties;
     5 import org.apache.log4j.Logger;
     6 
     7 
     8 public class MyLogPartitioner implements Partitioner {
     9     private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
    10 
    11     public MyLogPartitioner(VerifiableProperties props) {
    12     }
    13 
    14     public int partition(Object obj, int numPartitions) {
    15         return Integer.parseInt(obj.toString())%numPartitions;
    16 //        return 1;
    17     }
    18 
    19 }
  • 相关阅读:
    oracle11g 卸载和安装(win7,32位)
    MySQL忘记密码解决办法
    GPIO硬件资源的申请,内核空间和用户空间的数据交换,ioctl(.....),设备文件的自动创建
    模块参数,系统调用,字符设备编程重要数据结构,设备号的申请与注册,关于cdev的API
    开发环境的搭建,符合导出,打印优先级阈值
    定时器中断
    Linux系统移植的重要文件
    linux 相关指令
    linux各文件夹含义和作用
    外部中断实验
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6936788.html
Copyright © 2011-2022 走看看