zoukankan      html  css  js  c++  java
  • JAVA封装消息中间件调用一(kafka生产者篇)

      这段时间因为工作关系一直在忙于消息中间件的发开,现在趁着项目收尾阶段分享下对kafka的一些使用心得。

      kafka的原理我这里就不做介绍了,可参考http://orchome.com/kafka/index 这里我重点给大家介绍下kafka生产者的使用

      kafka可分为新旧版本,旧版本(0.8Scala版本)我们不去研究,新版本(0.9和0.10)增加了异步发送的API

      示例代码如下

      pom.xml增加依赖

           <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.10.1.1</version>
            </dependency>

    JAVA发送方法:

    Properties props = new Properties();
     props.put("bootstrap.servers", bootstrap.servers);
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 0);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     Producer<String,String> producer = new KafkaProducer<String,String>(props);
    ProducerRecord<String,String> record = new ProducerRecord<String,String>(message.getTopic().getName(), message.getMessageId(), JSONObject.toJSONString(message));
    RecordMetadata recordMetadata = producer.send(record).get();

    ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。

    retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。

    producer(生产者)缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

    默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。

    buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。

    key.serializervalue.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaerStringSerializer处理简单的string或byte类型。



    ProducerRecord介绍:topic【消息主题】 key【消息的key值,通常用于消息的分区】 value【消息体】
        /**
         * Create a record to be sent to Kafka
         * 
         * @param topic The topic the record will be appended to
            
         * @param key The key that will be included in the record
         * @param value The record contents
         */
        public ProducerRecord(String topic, K key, V value) {
            this(topic, null, null, key, value);
        }    
     producer.send发送方式为异步发送,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。 由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常

      完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知。

    ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
     producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null)
                             
                           System.out.println("获取消息发送结果");
                       }
                   });

     总体来说,新版API生产者发送方式比较简单,这里我也不多做描述。重点在与消费者的实现,我将会在下一篇给大家详细介绍

      

      

  • 相关阅读:
    手机端上传图片及java后台接收和ajaxForm提交
    JEECG中datagrid方法自定义查询条件
    微信分享到朋友圈按钮 右上角提示
    Js获取后台集合List的值和下标的方法
    redis系列之数据库与缓存数据一致性解决方案
    替换{0}为指定的字符串(MessageFormat)
    java中对array数组的常用操作
    面试题-Java Web-网络通信
    你应该知道的JAVA面试题
    各大互联网公司java开发面试常问问题
  • 原文地址:https://www.cnblogs.com/zwt1990/p/6930387.html
Copyright © 2011-2022 走看看