Kafka Usage (1)

1 ConsumerRecord

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    // Consume every message on topicTEST and print its metadata and value
    @Component
    public class ConsumerDemo {
        @KafkaListener(topics = "topicTEST")
        public void listen(ConsumerRecord<?, ?> record) {
            System.out.printf("topic is %s, offset is %d, timestamp is %s, value is %s%n",
                    record.topic(), record.offset(), record.timestamp(), record.value());
        }
    }
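Outside of Spring Boot's auto-configuration, @KafkaListener also needs a consumer factory and a listener container factory to be declared. A minimal sketch, assuming the broker address used elsewhere in this post and an illustrative group id (demo-group):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            // Broker address reused from the post's other examples; adjust to your environment
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }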

     

Suppose we have three Kafka brokers: brokerA, brokerB, and brokerC.

1. If the topic we create has 3 partitions and a replication-factor of 1, each broker holds roughly one partition. If one broker goes down, the topic can no longer serve all of its data, because only two of the three partitions remain usable.

2. If the topic we create has 3 partitions and a replication-factor of 2, the partition replicas might be distributed as follows (a declaration sketch follows the table):

    Node      Partition     Partition replica
    brokerA   partition0    partition1
    brokerB   partition1    partition2
    brokerC   partition2    partition0
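A sketch of declaring such a topic (the bean and topic names here are illustrative, not from the original post): 3 partitions with replication-factor 2, so each partition has a copy on two of the three brokers.

    // Hypothetical topic: 3 partitions, each replicated onto a second broker
    @Bean
    public NewTopic replicatedTopic() {
        return new NewTopic("topic.quick.replicated", 3, (short) 2);
    }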

2 Creating topics

2.1 Annotation-based creation

    // Create a topic named topic.quick.initial with 8 partitions and a replication factor of 1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic.quick.initial", 8, (short) 1);
    }
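If the project is on spring-kafka 2.3 or later (the kafka-clients 2.5.1 in the stack trace at the end of this post suggests it is), the same topic can also be declared with the TopicBuilder DSL. A sketch:

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;

    // Equivalent declaration of topic.quick.initial using TopicBuilder
    @Bean
    public NewTopic initialTopicViaBuilder() {
        return TopicBuilder.name("topic.quick.initial")
                .partitions(8)
                .replicas(1)
                .build();
    }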

2.2 KafkaAdmin and AdminClient

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        // Connection address of the Kafka instance
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
        return new KafkaAdmin(props);
    }

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfigurationProperties());
    }

    @Autowired
    private AdminClient adminClient;

    @Test
    public void testCreateTopic() throws InterruptedException {
        NewTopic topic = new NewTopic("topic.quick.initial2", 1, (short) 1);
        adminClient.createTopics(Arrays.asList(topic));
        // createTopics is asynchronous, so give it a moment to finish
        Thread.sleep(1000);
    }
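Because createTopics is asynchronous, the test sleeps; instead of sleeping you can block on the returned future with createTopics(...).all().get(). A small, hypothetical follow-up test that lists topic names to confirm the creation, reusing the same adminClient bean:

    import org.apache.kafka.clients.admin.ListTopicsResult;

    @Test
    public void testListTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult topics = adminClient.listTopics();
        // Prints every non-internal topic name known to the cluster
        topics.names().get().forEach(System.out::println);
    }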
• Result: [screenshot showing the created topic omitted]

2.3 Why partition

Partitioning raises the system's throughput, but a topic's partition count can only be increased; it can never be decreased.
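A sketch of what "increase only" looks like in code, reusing the adminClient bean from section 2.2: NewPartitions.increaseTo sets the new total partition count, and asking for fewer partitions than the topic already has is rejected by the broker.

    import java.util.Collections;
    import org.apache.kafka.clients.admin.NewPartitions;

    @Test
    public void testIncreasePartitions() throws ExecutionException, InterruptedException {
        // Grow topic.quick.initial from 8 partitions to 10; a value below the
        // current count would be rejected, since partitions cannot be removed
        adminClient.createPartitions(
                Collections.singletonMap("topic.quick.initial", NewPartitions.increaseTo(10)))
                .all().get();
    }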

     

2.4 Getting topic information

    /**
     * Get information about a topic
     */
    @Test
    public void testSelectTopicInfo() throws ExecutionException, InterruptedException {
        DescribeTopicsResult topicTest = adminClient.describeTopics(Arrays.asList("topic.quick.initial"));
        topicTest.all().get().forEach((k, v) -> {
            System.out.println("k: " + k + " ,v: " + v.toString() + " ");
        });
    }

Output:
    k: topic.quick.initial ,v: (name=topic.quick.initial, internal=false, partitions=(partition=0, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=1, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=2, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=3, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=4, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=5, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=6, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)),(partition=7, leader=kafka:9090 (id: 1 rack: null), replicas=kafka:9090 (id: 1 rack: null), isr=kafka:9090 (id: 1 rack: null)), authorizedOperations=null)

3 Custom message-send listener

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.ProducerListener;

    @Configuration
    public class KafkaSendListenerHandler implements ProducerListener {
        private static final Logger log = LoggerFactory.getLogger(KafkaSendListenerHandler.class);

        @Override
        public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
            log.info("Send-success callback triggered...");
            String key = (String) producerRecord.key();
            log.info("key : " + key);
            log.info("Message send success : " + producerRecord.toString());
            log.info("-----------------------------");
        }

        @Override
        public void onError(ProducerRecord producerRecord, Exception exception) {
            // Log failures so unsuccessful sends are visible as well
            log.error("Message send failed : " + producerRecord.toString(), exception);
        }
    }
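For onSuccess/onError to actually be called, the listener must be registered on the KafkaTemplate. Spring Boot's auto-configured template picks up a ProducerListener bean on its own; with a manually defined template like the one in section 4 below, it can be wired in explicitly. A sketch (a hypothetical variant of that bean, not the original post's code):

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory,
                                                       KafkaSendListenerHandler sendListenerHandler) {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
        // Register the custom callback so every send reports success or failure
        template.setProducerListener(sendListenerHandler);
        return template;
    }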

4 Kafka transaction management

• Declaring a transaction with KafkaTemplate's executeInTransaction method. Everything inside the callback runs in a local Kafka transaction; because the callback below throws, the transaction is aborted and the message is never committed.

    @Test
    public void testExecuteInTransaction() throws InterruptedException {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send("topicTEST", "test executeInTransaction");
                // Throwing here aborts the transaction, so the record above is never committed
                throw new RuntimeException("fail");
                //return true;
            }
        });
    }
• Using the @Transactional annotation. To enable transactions this way, first configure a KafkaTransactionManager, the transaction manager that Spring Kafka provides; it is created from the producer factory. Note that transactions must be enabled on the producerFactory by setting a TransactionIdPrefix, which is the prefix used to generate transactional.id values. A usage sketch follows the configuration below.

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        // Setting a transaction id prefix is what enables transactions on this factory;
        // it is used as the prefix for the generated transactional.id values
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.228.128:9090");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }
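With that configuration in place, a send can be wrapped in a transaction simply by annotating the method. A minimal sketch (class and method names here are illustrative, not from the original post); the exception thrown at the end aborts the Kafka transaction, which is what produces the log output below.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Service
    public class TransactionalSender {

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        @Transactional
        public void sendInTransaction() {
            kafkaTemplate.send("topicTEST", "test @Transactional");
            // Throwing here rolls back the Kafka transaction, so the record is never committed
            throw new RuntimeException("fail");
        }
    }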
Result of using transactions:
    org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) [kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) [kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]

    2020-09-15 18:50:54.064 INFO 14636 --- [           main] o.s.t.c.transaction.TransactionContext   : Rolled back transaction for test: ....

     
