zoukankan      html  css  js  c++  java
  • 手写MQ框架(三)-客户端实现

    一、背景

    书接手写MQ框架(二)-服务端实现  ,前面介绍了服务端的实现。但是具体使用框架过程中,用户肯定是以客户端的形式跟服务端打交道的。客户端的好坏直接影响了框架使用的便利性。

    虽然框架目前是通过web的形式提供功能的,但是某的目标其实是通过socket实现,所以不仅需要有客户端,还要包装一下,让用户在使用过程中不需要关心服务端是如何实现的。

    简单来说,就是客户端使用必须方便。

    二、客户端实现

    1、HttpUtil

    目前客户端的核心功能是HttpUtil这个类,使用httpClient实现的,主要是为了请求服务端。

    具体实现如下:

    package com.shuimutong.gmq.client.util;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.http.HttpEntity;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.client.utils.URIBuilder;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.message.BasicNameValuePair;
    import org.apache.http.util.EntityUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.shuimutong.gmq.client.bean.HttpResponseBean;
    import com.shuimutong.gutil.common.GUtilCommonUtil;
    
    /**
     * http请求工具类
     * @ClassName: HttpUtil
     * @Description:(这里用一句话描述这个类的作用)
     * @author: 水木桶
     * @date: 2019年10月29日 下午9:43:54
     * @Copyright: 2019 [水木桶] All rights reserved.
     */
    public class HttpUtil {
        private final static Logger log = LoggerFactory.getLogger(HttpUtil.class);
        private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
        static {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        HTTP_CLIENT.close();
                    } catch (IOException e) {
                        log.error("HTTP_CLIENT-closeException", e);
                    }
                }
            });
        }
    
        /**
         * get请求
         * 
         * @param url
         * @return
         * @throws IOException
         */
        public static HttpResponseBean get(String url) throws IOException {
            HttpResponseBean responseBean = null;
            HttpGet httpGet = new HttpGet(url);
            CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
            try {
                HttpEntity httpEntity = res.getEntity();
                String body = EntityUtils.toString(httpEntity);
                responseBean = new HttpResponseBean(res.getStatusLine(), body);
                EntityUtils.consume(httpEntity);
            } finally {
                res.close();
            }
            return responseBean;
        }
        
        /**
         * 带参数的get请求
         * @param url
         * @param requsetParams
         * @return
         * @throws IOException
         * @throws URISyntaxException
         */
        public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException {
            HttpResponseBean responseBean = null;
            HttpGet httpGet;
            try {
                URIBuilder uriBuilder = new URIBuilder(url);
                if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
                    List<NameValuePair> nvps = new ArrayList<NameValuePair>();
                    requsetParams.forEach((k,v) -> {
                        nvps.add(new BasicNameValuePair(k, v));
                    });
                    uriBuilder.setParameters(nvps);
                }
                httpGet = new HttpGet(uriBuilder.build());
            } catch (Exception e) {
                throw new IOException(e);
            }
            CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
            try {
                HttpEntity httpEntity = res.getEntity();
                String body = EntityUtils.toString(httpEntity);
                responseBean = new HttpResponseBean(res.getStatusLine(), body);
                EntityUtils.consume(httpEntity);
            } finally {
                res.close();
            }
            return responseBean;
        }
    
        /**
         * post请求
         * @param url
         * @param requsetParams
         * @return
         * @throws IOException
         */
        public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException {
            HttpResponseBean responseBean = null;
            HttpPost httpPost = new HttpPost(url);
            if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
                List<NameValuePair> nvps = new ArrayList<NameValuePair>();
                requsetParams.forEach((k,v) -> {
                    nvps.add(new BasicNameValuePair(k, v));
                });
                httpPost.setEntity(new UrlEncodedFormEntity(nvps));
            }
            CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost);
            try {
                HttpEntity httpEntity = response.getEntity();
                String body = EntityUtils.toString(httpEntity);
                responseBean = new HttpResponseBean(response.getStatusLine(), body);
                EntityUtils.consume(httpEntity);
            } finally {
                response.close();
            }
            return responseBean;
        }
    }

    封装了get请求和post请求,封装了响应结果。

    加了一个钩子,在jvm关闭时能够主动关闭创建的资源。

    2、订阅消息、生产消息

    这两部分主要就是调用上面的HttpUtil,然后将结果包装一下。

    具体代码请参考前文的git。

    3、实例管理

    为了使得用户不需要关心具体实现,所以建了实例管理类。

    package com.shuimutong.gmq.client.util;
    
    import com.shuimutong.gmq.client.cache.CommonObjCache;
    import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl;
    import com.shuimutong.gmq.client.consumer.GmqConsumer;
    import com.shuimutong.gmq.client.producer.GmqProducer;
    
    public class GmqInstanceManage {
        public static GmqProducer getGmqProducer(String gmqServerUrl) {
            return new GmqProducer(gmqServerUrl);
        }
        
        public static GmqConsumer getGmqConsumer(String gmqServerUrl) {
            return new GmqConsumer(gmqServerUrl);
        }
        
        public static CommonObjCache getCommonCache(String serverUrl) {
            return new CommonObjCacheImpl(serverUrl);
        }
    }

    主要是为了封装变化。因为之后再迭代的话,实例的具体实现肯定不是目前这么简单,所以要尽量让使用者少关心具体实现。

    使用时关心的越多,后续项目迭代肯定越困难。

    三、使用示例

    1、生产消息

    @Test
        public void produceMsg() {
            GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl);
            for(int i=0; i<5; i++) {
                String message = "message:" + i;
                try {
                    SendMqResult res = producer.sendMq(topic, message);
                    System.out.println(res.getRes());
                } catch (SendMqException e) {
                    e.printStackTrace();
                }
            }
        }

    2、消费消息

    主要思路是:消费消息之前,先查询当前已经消费到了哪条消息。消息消费之后,将消费的编号存入缓存。

    典型的主动拉消息,消息是否消费由自己负责的模式。

    实现如下:

    @Test
        public void comsumerMsgByCache() {
            GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl);
            CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl);
            String gmqSign = "gmq_consumer_id";
            long consumerId = 0;
            int size = 2;
            for(int i=0; i<5; i++) {
                try {
                    CacheObj cacheId = commonCache.getById(gmqSign);
                    if(cacheId != null) {
                        consumerId = Long.parseLong(cacheId.getContent());
                    }
                    
                    List<MqContent> res = comsumer.getMq(topic, consumerId, size);
                    for(MqContent mq : res) {
                        System.out.println(JSONObject.toJSONString(mq));
                        if(mq.getId() > consumerId) {
                            consumerId = mq.getId();
                        }
                    }
                    commonCache.save(gmqSign, String.valueOf(consumerId));
                    System.out.println("保存consumerId:" + consumerId);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    四、总结

    gmq的初版至今已经完成,当然这只是开始。

    后续计划先将gmvc框架替换掉,直接使用netty进行通信。

    然后把消息存到数据库改为存到磁盘上。

    然后就是服务的高可用改造。

    届时欢迎指导。

    第2版设计、开发中……

    下一篇:手写MQ框架(四)-使用netty改造梳理

  • 相关阅读:
    shopping car 1.0
    文件分类
    求1-100的所有数的和
    输出 1-100 内的所有奇数和
    求1-2+3-4+5 ... 99的所有数的和
    关闭提示的下拉框
    h5页面乱码-设置编码
    常用的css
    渲染后新元素没有绑定事件
    爬虫日记-关于一些动态爬取
  • 原文地址:https://www.cnblogs.com/shuimutong/p/11923420.html
Copyright © 2011-2022 走看看