zoukankan      html  css  js  c++  java
  • ElasticSearch实战-编码实践

    1.概述

      前面在《ElasticSearch实战-入门》中给大家分享如何搭建这样一个集群,在完成集群的搭建后,今天给大家分享如何实现对应的业务功能模块,下面是今天的分享内容,目录如下所示:

    • 编码实践
    • 效果预览
    • 总结

    2.编码实践

      由于 ES 集群支持 Restful 接口,我们可以直接通过 Java 来调用 Restful 接口来查询我们需要的数据结果,并将查询到的结果在在我们的业务界面可视化出来。我们知道在 ES 集群的 Web 管理界面有这样一个入口,如下图所示:

      我们可以在此界面的入口中拼接 JSON 字符串来查询我们想要的结果,下面,我们通过 Java 的 API 去调用 Restful 接口来查询我们想要的结果。

    2.1 字符串拼接实现

      接着,我们去实现要查询的核心代码,具体内容实现如下所示:

    public String buildQueryString(Map<String, Object> param) throws ParseException {
            SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            StringBuilder builder = new StringBuilder("{"query":{"bool":{"must":[");
            if (param.get("msgType") != null) {
                Integer msgType = (int) param.get("msgType") == 0 ? 2 : 1;
                builder.append("{"term":{"msg_type":").append(msgType).append("}}");
            }if (param.get("start") != null && param.get("end") != null) {
                String start = String.valueOf(dfs.parse(param.get("start").toString()).getTime()).substring(0, 10);
                String end = String.valueOf(dfs.parse(param.get("end").toString()).getTime()).substring(0, 10);
                builder.append(",{"range":{"itime":{"from":" + start + ","to":" + end + "}}}");
            }
            if (param.get("receiverValue") != null) {
                builder.append(",{"wildcard":{"receiver_value":"*").append(param.get("receiverValue")).append("*"}}");
            }
            builder.append("],"must_not":[],"should":[]}}");
            builder.append(","sort":[{"itime":"desc"}],"facets":{}");
            builder.append(","from": ").append(param.get("startIndex")).append(","size": ").append(param.get("pageSize")).append("}");
            LOG.info("API Query -> " + builder.toString());
            return builder.toString();
        }

    2.2 查询实现核心代码

      接着是实现查询的核心代码,具体内容实现如下所示:

    public SerachResponse<ApiSent> querySent(Map<String, Object> param) {
            SerachResponse<ApiSent> search_result = null;
            try {
                long time = System.currentTimeMillis();
                ResponseWrapper wrapper = httpUtils.sendJson(configService.loadConfig(Configure.API_SENT), buildQueryString(param));
                if (wrapper.responseCode == HttpStatus.SC_OK) {
                    search_result = _gson.fromJson(wrapper.responseContent, new TypeToken<SerachResponse<ApiSent>>() {
                    }.getType());
                    LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time)));
                    return search_result;
                } else {
                    LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time)));
                    LOG.error(String.format("api sent request es server response not 200,response=%s,exception=%s", wrapper.error, wrapper.exceptionString));
                }
            } catch (Exception ex) {
                LOG.error(String.format("parsed es sent data exception.", ex));
            }
            return search_result;
    
        }
    • Configure类
    public class Configure {
        public static final String API_SENT = "API_SENT";
    }

    2.3 DAO层获取 ES 集群的连接信息

    public class ConfigService {
    
        private static Log logger = LogFactory.getLog(ConfigService.class);
    
        @Autowired
        private ConfigDao configDao;
    
        @Cacheable("sysConfigCache")
        public String loadConfig(String type) {
            String value = configDao.getConfig(type);
            logger.info(String.format("Load Config,type=%s,value=%s", type, value));
            return value;
        }
    }
    • ConfigDao接口
    public interface ConfigDao {
        
        String getConfig(String type);
    
    }

      其对应的实现内容如下所示:

    <select id="getConfig" parameterType="String" resultType="String">
       select value from t_system_config where type=#{type}
    </select>

      DB库存储的 ES 连接信息,如下图所示:

    2.4 HTTP 接口的实现代码

      关于 HttpUtils 的代码实现较为简单,这里直接附上代码的实现内容,如下所示:

    • IHttpUtils 接口
    public interface IHttpUtils {
        public ResponseWrapper sendJson(String url, String content);
    }
    • HttpUtils 类实现接口
    public class HttpUtils implements IHttpUtils {
    
        private static Logger LOG = Logger.getLogger(HttpUtils.class.getName());
        protected static Gson _gson = new Gson();
    
        protected void initSSL() {
            try {
                TrustManager[] tmCerts = new javax.net.ssl.TrustManager[1];
                tmCerts[0] = new SimpleTrustManager();
                javax.net.ssl.SSLContext sc = javax.net.ssl.SSLContext.getInstance("SSL");
                sc.init(null, tmCerts, null);
                javax.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
                HostnameVerifier hv = new SimpleHostnameVerifier();
                HttpsURLConnection.setDefaultHostnameVerifier(hv);
            } catch (Exception e) {
                LOG.error("init SSL exception.", e);
            }
        }
    
        @Override
        public ResponseWrapper sendJson(String url, String content) {
            return sendJson(url, content, null);
        }
    
        @Override
        public ResponseWrapper sendJson(String url, String content, String authCode) {
            return sendRequest(url, content, METHOD_POST, authCode, CONTENT_TYPE_JSON);
        }
    
    public ResponseWrapper sendRequest(String url, String content, String method, String authCode,
                String contentType) {
            LOG.info("Send request to - " + url + ", with content - " + content);
            HttpURLConnection conn = null;
            OutputStream out = null;
            StringBuffer sb = new StringBuffer();
            cn.jpush.utils.ResponseWrapper wrapper = new ResponseWrapper();
    
            try {
                if (StringUtils.isSSL(url)) {
                    initSSL();
                }
    
                if (METHOD_GET.equals(method)) {
                    if (!Strings.isNullOrEmpty(content)) url += "?" + content;
                }
                URL aUrl = new URL(url);
                wrapper.address = aUrl.getHost();
    
                conn = (HttpURLConnection) aUrl.openConnection();
                conn.setConnectTimeout(DEFAULT_CONNECTION_TIMEOUT);
                conn.setReadTimeout(DEFAULT_SOCKET_TIMEOUT);
                conn.setUseCaches(false);
                conn.setRequestMethod(method);
                conn.setRequestProperty("Connection", "Keep-Alive");
                conn.setRequestProperty("Accept-Charset", CHARSET);
                conn.setRequestProperty("Charset", CHARSET);
                conn.setRequestProperty("Authorization", authCode);
                conn.setRequestProperty("Send-Source", "portal");
                conn.setRequestProperty("Content-Type", contentType);
    
                if (METHOD_POST.equals(method)) {
                    conn.setDoOutput(true);
                    byte[] data = content.getBytes(CHARSET);
                    conn.setRequestProperty("Content-Length", String.valueOf(data.length));
                    out = conn.getOutputStream();
                    out.write(data);
                    out.flush();
                } else {
                    conn.setDoOutput(false);
                }
                int status = conn.getResponseCode();
                InputStream in = null;
                if (status == 200) {
                    in = conn.getInputStream();
                } else {
                    in = conn.getErrorStream();
                }
                InputStreamReader reader = new InputStreamReader(in, CHARSET);
                char[] buff = new char[1024];
                int len;
                while ((len = reader.read(buff)) > 0) {
                    sb.append(buff, 0, len);
                }
    
                String responseContent = sb.toString();
                wrapper.responseCode = status;
                wrapper.responseContent = responseContent;
    
                String quota = conn.getHeaderField(RATE_LIMIT_QUOTA);
                String remaining = conn.getHeaderField(RATE_LIMIT_Remaining);
                String reset = conn.getHeaderField(RATE_LIMIT_Reset);
                wrapper.setRateLimit(quota, remaining, reset);
    
                if (status == 200) {
                    LOG.debug("Succeed to get response - 200 OK");
                    LOG.debug("Response Content - " + responseContent);
    
                } else if (status > 200 && status < 400) {
                    LOG.warn("Normal response but unexpected - responseCode:" + status
                            + ", responseContent:" + responseContent);
    
                } else {
                    LOG.warn("Got error response - responseCode:" + status + ", responseContent:"
                            + responseContent);
    
                    switch (status) {
                        case 400:
                            LOG.error("Your request params is invalid. Please check them according to error message.");
                            wrapper.setErrorObject();
                            break;
                        case 401:
                            LOG.error("Authentication failed! Please check authentication params according to docs.");
                            wrapper.setErrorObject();
                            break;
                        case 403:
                            LOG.error("Request is forbidden! Maybe your is listed in blacklist?");
                            wrapper.setErrorObject();
                            break;
                        case 410:
                            LOG.error("Request resource is no longer in service. Please according to notice on official website.");
                            wrapper.setErrorObject();
                        case 429:
                            LOG.error("Too many requests! Please review your request quota.");
                            wrapper.setErrorObject();
                            break;
                        case 500:
                        case 502:
                        case 503:
                        case 504:
                            LOG.error("Seems encountered server error. Maybe is in maintenance? Please retry later.");
                            break;
                        default:
                            LOG.error("Unexpected response.");
                    }
    
                }
    
            } catch (SocketTimeoutException e) {
                if (e.getMessage().contains(KEYWORDS_READ_TIMED_OUT)) {
                    LOG.error(KEYWORDS_READ_TIMED_OUT, e);
                }
                wrapper.exceptionString = e.getMessage();
    
            } catch (IOException e) {
                LOG.error(KEYWORDS_CONNECT_TIMED_OUT, e);
                wrapper.exceptionString = e.getMessage();
    
            } finally {
                if (null != out) {
                    try {
                        out.close();
                    } catch (IOException e) {
                        LOG.error("Failed to close stream.", e);
                    }
                }
                if (null != conn) {
                    conn.disconnect();
                }
            }
            LOG.info(String.format("Send Response to - %s, Response Wrapper - %s", url, wrapper));
            return wrapper;
        }
    }

    3.截图预览

      下面给大家附上一张业务界面可视化的数据结果预览图,如下图所示:

      上图为我发送的测试数据,通过收集模块将我发送的数据收集并存储到 ES 集群,通过接口代码将这部分数据可视化到业务界面进行展示。

    4.总结

      总体来说,ES 集群从搭建部署到编码实现都较为简单,在使用 JSON 字符串拼接查询时需要细心点,后续有时间可以为大家分享下 ES 的查询的效率,及其他方面的性能指标。

    5.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    java内嵌ftp服务器
    echostudio3 破解
    替换java中资源文件类
    使用Apache FtpServer搭建FTP服务器
    创建一个输入标识符 也就是一个输入的光标
    vbscript操作文件
    vbscript操作文件
    echostudio3 破解
    js做一个简易计算器
    js函数,传入原文,返回密文
  • 原文地址:https://www.cnblogs.com/smartloli/p/4720429.html
Copyright © 2011-2022 走看看