zoukankan      html  css  js  c++  java
  • Kafka+SpringMVC+Maven应用示例

      本文借助主流SpringMVC框架向大家介绍如何在具体应用中简单快捷的使用kafka。kafka、maven以及SpringMVC在现在的企业级应用中都占据着非常重要的地位,所以本文将三者结合起来也可以方便大家进一步熟悉基于Maven的SpringMVC框架搭建。

    项目展示

      国际惯例,首先先向大家展示一下项目最终的运行效果:

      当项目正常启动后,在浏览器中输入:http://127.0.0.1:8080/kafkaSpringMVC/welcome 进入欢迎界面:

      然后点击Send a Message 进入消息发送页面:

      从上面可以看出,发送的消息是当前系统的时间(当然你也可以修改成为自己感冒的消息),点击Submit后将消息发送到kafka集群服务器,然后自动返回到Welcome欢迎界面。在欢迎界面点击Get a Message:

      从上述界面中我们可以看见页面中已经获取到了刚才发送的消息,点击RETURN HOME,返回欢迎界面,好啦项目展示就这么简单。

    开发环境

    • 操作系统:MacOS 10.12.3(同样适用于Linux系统和Windows系统)
    • JDK: java version "1.8.0_121"
    • 开发平台:Eclipse Neon.2 Release (4.6.2)
    • WEB容器:wildfly-8.1.0.Final
    • zookeeper: zookeeper-3.4.9
    • kafka: kafka-2.10-0.10.2.0
    • maven: Eclipse Neon.2 Release(4.6.2)自带maven工具,版本为3.3.9

    项目框架

      项目框架如下图所示:

    项目开发流程

      首先搭建Maven Web Project框架,搭建过程可参考我的另一篇随笔maven web框架搭建,大家也可以在网上搜索更多的文章来学习,这里就不在重复描述啦。本项目命名为:kafkaSpringMVC。

      框架搭建完成后,下面就需要引入Spring MVC 所需要的jar包和kafka客户端开发需要的jar包。本示例采用的Spring 版本为4.3.6.REALEASE,使用的kafka客户端版本为0.10.2.0。下面通过修改pom.xml文件来引入外部依赖包:

      pom.xml

     1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     3     <modelVersion>4.0.0</modelVersion>
     4     <groupId>com.unionpay</groupId>
     5     <artifactId>kafkaSpringMVC</artifactId>
     6     <packaging>war</packaging>
     7     <version>0.0.1-SNAPSHOT</version>
     8     <name>kafkaSpringMVC Maven Webapp</name>
     9     <url>http://maven.apache.org</url>
    10 
    11     <properties>
    12         <springframework>4.3.6.RELEASE</springframework>
    13     </properties>
    14 
    15     <dependencies>
    16         <dependency>
    17             <groupId>junit</groupId>
    18             <artifactId>junit</artifactId>
    19             <version>3.8.1</version>
    20             <scope>test</scope>
    21         </dependency>
    22 
    23         <dependency>
    24             <groupId>org.springframework</groupId>
    25             <artifactId>spring-core</artifactId>
    26             <version>${springframework}</version>
    27         </dependency>
    28 
    29         <dependency>
    30             <groupId>org.springframework</groupId>
    31             <artifactId>spring-context</artifactId>
    32             <version>${springframework}</version>
    33         </dependency>
    34 
    35         <dependency>
    36             <groupId>org.springframework</groupId>
    37             <artifactId>spring-tx</artifactId>
    38             <version>${springframework}</version>
    39         </dependency>
    40 
    41         <dependency>
    42             <groupId>org.springframework</groupId>
    43             <artifactId>spring-webmvc</artifactId>
    44             <version>${springframework}</version>
    45         </dependency>
    46 
    47         <dependency>
    48             <groupId>org.springframework</groupId>
    49             <artifactId>spring-web</artifactId>
    50             <version>${springframework}</version>
    51         </dependency>
    52 
    53         <dependency>
    54             <groupId>org.springframework</groupId>
    55             <artifactId>spring-jms</artifactId>
    56             <version>${springframework}</version>
    57         </dependency>
    58 
    59         <dependency>
    60             <groupId>org.apache.kafka</groupId>
    61             <artifactId>kafka-clients</artifactId>
    62             <version>0.10.2.0</version>
    63         </dependency>
    64 
    65     </dependencies>
    66     <build>
    67         <finalName>kafkaSpringMVC</finalName>
    68     </build>
    69 </project>

      写完pom.xml文件后,保存。然后右键项目名称,选择Maven->Update Project... 更新项目,引入jar包。

      Update Project完成后,可以在maven依赖包里看见刚才引入的本项目需要的jar包:

      接下来编写web.xml文件:

      web.xml

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3     xmlns="http://java.sun.com/xml/ns/javaee"
     4     xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
     5     id="WebApp_ID" version="3.0">
     6     
     7     <context-param>
     8         <param-name>contextConfigLocation</param-name>
     9         <param-value>classpath:kafka-beans.xml</param-value>
    10     </context-param>
    11     
    12     <listener>
    13         <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    14     </listener>
    15     
    16     <servlet>
    17         <servlet-name>springDispatcherServlet</servlet-name>
    18         <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    19         <init-param>
    20             <param-name>contextConfigLocation</param-name>
    21             <param-value>classpath:spring-mvc-dispatcher.xml</param-value>
    22         </init-param>
    23         <load-on-startup>1</load-on-startup>
    24     </servlet>
    25     
    26     <servlet-mapping>
    27         <servlet-name>springDispatcherServlet</servlet-name>
    28         <url-pattern>/</url-pattern>
    29     </servlet-mapping>
    30     
    31 </web-app>

      简单明了,从web.xml描述来看,MVC映射是通过spring-mvc-dispatcher.xml文件说明的,而项目中用到的所有的model 则是通过kafka-beans.xml文件注入的。那么下面分别在resources文件夹下建立这两个xml配置文件:

      spring-mvc-dispatcher.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
    
        <context:component-scan base-package="com.unionpay.controller"></context:component-scan>
        
        <bean
            class="org.springframework.web.servlet.view.InternalResourceViewResolver">
            <property name="prefix" value="/WEB-INF/views/"></property>
            <property name="suffix" value=".jsp"></property>
        </bean>
        
    </beans>

      就这么简单。由于kafka-beans.xml主要作用是配置注入的beans,所以按照程序,还是先建立好Model才符合逻辑。

      在src/main/java路径下分别建立三个package包:com.unionpay.producer、com.unionpay.consumer、com.unionpay.controller。从字面意思很容易理解,producer包主要用于存放生产者,consumer包主要用于存放消费者,controller包主要用于存放逻辑控制类。

      完成后在com.unionpay.producer包下建立KafkaProducerDemo.java文件(最好不要命名为Producer或者KafkaProducer,这样会与引入的jar包中原有的类重名):

      KafkaProducerDemo.java

     1 package com.unionpay.producer;
     2 
     3 import java.util.Properties;
     4 
     5 import org.apache.kafka.clients.producer.KafkaProducer;
     6 import org.apache.kafka.clients.producer.ProducerRecord;
     7 
     8 public class KafkaProducerDemo {
     9 
    10     Properties properties;
    11 
    12     public KafkaProducerDemo(Properties properties) {
    13         super();
    14         this.properties = properties;
    15     }
    16 
    17     public Properties getProperties() {
    18         return properties;
    19     }
    20 
    21     public void setProperties(Properties properties) {
    22         this.properties = properties;
    23     }
    24 
    25     public void sendMessage(String msg) {
    26 
    27         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    28 
    29         ProducerRecord<String, String> record = new ProducerRecord<String, String>(properties.getProperty("topic"),
    30                 msg);
    31         producer.send(record);
    32 
    33         producer.close();
    34 
    35     }
    36 
    37 }

      Properties属性主要是为了配置KafkaProducer类,具体信息通过Spring注入,这样可以显得更加高大上和灵活。后面的配置文件中我们可以看得到具体的配置信息。

      同样,在com.unionpay.consumer包下面新建类KafkaConsumerDemo.java:

     1 package com.unionpay.consumer;
     2 
     3 import java.util.Arrays;
     4 import java.util.Properties;
     5 
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.apache.kafka.clients.consumer.ConsumerRecords;
     8 import org.apache.kafka.clients.consumer.KafkaConsumer;
     9 
    10 public class KafkaConsumerDemo {
    11 
    12     private Properties props;
    13     
    14     public KafkaConsumerDemo(Properties props) {
    15         super();
    16         this.props = props;
    17     }
    18 
    19     public Properties getProps() {
    20         return props;
    21     }
    22 
    23     public void setProps(Properties props) {
    24         this.props = props;
    25     }
    26 
    27     public String receive(){
    28         
    29         KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
    30         consumer.subscribe(Arrays.asList(props.getProperty("topic")));
    31         
    32         String msg = "";
    33         while(true){
    34             ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
    35             for(ConsumerRecord<String, String> consumerRecord:consumerRecords){
    36                 msg += consumerRecord.value();
    37             }
    38             consumer.close();
    39             return msg;
    40         }
    41     }
    42     
    43 }

      也是基于同样的原因,KafkaConsumer的配置信息properties也是通过Spring配置文件注入。

      当Producer和Consumer编写完成后,就可以编写kafka-beans.xml文件啦:

      kafka-beans.xml

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:mvc="http://www.springframework.org/schema/mvc"
     5     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     6         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
     7         http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
     8 
     9 
    10     <context:component-scan base-package="com.unionpay.producer"></context:component-scan>
    11     <context:component-scan base-package="com.unionpay.consumer"></context:component-scan>
    12 
    13 
    14     <bean id="kafkaProducerDemo" class="com.unionpay.producer.KafkaProducerDemo">
    15         <property name="properties">
    16             <props>
    17                 <prop key="topic">my-replicated-topic</prop>
    18                 <prop key="bootstrap.servers">127.0.0.1:9092</prop>
    19                 <prop key="acks">all</prop>
    20                 <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer
    21                 </prop>
    22                 <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer
    23                 </prop>
    24                 <prop key="buffer.memory">33554432</prop>
    25             </props>
    26 
    27         </property>
    28     </bean>
    29 
    30     <bean id="kafkaConsumerDemo" class="com.unionpay.consumer.KafkaConsumerDemo">
    31         <property name="props">
    32             <props>
    33                 <prop key="topic">my-replicated-topic</prop>
    34                 <prop key="bootstrap.servers">127.0.0.1:9092</prop>
    35                 <prop key="group.id">group1</prop>
    36                 <prop key="enable.auto.commit">true</prop>
    37                 <prop key="auto.commit.interval.ms">1000</prop>
    38                 <prop key="session.timeout.ms">30000</prop>
    39                 <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer
    40                 </prop>
    41                 <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer
    42                 </prop>
    43             </props>
    44 
    45         </property>
    46     </bean>
    47 </beans>

      kafka的主要配置一共有三种:broker、producer和consumer,对于客户端来说就是后两种啦。而后两种的配置项从官方文件可以知道,每个都至少有30多种配置内容。通过上面这种注入配置方式的话,在<props><props>中随便添加配置内容,是不是很灵活呢^_^

      下面在com.unionpay.controller包下编写Controller类,控制业务逻辑:

      KafkaController.java

    package com.unionpay.controller;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.servlet.ModelAndView;
    
    import com.unionpay.consumer.KafkaConsumerDemo;
    import com.unionpay.producer.KafkaProducerDemo;
    
    @Controller
    public class KafkaController {
    
        @Resource(name = "kafkaProducerDemo")
        KafkaProducerDemo producer;
    
        @Resource(name = "kafkaConsumerDemo")
        KafkaConsumerDemo consumer;
    
        @RequestMapping(value = "/welcome")
        public ModelAndView welcome() {
            System.out.println("--------welcome--------");
            ModelAndView mv = new ModelAndView();
            mv.setViewName("welcome");
            return mv;
        }
    
        @RequestMapping(value = "/sendmessage", method = RequestMethod.GET)
        public ModelAndView sendMessage() {
            System.out.println("--------sendmessage--------");
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String now = sdf.format(date);
    
            ModelAndView mv = new ModelAndView();
            mv.addObject("time", now);
            mv.setViewName("kafka_send");
            return mv;
        }
    
        @RequestMapping(value = "/onsend", method = RequestMethod.POST)
        public ModelAndView onsend(@RequestParam("message") String msg) {
            System.out.println("--------onsend--------");
            producer.sendMessage(msg);
    
            ModelAndView mv = new ModelAndView();
            mv.setViewName("welcome");
            return mv;
        }
    
        @RequestMapping(value = "/receive")
        public ModelAndView receive() {
            System.out.println("--------receive--------");
            
            String msg = consumer.receive();
            
            ModelAndView mv = new ModelAndView();
            mv.addObject("msg", msg);
            mv.setViewName("kafka_receive");
            return mv;
        }
    
    }

      到目前为止,我们的MVC中已经完成了两个啦(M and C),下面编写最后的三个JSP文件。从解析配置文件(spring-mvc-dispatcher.xml)来看,我们的JSP页面应该建立在/WEB-INF/views/目录下,所以我们首先在/WEB-INF/目录下建立views文件夹。然后在该文件夹下面建立三个jsp文件:

      welcome.jsp

     1 <%@ page language="java" contentType="text/html; charset=UTF-8"
     2     pageEncoding="UTF-8"%>
     3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
     4 <html>
     5 <head>
     6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
     7 <title>welcome</title>
     8 </head>
     9 <body>
    10     <h1>Welcome</h1>
    11     <h2><a href="sendmessage">Send a Message</a></h2>
    12     <h2><a href="receive">Get a Message</a></h2>
    13 </body>
    14 </html>

      kafka_send.jsp

     1 <%@ page language="java" contentType="text/html; charset=UTF-8"
     2     pageEncoding="UTF-8"%>
     3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
     4 <html>
     5 <head>
     6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
     7 <title>kafka_send</title>
     8 </head>
     9 <body>
    10     <h1>Send a Message</h1>
    11     <form action="onsend" method="post">
    12     MessageText:<textarea name="message">${time}</textarea>
    13     <br>
    14     <input type="submit" value="Submit">
    15     </form>
    16     
    17     <h2><a href="welcome">RETURN HOME</a></h2>
    18 
    19 </body>
    20 </html>

      kafka-receive.jsp

     1 <%@ page language="java" contentType="text/html; charset=UTF-8"
     2     pageEncoding="UTF-8"%>
     3 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
     4 <html>
     5 <head>
     6 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
     7 <title>kafka_receive</title>
     8 </head>
     9 <body>
    10 
    11     <h1>Kafka_Reveive!!!</h1>
    12     <h2>Receive Message : ${msg}</h2>
    13     <h2><a href="welcome">RETURN HOME</a></h2>
    14 </body>
    15 </html>

      啊,终于大功告成啦。下面就要品尝我们的劳动果实啦。将项目部署在容器里,然后首先启动zookeeper集群服务器,然后启动kafka集群服务器:

    //启动zookeeper集群服务器
    cd ~/DevelopEnvironment/zookeeper-3.4.9-kafka/bin
    ./zkServer.sh start
    
    //启动kafka集群服务器
    cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
    ./kafka-server-start.sh ../config/server.properties 
    ./kafka-server-start.sh ../config/server-1.properties 
    ./kafka-server-start.sh ../config/server-2.properties

      然后通过Eclipse启动容器:

      

      从上面终端打印出来的信息可以知道,部署成功啦。下面就要轮回到本文开头啦,在网页地址栏中输入:http://127.0.0.1:8080/kafkaSpringMVC/welcome 进入欢迎界面,然后按照开始描述的操作进行操作,看看能否成功发送和接受消息呢?

      怎么样,你成功了吗?反正我是成功了,也希望你也成功啦。如果出现什么错误的话也千万别着急,去根据报错信息找找原因,因为你也马上就要成功啦。

      源码下载:kafkaSpringMVC.zip

    参考文献

  • 相关阅读:
    高并发系统设计(二十):分布式架构如何跟踪排查慢请求问题?
    Git将多个commit合并成一个commit
    高并发系统设计(十九)【注册中心】:微服务架构结合RPC框架如何做到分布式系统寻址?
    高并发系统设计(十八):【RPC框架】10万QPS下如何实现毫秒级的服务调用?
    AfxSocketInit()
    TEXTMETRIC 结构详解
    OnInitialUpdate函数
    SetForegroundWindow
    GetSafeHwnd()函数
    MFC之CCommandLineInfo
  • 原文地址:https://www.cnblogs.com/jxwch/p/6602169.html
Copyright © 2011-2022 走看看