zoukankan      html  css  js  c++  java
  • 从Kafka发消息到第一个kafka stream程序(基础)

    环境:

    Windows 10,

    Ubuntu 18.04 (虚拟机)

    Java 8

    Spring Boot 2.5.2,其下的 spring-kafka 版本 2.7.3,kafka-streams 版本 2.7.1

    apache-zookeeper-3.7.0,单机 standalone

    kafka_2.13-2.8.0,单机 standalone

    Eclipse Version: 2021-03 (4.19.0):开发工具

    Poatman:接口调用工具 来自#博客园

    ---

    两个Spring  Boot项目:

    1、nothing - 非Web项目

    执行kafka stream程序,从 1#主题(TOPIC) 接收信息,再传到 2#主题。

    2、web - Web-Servlet项目

    向 1#主题 发送消息;

    接收 1#主题 的消息;

    接收 2#主题 的消息——kafka stream处理过的;

    ---

    关于kafka主题的创建:

    在本文中,主题都是 自动创建的——web项目启动后,发送消息给主题时创建、监听主题时给创建。

    nothing项目 必须在 web项目 启动,并执行相关操作 生成 主题 后 才可以使用。

    自动创建 的主题 是有限制的——只有一个分区?需要看文档。

    注,本文写成时,作者对 kafka还不是太熟,就是 水平有限 的意思,本君尽量避免错误描述。

    Ubuntu上运行 ZooKeeper:

    ./zkServer.sh start

    停止:

    ./zkServer.sh stop
    注,conf目录下的 zoo_sample.cfg 复制为 zoo.cfg,并根据需要 修改其中的 dataDir 等参数。来自#博客园
    ---
    ZooKeeper启动后,启动Kafka:
    修改配置文件 config/server.properties ,主要配置项:
    listeners=PLAINTEXT://192.168.151.81:9092 # 某网卡的IP,配置后,可以被其它主机访问
    advertised.listeners=PLAINTEXT://192.168.151.81:9092 # 某网卡的IP,配置后,可以被其它主机访问
    log.dirs=/home/ben/kafka/logs-1 # 数据目录
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=18000
    注,根据需要配置,还有更多参数。
    启动命令:
    bin/kafka-server-start.sh config/server.properties
    停止命令:
    1、执行 Ctrl + C (ZooKeeper正常时,可以立即停止,否则,可以通过 kill -9 PID 干掉)来自#博客园
    2、bin/kafka-server-stop.sh
    ---
     
    目录
     
     
    一、Kafka发送消息
    web项目 测试:给 1#主题 发送消息,再 使用 KafkaListener 监听 1#主题 的消息。
     
    引入 kafak依赖包:
            <!-- Kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

    spring-kafka包 的内容展示:

    配置 application.properties:来自#博客园

    #
    # ubuntu's ip
    ubuntu.ip=192.168.151.81
    
    #
    # Kafka
    spring.kafka.bootstrap-servers=${ubuntu.ip}:9092
    spring.kafka.consumer.group-id=myGroup

    生产者——调用接口:

     1 @RestController
     2 @RequestMapping(path="/kafka")
     3 public class KafkaProducer {
     4 
     5     @Autowired
     6     private KafkaTemplate<String, Object> kafkaTemplate;
     7 
     8     // 发送消息接口
     9     @GetMapping(path="/sendNormal")10     public void sendMessage1(@RequestParam String msg) {
    11         kafkaTemplate.send("topic01", msg);
    12     }
    13 }

    消费者——@KafkaListener:

     1 @Component
     2 public class MyKafkaConsumer {
     3     
     4     // 消费监听
     5     @KafkaListener(topics= {"topic01"})
     6     public void onMessage1(ConsumerRecord<?, ?> record) {
     7         // 消费的哪个topic、partition的消息,打印出消息内容
     8         System.out.println("kafka简单消费:topic=" + record.topic() + ", partition=" + record.partition() 
     9             + ", offset=" + record.offset() + "-" + record.value());
    10     }
    11 }

    消费者输出结果:

    kafka简单消费:topic=topic01, partition=0, offset=1-that is a test,在中国
    二、Kafka事务
    在一个事务进行中,如果出现异常导致事务执行失败,则事务中所有消息发送都不会执行成功。来自#博客园
    比如,一个事务中发送了消息 1、2、3,在2和3之间发生了异常,此时,1、2、3 都不会发送成功。如果没有事务的控制,这种情况下 1、2可以发送成功,只有3不会发送。
     
    在上一节 的基础上。
     
    配置 application.properties:
    # 事务
    spring.kafka.producer.transaction-id-prefix=tx # tx可以是其它名称

    运行web项目,调用生产者发送接口报错

    java.lang.IllegalStateException: No transaction is in process; possible solutions...省略

    在生产者放接口添加@Transactional,再次调用,仍然报错

    1     @GetMapping(path="/sendNormal")
    2     @Transactional
    3     public void sendMessage1(@RequestParam String msg) {
    4         kafkaTemplate.send(tsrc, msg);
    5         
    6     }

    错误信息:

    org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.transaction.TransactionManager' 
    available: expected single matching bean but found 2: transactionManager,kafkaTransactionManager

    注意,看了一些其它文章,此时应该执行成功了。这里Spring容器中出现了两个 事务manager——检查Spring容器(ApplicationContext中 Arrays.toString(context.getBeanDefinitionNames()) ),怎么办呢?

    修改@Transactional,指定使用 kafkaTransactionManager:

    1     @GetMapping(path="/sendNormal")
    2     @Transactional(transactionManager = "kafkaTransactionManager")
    3     public void sendMessage1(@RequestParam String msg) {
    4         kafkaTemplate.send(tsrc, msg);
    5         
    6     }

    再次调用接口发送,成功,监听器输出下面的信息:来自#博客园

    kafka简单消费:topic=topic01, partition=0, offset=7-事务transaction消息

    说明,

    将 transactionManager  配置为 另一个Bean transactionManager 也执行成功。

    事务的基本用法有了,接下来 验证事务的有效性

    验证代码:消息中包含error时,抛出异常

     1     @GetMapping(path="/sendThreeInTrans")
     2     @Transactional(transactionManager = "kafkaTransactionManager")
     3     public void sendThreeInTrans(@RequestParam String msg) {
     4         kafkaTemplate.send(tsrc, msg + "-1");
     5         kafkaTemplate.send(tsrc, msg + "-2");
     6         
     7         if (msg.contains("error")) {
     8             throw new RuntimeException("事务执行失败");
     9         }
    10         
    11         kafkaTemplate.send(tsrc, msg + "-3");
    12     }

    msg = 正确的事务transaction消息 时,输出:3条消息都被监听到了

    kafka简单消费:topic=topic01, partition=0, offset=9-正确的事务transaction消息-1
    kafka简单消费:topic=topic01, partition=0, offset=10-正确的事务transaction消息-2
    kafka简单消费:topic=topic01, partition=0, offset=11-正确的事务transaction消息-3

    msg = 错误的事务transaction消息error 时,输出:没有消息被监听到

    测试将 transactionManager 配置为 transactionManager,得到了相同的效果。

    疑问:

    既然两个都有效,是否不需要在 application.properties 中配置就可以使用事务呢

    测试结果是 不可以。

    去掉配置后,发送错误消息的结果:异常发生前的两条消息 成功发送到了 kafka 并被监听器接收到了

    检查spring容器中的Bean,去掉配置后,就没有 kafkaTransactionManager 这个Bean了。

    检查 两个Bean 的类型:transactionManager,kafkaTransactionManager

    通过ApplicationContext检查:

    1         System.out.println(context.getBean("transactionManager"));
    2         System.out.println(context.getBean("kafkaTransactionManager"));
    3         System.out.println(context.getBean("transactionManager").getClass());
    4         System.out.println(context.getBean("kafkaTransactionManager").getClass());

    结果:

    org.springframework.orm.jpa.JpaTransactionManager@4cddc3d9
    org.springframework.kafka.transaction.KafkaTransactionManager@673fdc28
    class org.springframework.orm.jpa.JpaTransactionManager
    class org.springframework.kafka.transaction.KafkaTransactionManager

    原来,一个时 JPA的,一个时 Kafka的啊。

    不好意思,我的项目有依赖 spring-boot-starter-data-jpa ,这才导致了 上面的错误——出现两个bean

    org.springframework.transaction.TransactionManager接口及其子孙依赖结构:其中就有 JpaTransactionManager、KafkaTransactionManager

    在其它博文中,还提到另外一种执行事务的方式:

    kafkaTemplate.executeInTransaction(...)

    事务的事情,还没完呢!

    上面是在 application.properties 中配置开启了 kafka事务,下面介绍另一种方式。

    注释掉配置:

    # 事务
    #spring.kafka.producer.transaction-id-prefix=tx

    添加kafka配置类:

     1 @Configuration
     2 public class WebKafkaConfig {
     3 
     4     @Value("${spring.kafka.bootstrap-servers}")
     5     private String servers;
     6     
     7     @Bean
     8     public ProducerFactory producerFactory() {
     9         DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(this.senderProps());
    10         factory.transactionCapable();
    11         factory.setTransactionIdPrefix("code-tx-");
    12         return factory;
    13     }
    14     
    15     @Bean
    16     public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
    17         KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
    18         return manager;
    19     }
    20     
    21     private Map<String, Object> senderProps() {
    22         Map<String, Object> props = new HashMap<>();
    23         
    24         //连接地址
    25         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    26         //重试,0为不启用重试机制
    27         props.put(ProducerConfig.RETRIES_CONFIG, 1);
    28         //acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
    29         props.put(ProducerConfig.ACKS_CONFIG, "all");
    30         // 设置幂等性
    31         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    32 
    33         // 没有时报错:
    34         // Missing required configuration "key.serializer" which has no default value.
    35         //键的序列化方式
    36         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    37         //值的序列化方式
    38         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    39         
    40         return props;
    41     }
    42 }

    16行:

    这个函数的名称为 transactionManager,生产的Bean也是这个名字,此时,不会出现 两个 transactionManager Bean的情况了。

    监听器 直接使用 @Transactional 即可——但却覆盖了JpaTransactionManager实例:

    修改函数名为 myKafkaTransactionManager,再配置@Transactional(transactionManager = "myKafkaTransactionManager"),运行程序。

    测试事务:测试成功。

    疑问

    但是,自定义了上面的 KafkaTransactionManager 后,属于 JPA 的 transactionManager 消失了。

    此时,使用JPA要用到 transactionManager 会不会报错呢?需要进一步验证,TODO

    事务总结:

    1、2种配置方式

    2、两种使用方式

    更进一步

    学习spring、spring boot的事务机制;

    @Transactional 的函数中,既包括kafka,又包括 数据库操作,是否也有效呢?一旦错误发生,大家都执行失败?

     
    三、Kafka Stream
     流式计算。
    依赖 kafka-streams包 即可使用。
            <!-- Kafka stream -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>

    包结构:

     
    本节完成一个小试验:
    在nothing项目中,
    使用 kafka stream 接收 主题 streams-plaintext-input 的消息,
    给消息添加后缀"-suffix" 后,
    发送到 主题 streams-pipe-output
    再由 web项目的 主题 streams-pipe-output 监听器把处理后的消息打印出来。
    另外,发送消息也是调用 web项目的接口。
     
    参考程序:
    官网-TUTORIAL: WRITE A KAFKA STREAMS APPLICATION
     
    说明,
    本来应该写一个 单词统计程序(WordCount)的,可是,参考了好多篇博文 都没搞定,不是这个错,就是 另一个错。
    暂且先实现个小的 kafka stream程序,体验下 流计算的乐趣先。
     
    nothing项目,依赖 kafka-streams包,编写 程序:
     1     public static void startKafkaStream3() {
     2         Properties props = new Properties(); // 基本配置
     3         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
     4         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.242.81:9092");
     5         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     6         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     7 
     8         final StreamsBuilder builder = new StreamsBuilder();
     9 
    10         builder.stream("streams-plaintext-input") // 消息来源主题
    11         .mapValues((v)->{ // 除了mapValues,还有很多 处理函数,还可以建立自定义的 org.apache.kafka.streams.processor.Processor 类
    12             return v + "-suffix";
    13         })
    14         .to("streams-pipe-output"); // 处理后的消息目的地主题
    15 
    16         final Topology topology = builder.build(); // 很重要的概念,拓扑
    17 
    18         final KafkaStreams streams = new KafkaStreams(topology, props);
    19         final CountDownLatch latch = new CountDownLatch(1);
    20 
    21         // attach shutdown handler to catch control-c
    22         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    23             @Override
    24             public void run() {
    25                 streams.close(); // 关闭kafka stream,,居然可以这么玩!
    26                 latch.countDown();
    27             }
    28         });
    29 
    30         try {
    31             streams.start(); // 启动kafka stream计算
    32             latch.await();
    33         } catch (Throwable e) {
    34             System.exit(1);
    35         }
    36         System.exit(0);
    37     }

    11~13行是 官网没有的,用来给 接收的消息添加后缀。来自#博客园

    nothing项目启动后,执行上面的程序即可(第一个项目——关键)。

    在 web项目 开发:

    不需要依赖 kafka-streams包,继续上面的 程序开发即可。

    1、消息发送接口 到 主题 streams-plaintext-input 来自#博客园

    1     @GetMapping(path="/sendNormal2")
    2     @Transactional
    3     public void sendNormal2(@RequestParam String msg) {
    4         kafkaTemplate.send("streams-plaintext-input", msg);
    5     }

    2、监听 主题 streams-pipe-output 的消息

        @KafkaListener(topics= {"streams-pipe-output"})
        public void onMessage1(ConsumerRecord<?, ?> record) {
            // 消费的哪个topic、partition的消息,打印出消息内容
            System.out.println("kafka简单消费:topic=" + record.topic() + ", partition=" + record.partition() 
                + ", offset=" + record.offset() + "-" + record.value());
        }

    测试结果:

    测试发送了 4条消息,都成功添加了后缀,并从 主题 streams-pipe-output 获取成功。

    本篇博文就这么愉快地结束吧!Kafka原来这么强大啊,还要继续挖掘才是。来自#博客园

    参考资料:

    0、kafka官网

    1、Spring-Kafka(五)—— 使用Kafka事务的两种方式

    作者:海苔胖胖

    2、【Kafka】- KafkaStream wordcount 案例

    程序一直起不来,报下面一些错误:

    2021-07-19 11:43:39.611  WARN 7242 --- [           main] o.a.k.s.p.internals.StateDirectory       : 
    Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS # windows 10 上 下面两条错误,,到Ubuntu运行就没有了 来自#博客园 2021-07-19 11:08:35.534 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
    Failed to change permissions for the directory mpkafka-streams 2021-07-19 11:08:35.535 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
    Failed to change permissions for the directory mpkafka-streamsstreams-wordcount 2021-07-19 11:19:53.102 ERROR 14056 --- [ms-close-thread] o.a.k.s.p.internals.StateDirectory :
    Some task directories still locked while closing state, this indicates unclean shutdown: {} # TODO

    3、Kafka Stream 微服务领域流处理

    作者:久七年

    很好的博文,可是,第一个程序运行就遇到故障——不知道怎么读取 目标主题topic02 的数据。

    报了下面的一些错误:序列化、反序列哈的问题吧,还需探究

    这篇博文中介绍的比较全面。

    Failed to convert from type [java.util.ArrayList<?>] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>] for value '[2, 2, 2, 2, 2,...
    
    nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting 
    from type [java.lang.Integer] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>], failedMessage=GenericMessage
    [payload=[2, 2, 2, 2, 2, 2, 2, 2, 2,... Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.ArrayList]
    to [org.apache.kafka.clients.consumer.ConsumerRecord] for GenericMessage [payload=[[]], headers={kafka_offset=[14],
    kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@31b53850, kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0],
    kafka_receivedMessageKey=[null], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[topic1], kafka_receivedTimestamp=[1626514480655],
    kafka_groupId=myGroup}]

    4、Kafka Stream

    作者:努力的小强

    基础概念、功能介绍。

    学了好几天,写了三小时,还不错。 

  • 相关阅读:
    js,js中使用正则表达式
    web开发中文件下载
    EL表达式
    Servlet Filter
    压缩文件 乱码问题(转载)
    MFC CopyDirectory
    SaveFileDialog
    Create Window
    CDateTimeCtrl 设置时间
    键值表
  • 原文地址:https://www.cnblogs.com/luo630/p/15029899.html
Copyright © 2011-2022 走看看