zoukankan      html  css  js  c++  java
  • Kafka生产者——结合spring开发

    Kafka生产者端

    可靠性保证:

    producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据:

    1、不丢失

    2、不重复

    producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性。

    acks 参数配置

    为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到 ack,就会进行下一轮的发送,否则重新发送数据。

    0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

    1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

    -1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前leader 发生故障,那么会造成数据重复。

    Exactly Once 语义

    当ack级别设置为-1的时候,可以保证producer到broker之间不会丢失数据,即At
    Least Once 语义 。相对的,将服务器ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once 语义 。

    At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。

    对于一些重要信息,我们要求既不能重复也不能丢失,这时我们需要使用Exactly Once 语义 。0.11 版本的 Kafka,引入了一项重大特性:幂等性。 所谓幂等性就是producer无论向broker发送了多少次重复数据,broker都只会持久化一条。幂等性结合At Least Once语义,就结合成了Kafka的Exactly Once语义。
    At Least Once + 幂等性 = Exactly Once
    启动幂等性,只需要将Producer的参数enable.idompotence 设置为true,ack设置为-1即可。

    开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个分区的消息会附带Sequence Number(自动增长)。Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交的时候,broker只会持久化一条消息。
    msg1<PID:1,Partition:1,SeqNumber:0,message : a >
    msg2<PID:1,Partition:1,SeqNumber:1,message : b >
    msg2<PID:1,Partition:1,SeqNumber:2,message : c >

    但是,PID重启就会变化,同时不同分区也会有不同主键,所以幂等性无法保证跨分区跨会话。这里我们就需要引进kafka事务。

    事务

    Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。为了实现跨分区跨会话事务,引入一个全局唯一的Transaction id ,将pproducer的pid和Transaction id进行绑定,这样,当producer重启后,就可以通过Transaction ID 获得原来的 PID。这个参数通过客户端程序来进行设定 。

    我们使用kafka消息事务的场景有以下两种:

    1. 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
    2. 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。

    事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时
    需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设
    置为false,则会抛出异常。


    以上是保证producer发送数据可靠性保证的相关参数,结合spring-kafka的具体使用如下。

    spring-kafka生产端

    spring-kafkaProducer.xml配置:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
    	http://www.springframework.org/schema/beans/spring-beans.xsd
    	http://www.springframework.org/schema/context
    	http://www.springframework.org/schema/context/spring-context.xsd
    	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
        <context:component-scan base-package="producer" />
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!--broker集群地址-->
                    <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                    <!--acks 参数配置-->
                    <entry key="acks" value="all"/>
                    <!--发送失败重试次数-->
                    <entry key ="retries" value="3"/>
                    <!--批次发送大小的内存阀值-->
                    <entry key="batch.size" value="16384"/>
                    <!--批处理延迟时间上限-->
                    <entry key="linger.ms" value="1"/>
                    <!--开启幂等性-->
                    <entry key="enable.idempotence" value="true"/>
                    <!--批处理缓冲区-->
                    <entry key="buffer.memory" value="33554432"/>
                    <!--key序列化器-->
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                    <!--value序列化器-->
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                </map>
            </constructor-arg>
        </bean>
        <!--配置一个生产者监听器,在该类写发送成功或失败的回调方法-->
        <bean id="producerLisener" class="producer.KafkaSendResultHandler"></bean>
        <!--springkafka提供的发送类,对kafka发送方法进行加强性的封装-->
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory"/>
            <constructor-arg name="autoFlush" value="true"/>
            <property name="defaultTopic" value="myTopic"/>
            <property name="producerListener" ref="producerLisener"></property>
        </bean>
        <!--producer工厂-->
        <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties"/>
            </constructor-arg>
        </bean>
    </beans>
    

    部分重要参数详解:

    acks:

    ​ 这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功
    的。

    • ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知
      不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最
      大速度发送消息,从而达到很高的吞吐量。

    • ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响
      应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收
      到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,
      如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导
      致数据丢失。

    • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,
      这种模式是最安全的,它可以保证不止一个服务器收到消息。

      注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

    retries :

    ​ 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到
    了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待
    100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

    batch.size :

    ​ 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可
    以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出
    去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能
    被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置
    的太小,生产者会因为频繁发送消息而增加一些额外的开销。

    max.request.size :

    ​ 该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里
    所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两
    边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

    linger.ms:批处理延迟时间上限

    buffer.memory:批处理缓冲区

    enable.idempotence:是否开启幂等性

    ProducerListener类

    消息发送后的回调方法,注意的是,这里的监听回显的数据时要发送的数据,不是返回的数据,可以通过日志来观察发送数据是否正确。

    public class KafkaSendResultHandler implements ProducerListener {
       private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
        public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
            log.info("kafka message send successful : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value+"---RecordMetadata:"+recordMetadata);
        }
    
        public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
            log.error("kafka message send fail : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value);
            e.printStackTrace();
        }
    
        public boolean isInterestedInSuccess() {
            log.info("ProducerListener started");
            return true;
        }
    }
    

    ProducerClient类

    对kafkaTemplate的再一次封装,kafka在消息发送的时候发送方式可以分为同步发送和异步发送。

    同步发送:

    ​ 同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。

      //同步发送
       public void syncSend(){
        testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
       }
        
    

    异步发送:

    //异步发送
       public void asyncSend() {
    
          ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());
    
          future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                        @Override
                        public void onSuccess(SendResult<Integer, String> result) {
                           log.info("success");
                        }
                        @Override
                        public void onFailure(Throwable ex) {
                           log.error("failure");
                        }
                    });
    }
    

    ProducerClient对kafkaTemplate的封装(不带事务)

    这里只封装了最简单的发送方法,同时可对其他发送方法进行封装,只需要修改传参即可。

    public class ProducerClient {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
        /*同步发送*/
        //轮询方式发送
        public void sendMessage(String topicName,String message){
            Map<String,Object> m = new HashMap<String,Object>();
            SendResult<String, String> sendResult = null;
            try {
                sendResult = kafkaTemplate.send(topicName, message).get();
                /*检查recordMetadata的offset数据,不检查producerRecord*/
                if(sendResult!=null) {
                    Long offsetIndex = sendResult.getRecordMetadata().offset();
                    if (offsetIndex != null && offsetIndex >= 0) {
                        m.put("code", KafkaMesConstant.SUCCESS_CODE);
                        m.put("message", KafkaMesConstant.SUCCESS_MES);
                    } else {
                        m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                        m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                    }
                }else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
                }
            }  catch (InterruptedException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
            } catch (ExecutionException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
            }
            System.out.println("kafkaServers response : "+m);
        }
    }
    
    public class KafkaMesConstant {
        public static final String SUCCESS_CODE = "00000";
        public static final String SUCCESS_MES = "成功";
    
        /*kakfa-code*/
        public static final String KAFKA_SEND_ERROR_CODE = "30001";
        public static final String KAFKA_NO_RESULT_CODE = "30002";
        public static final String KAFKA_NO_OFFSET_CODE = "30003";
    
        /*kakfa-mes*/
        public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系liuhui";
        public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系liuhui";
        public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系liuhui";
    }
    

    测试一下

    public class excuter {
    
        @Test
        public void producer() throws InterruptedException {
            ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
            ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
            producerClient.sendMessage("topic2", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()).toString());
            Thread.sleep(1000);
        }
    
    }
    

    控制台结果:(我这里没有使用日志输出,在实际开发中需要使用日志开发)

    ProducerListener started
    kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---RecordMetadata:topic2-2@4928
    kafkaServers response : {code=00000, message=成功}
    
  • 相关阅读:
    类方法代码重构寻找坏味道
    迭代二分查找二分查找
    系统牛逼[置顶] 使用RAMP理解内在动机 Understanding Intrinsic Motivation with RAMP
    对象服务器Webservices获取天气
    手机服务器Android消息推送(二)基于MQTT协议实现的推送功能
    概率小数2013年阿里巴巴暑期实习招聘笔试题目(不完整,笔试时间:2013.5.5)
    像素颜色JavaFX示例简易图片处理工具
    算法队列SPFA算法详解
    选择文件Eclipse制作jar包
    nullnull推箱子
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12003641.html
Copyright © 2011-2022 走看看