  • Hand-writing an MQ Framework (Part 2): Server Implementation

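    I. Setting Sail

    This continues from the previous post -> Hand-writing an MQ Framework (Part 1): Getting Ready to Set Out

    Following the principle of "first make it exist, then make it good", the plan is to implement the functionality over the web (HTTP) first, and later optimize it by rewriting it on top of sockets.

    1. Technology choices

    The web layer uses gmvc, a framework I wrote earlier (Hand-writing an MVC Framework (Part 1): Setting Out Again). Messages are stored in a database, accessed through gdao, another framework I wrote a while ago (Hand-writing a DAO Framework (Part 1): Starting from "1").

    2. Project setup

    The project started as a single module, but a single project does not separate the server and the client cleanly, so I switched to a Maven parent with child modules.

    The parent pom is configured as follows: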

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.shuimutong</groupId>
        <artifactId>gmq</artifactId>
        <version>${global.version}</version>
        <packaging>pom</packaging>
        <url>http://maven.apache.org</url>
    
        <modules>
            <module>gmq-server</module>
            <module>gmq-client</module>
        </modules>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <global.version>0.0.1-SNAPSHOT</global.version>
            <fastjson.version>1.2.60</fastjson.version>
            <gdao.version>2.0.0-SNAPSHOT</gdao.version>
            <gmvc.version>1.0.1-SNAPSHOT</gmvc.version>
            <gutil.version>0.0.2-SNAPSHOT</gutil.version>
        </properties>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>me.lovegao</groupId>
                    <artifactId>gdao</artifactId>
                    <version>${gdao.version}</version>
                </dependency>
                <dependency>
                    <groupId>com.shuimutong</groupId>
                    <artifactId>gmvc</artifactId>
                    <version>${gmvc.version}</version>
                </dependency>
                <dependency>
                    <groupId>com.shuimutong</groupId>
                    <artifactId>gutil</artifactId>
                    <version>${gutil.version}</version>
                </dependency>
                <dependency>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                    <version>${fastjson.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-lang3</artifactId>
                    <version>3.4</version>
                </dependency>
                <dependency>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                    <version>1.2.16</version>
                </dependency>
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                    <version>1.6.1</version>
                </dependency>
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                    <version>1.6.2</version>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <target>1.8</target>
                        <source>1.8</source>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    The pom for the MQ server, which is the subject of this post, is as follows:

    <?xml version="1.0"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.shuimutong</groupId>
            <artifactId>gmq</artifactId>
            <version>${global.version}</version>
        </parent>
        <groupId>com.shuimutong</groupId>
        <artifactId>gmq-server</artifactId>
        <packaging>war</packaging>
        <version>${global.version}</version>
        <name>gmq-server</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>javax.servlet-api</artifactId>
                <version>3.1.0</version>
            </dependency>
            <dependency>
                <groupId>me.lovegao</groupId>
                <artifactId>gdao</artifactId>
            </dependency>
            <dependency>
                <groupId>com.shuimutong</groupId>
                <artifactId>gmvc</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>javax.servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </dependency>
        </dependencies>
        <build>
            <finalName>com.shuimutong.gmq_server</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <target>1.8</target>
                        <source>1.8</source>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

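    The project depends on gdao (https://gitee.com/simpleha/gdao.git), gmvc (https://gitee.com/simpleha/gmvc.git), and gutil (https://gitee.com/simpleha/gutil.git). Each of them has to be cloned locally and installed into the local Maven repository first (for example with mvn install).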

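    II. Interface Overview

    1. SyncController

    Before messages can be published or subscribed to, a topic has to be created. I kept topic creation separate from the publish and subscribe endpoints mainly because the two classes are accessed at very different frequencies, and keeping them apart makes targeted optimization easier later.

    The implementation is as follows: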

    package com.shuimutong.gmq.server.controller;
    
    import java.util.List;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSONObject;
    import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
    import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
    import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
    import com.shuimutong.gmq.server.bean.vo.UriDescVo;
    import com.shuimutong.gmq.server.exception.ServiceException;
    import com.shuimutong.gmq.server.service.SyncService;
    import com.shuimutong.gmq.server.service.TopicService;
    import com.shuimutong.gmvc.annotation.XAutowired;
    import com.shuimutong.gmvc.annotation.XController;
    import com.shuimutong.gmvc.annotation.XRequestMapping;
    import com.shuimutong.gmvc.util.RequestResolveUtil;
    
    /**
     * Controller for syncing non-message information
     * @ClassName:  SyncController
     * @Description: topic creation and URI description endpoints
     * @author: 水木桶
     * @date:   2019-10-20 21:45:47
     * @Copyright: 2019 [水木桶]  All rights reserved.
     */
    @XController
    @XRequestMapping("/sync")
    public class SyncController {
        private final static Logger log = LoggerFactory.getLogger(SyncController.class);
        @XAutowired
        private TopicService topicService;
        @XAutowired
        private SyncService syncService;
    
        /**
         * Get the URI descriptions
         * @param request
         * @param reponse
         */
        @XRequestMapping("/getPath")
        public void getPath(HttpServletRequest request, HttpServletResponse reponse) {
            List<UriDescVo> uriList = syncService.listUriDesc();
            ResponseDataVo responseData = new ResponseDataVo(ResponseCodeEnum.OK, uriList);
            RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
        }
        
        /**
         * Add a new topic
         * @param request
         * @param reponse
         */
        @XRequestMapping("/addTopic")
        public void addTopic(HttpServletRequest request, HttpServletResponse reponse) {
            ResponseDataVo responseData = null;
            String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
            if(StringUtils.isBlank(topic)) {
                responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题为空"); // "topic is empty"
            } else {
                try {
                    boolean addState = topicService.addTopic(topic);
                    if(addState) {
                        responseData = new ResponseDataVo(ResponseCodeEnum.OK, "添加成功"); // "added successfully"
                    } else {
                        responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题已存在"); // "topic already exists"
                    }
                } catch (ServiceException e) {
                    log.error("addTopicException," + topic, e);
                    responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
                }
            }
            RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
        }
    }

    getPath() is not used yet; I will come back to it when it is.
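The contract that addTopic relies on (true when the topic is new, false when it already exists) can be illustrated with a small in-memory stand-in. This is only a sketch: TopicRegistrySketch and its methods are hypothetical names, not the project's TopicService, and the concurrent set merely plays the role that a unique index would play in the database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for topic storage; not the project's TopicService. */
class TopicRegistrySketch {
    // The set rejects duplicates, much like a unique index on the topic column would.
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    /** Returns true if the topic was newly added, false if it already existed. */
    boolean addTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be blank");
        }
        return topics.add(topic);
    }

    /** Returns true if the topic has been added before. */
    boolean exists(String topic) {
        return topics.contains(topic);
    }

    public static void main(String[] args) {
        TopicRegistrySketch registry = new TopicRegistrySketch();
        System.out.println(registry.addTopic("orders")); // first add of this topic
        System.out.println(registry.addTopic("orders")); // same topic again
    }
}
```

In the controller, the false case is the one reported back as PARAM_ERROR, so a client can treat it as "nothing to do" rather than as a server failure.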

    2. MessageController

    Once a topic has been created, messages can be published to it and pulled from it.

    The implementation is as follows:

    package com.shuimutong.gmq.server.controller;
    
    import java.util.List;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSONObject;
    import com.shuimutong.gmq.server.bean.SystemConstant;
    import com.shuimutong.gmq.server.bean.dos.TopicDo;
    import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
    import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
    import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
    import com.shuimutong.gmq.server.exception.ServiceException;
    import com.shuimutong.gmq.server.service.MessageService;
    import com.shuimutong.gmq.server.service.TopicService;
    import com.shuimutong.gmvc.annotation.XAutowired;
    import com.shuimutong.gmvc.annotation.XController;
    import com.shuimutong.gmvc.annotation.XRequestMapping;
    import com.shuimutong.gmvc.util.RequestResolveUtil;
    import com.shuimutong.guti.bean.TwoTuple;
    
    /**
     * Controller for sending and receiving messages
     * @ClassName:  MessageController
     * @Description: publish and pull endpoints for messages
     * @author: 水木桶
     * @date:   2019-10-20 21:45:47
     * @Copyright: 2019 [水木桶]  All rights reserved.
     */
    @XController
    @XRequestMapping(SystemConstant.STR_URL_MESSAGE)
    public class MessageController {
        private final static Logger log = LoggerFactory.getLogger(MessageController.class);
        @XAutowired
        private MessageService messageService;
        @XAutowired
        private TopicService topicService;
        
    
        /**
         * Fetch messages
         * @param request
         * @param reponse
         */
        @XRequestMapping(SystemConstant.STR_URL_GET_MESSAGE)
        public void getMessage(HttpServletRequest request, HttpServletResponse reponse) {
            ResponseDataVo responseData = null;
            String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
            String offsetStr = request.getParameter(RequestParamEnum.OFFSET.getParamName());
            String sizeStr = request.getParameter(RequestParamEnum.SIZE.getParamName());
            
            if(StringUtils.isBlank(topic) || !StringUtils.isNumeric(offsetStr) || !StringUtils.isNumeric(sizeStr)) {
                responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题为空或者数字非法"); // "topic is empty or a number is invalid"
            } else {
                int offset = Integer.parseInt(offsetStr);
                int size = Integer.parseInt(sizeStr);
                if(offset < 0 || size < 1) {
                    responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "数字异常"); // "number out of range"
                } else {
                    try {
                        TopicDo topicDo = topicService.findByTopic(topic);
                        if(topicDo == null) {
                            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题不存在"); // "topic does not exist"
                        } else {
                            List<TwoTuple<Long, String>> list = messageService.listMessage(topic, offset, size);
                            responseData = new ResponseDataVo(ResponseCodeEnum.OK, list);
                        }
                    } catch (ServiceException e) {
                        log.error("getMessageException", e);
                        responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
                    }
                }
            }
            RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
        }
        
        
        /**
         * Producer sends a message to the server
         * @param request
         * @param reponse
         */
        @XRequestMapping(SystemConstant.STR_URL_SEND_MESSAGE)
        public void sendMessage(HttpServletRequest request, HttpServletResponse reponse) {
            ResponseDataVo responseData = null;
            String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
            String message = request.getParameter(RequestParamEnum.MESSAGE.getParamName());
            if(StringUtils.isBlank(topic) || StringUtils.isBlank(message)) {
                responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题或者消息为空"); // "topic or message is empty"
            } else {
                try {
                    TopicDo topicDo = topicService.findByTopic(topic);
                    if(topicDo == null) {
                        responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主题不存在"); // "topic does not exist"
                    } else {
                        messageService.saveMessage(topic, message);
                        responseData = new ResponseDataVo(ResponseCodeEnum.OK);
                    }
                } catch (ServiceException e) {
                    log.error("sendMessageException", e);
                    responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
                }
            }
            RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
        }
    }
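One edge case in getMessage is worth noting. StringUtils.isNumeric accepts digit strings of any length, so a value such as 99999999999 passes the check and Integer.parseInt then throws NumberFormatException, which sits outside the try block and is not a ServiceException. The same check already rejects a leading minus sign, so the offset < 0 branch cannot be reached. A possible way to fold both concerns into one helper is sketched below; RequestNumberSketch and parseNonNegativeInt are hypothetical names and are not part of gmq.

```java
import java.util.OptionalInt;

/** Hypothetical helper for request parameters; not part of the gmq code base. */
class RequestNumberSketch {
    /**
     * Parses a non-negative decimal int.
     * Returns empty for null, empty, signed, non-digit, or out-of-range input.
     */
    static OptionalInt parseNonNegativeInt(String raw) {
        if (raw == null || raw.isEmpty()) {
            return OptionalInt.empty();
        }
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c < '0' || c > '9') {
                return OptionalInt.empty();
            }
        }
        try {
            return OptionalInt.of(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            // Digits only, but larger than Integer.MAX_VALUE.
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseNonNegativeInt("10").isPresent());          // in range
        System.out.println(parseNonNegativeInt("99999999999").isPresent()); // overflows int
    }
}
```

With a helper like this, the controller could answer PARAM_ERROR whenever the result is empty instead of letting the exception escape.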

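    3. CacheController

    The MQ is designed around pull-based consumption. With pull, each client has to keep track of how far it has consumed.

    That is why this cache interface was added. It provides key-value storage, with the data kept in the database.

    For the code, see the gitee link further down.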

    With these interfaces in place, simple publishing and subscribing already work, although messages still have to be fetched manually.
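The pull model described above, where the consumer remembers its own position in a key-value store, can be sketched in memory as follows. Everything here is illustrative: PullConsumerSketch, Message, and the method names are hypothetical, a single topic is assumed, and the sketch assumes that the offset means "id of the last message consumed", which the post does not state explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of pull consumption with a client-side offset. */
class PullConsumerSketch {
    /** One stored message: an auto-increment id plus the body. */
    static final class Message {
        final long id;
        final String body;

        Message(long id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    private final List<Message> messages = new ArrayList<>(); // stand-in for the message table
    private final Map<String, Long> kv = new HashMap<>();     // stand-in for the kv store
    private long nextId = 1;

    /** Producer side: append a message with the next id. */
    void send(String body) {
        messages.add(new Message(nextId++, body));
    }

    /** Server side: return up to size messages whose id is greater than offset (assumed semantics). */
    List<Message> pull(long offset, int size) {
        List<Message> out = new ArrayList<>();
        for (Message m : messages) {
            if (m.id > offset && out.size() < size) {
                out.add(m);
            }
        }
        return out;
    }

    /** Consumer side: read the saved offset, pull a batch, then save the last id seen. */
    List<String> consume(String consumerKey, int size) {
        long offset = kv.getOrDefault(consumerKey, 0L);
        List<String> bodies = new ArrayList<>();
        for (Message m : pull(offset, size)) {
            bodies.add(m.body);
            offset = m.id;
        }
        kv.put(consumerKey, offset);
        return bodies;
    }

    public static void main(String[] args) {
        PullConsumerSketch mq = new PullConsumerSketch();
        mq.send("a");
        mq.send("b");
        mq.send("c");
        System.out.println(mq.consume("consumer-1", 2)); // prints [a, b]
        System.out.println(mq.consume("consumer-1", 2)); // prints [c]
    }
}
```

Because the offset lives with the consumer key rather than with the message, a second consumer using a different key starts again from the beginning, which is the main reason the server needs no per-consumer state beyond the kv entries.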

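    III. Related Tables

    Since the point of this post is to share, the material should be complete, so here are the structures of the database tables involved.

    1. Topic table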

    CREATE TABLE `gmq_topic` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
      `topic` varchar(255) NOT NULL DEFAULT '' COMMENT 'topic',
      `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'creation time',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uniq_idx_topic` (`topic`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='gmq topic table';

    2. Message table

    CREATE TABLE `gmq_message` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
      `topic` varchar(255) NOT NULL DEFAULT '' COMMENT 'topic',
      `message_body` text NOT NULL COMMENT 'message body',
      `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'creation time',
      PRIMARY KEY (`id`),
      KEY `idx_id_topic` (`id`,`topic`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='stored messages';

    3. KV table

    The statement listed under this heading in the original post is a verbatim copy of the gmq_message DDL above, so the kv table's definition is not reproduced here. The project repository (https://gitee.com/simpleha/gmq.git) may contain the actual schema.

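    IV. Summary

    That covers the main functionality. Looking back, it is not a lot, but the implementation had plenty of twists and turns, partly because of the frameworks involved.

    So between the previous post and this one, I not only finished this project but also fixed some problems that surfaced while using those frameworks. For example, values from a bigint column really do come back as BigDecimal rather than long.

    The gmq project has two main parts. The server is covered above; the client comes next.

    Finally, the code is available at: https://gitee.com/simpleha/gmq.git

    Next post, the client implementation -> Hand-writing an MQ Framework (Part 3): Client Implementation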

  • Original post: https://www.cnblogs.com/shuimutong/p/11923296.html