  • Accessing Kylin from Java: A Summary

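    This round of development was an OEM statistics report. The report data is collected by colleagues in the big data platform department, and our department builds the presentation layer.

    The big data team uses Kylin, a distributed analytics engine (see the official Kylin documentation).

    Although Kylin sits on the big data side, management ultimately decided that our team would also develop the Dubbo interface that accesses Kylin, which was an unwelcome surprise.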

    Solution 1: MyBatis

    First we set up a shop-mod-bigdata module (plus shop-mod-bigdata-interface) that accesses Kylin and exposes Dubbo services.

    The official Kylin documentation describes how to access Kylin over JDBC.
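
    For reference, plain JDBC access looks roughly like the sketch below. The driver class org.apache.kylin.jdbc.Driver and the jdbc:kylin://host:port/project URL form follow the Kylin documentation; the class name, method names, and every host, project, credential and table value are placeholders of mine, not values from this project. It needs the kylin-jdbc jar on the classpath and a reachable Kylin server to actually run a query.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class KylinJdbcSketch {
    /** Builds a Kylin JDBC URL of the form jdbc:kylin://host:port/project. */
    static String kylinUrl(String host, int port, String project) {
        return "jdbc:kylin://" + host + ":" + port + "/" + project;
    }

    /** Counts the rows of a table over a one-off (unpooled) Kylin connection. */
    static long countRows(String url, String user, String password, String table) throws Exception {
        // Load and instantiate the Kylin driver, as the Kylin JDBC docs do
        Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver")
                .getDeclaredConstructor().newInstance();
        Properties info = new Properties();
        info.put("user", user);
        info.put("password", password);
        // try-with-resources closes the result set, statement and connection in reverse order
        try (Connection conn = driver.connect(url, info);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from " + table)) {
            return rs.next() ? rs.getLong(1) : 0L;
        }
    }
}
```

    A call would look like countRows(kylinUrl("localhost", 7070, "my_project"), "someUser", "somePassword", "my_table"), with all four values replaced by real ones.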

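    With the long term in mind, could we use a connection pool?

    Since Kylin offers JDBC access, it should follow the JDBC specification,

    so we modeled the configuration on our MySQL connection pool settings:

    <!-- Kylin data source (OEM) -->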
        <bean id="kylinOemDataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
    		destroy-method="close">
    		<property name="driverClassName" value="${ds.kylin.jdbc.Driver}" />
    		<property name="url" value="${ds.kylin.oem.jdbc.url}" />
    		<property name="username" value="${ds.kylin.oem.jdbc.username}" />
    		<property name="password" value="${ds.kylin.oem.jdbc.password}" />
    		
    		<!-- Connection pool settings -->
    		<property name="testWhileIdle" value="${ds.kylin.testWhileIdle}" />
    		<property name="testOnBorrow" value="${ds.kylin.testOnBorrow}" />
    		<property name="testOnReturn" value="${ds.kylin.testOnReturn}" />
    		<property name="validationQuery" value="${ds.kylin.validationQuery}" />
    		<property name="validationInterval" value="${ds.kylin.validationInterval}" />
    		<property name="timeBetweenEvictionRunsMillis" value="${ds.kylin.timeBetweenEvictionRunsMillis}" />
    		<property name="maxActive" value="${ds.kylin.maxActive}" />
    		<property name="maxIdle" value="${ds.kylin.maxIdle}" />
    		<property name="minIdle" value="${ds.kylin.minIdle}" />
    		<property name="maxWait" value="${ds.kylin.maxWait}" />
    		<property name="initialSize" value="${ds.kylin.initialSize}" />
    		<property name="removeAbandonedTimeout" value="${ds.kylin.removeAbandonedTimeout}" />
    		<property name="removeAbandoned" value="${ds.kylin.removeAbandoned}" />
    		<property name="logAbandoned" value="${ds.kylin.logAbandoned}" />
    		<property name="minEvictableIdleTimeMillis" value="${ds.kylin.minEvictableIdleTimeMillis}" />
    		<property name="jmxEnabled" value="${ds.kylin.jmxEnabled}" />
    		<property name="jdbcInterceptors" value="${ds.kylin.jdbcInterceptors}" />
    	</bean>
    

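    With the connection pool configured, the next step is verification and testing.

    1: Debug and step through the source code.

    2: To make it easier to see whether the runtime details are correct, adjust the log4j configuration: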

    log4j.logger.org.apache.tomcat.jdbc=DEBUG
    log4j.logger.org.springframework.jdbc=DEBUG
    log4j.logger.org.apache.kylin.jdbc=DEBUG
    log4j.logger.org.apache.ibatis=DEBUG
    log4j.logger.org.mybatis.spring=DEBUG
    

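    3: Test for memory leaks and for problems during long runs: whether CPU usage, thread count, connection pool state, or JVM garbage collection behave abnormally.

    This was a good chance to apply what I had learned about the JVM, so I simply used the tool bundled with the JDK (jvisualvm).

    For the entry point, a unit test plus a thread pool drives the load: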

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Scanner;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import javax.annotation.Resource;

    import org.junit.Test;

    /**
     * Kylin access stress test
     * 
     * @author zhangjl
     * @date 2018-03-09
     */
    public class KylinPressureTest extends TestBase {
        @Resource
        private PersonFacade personFacade;
    
        @Test
        public void pressureTest() {
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            final AtomicInteger counter = new AtomicInteger();
    
            // Generate garbage periodically to put pressure on the GC
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    int index = generateGarbage();
                    LOGGER.info("index={}", index);
                }
            }, 5, 5, TimeUnit.SECONDS);
            
            // Access Kylin: 10 concurrent requests per round, one round per minute
            for (int i = 0; i < 10000; i++) {
                LOGGER.info("Round {} started", i + 1);
    
                for (int j = 0; j < 10; j++) {
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            int currNum = counter.addAndGet(1);
                            
                            try {
                                PageInfo page = new PageInfo();
                                page.setPagination(true);
                                PersonDto condition = new PersonDto();
                                // Use this task's own number; counter.get() may already have been advanced by another thread
                                condition.setGender(currNum);
    
                                ApiResult<PageResult<PersonDto>> result = personFacade.page(condition, page);
                                LOGGER.info("Kylin access result, result={}", result);
                            } catch (Exception e) {
                                LOGGER.error("Kylin access failed", e);
                            }
    
                            // SimpleDateFormat is not thread-safe, so each task creates its own instance
                            String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
                            LOGGER.info("Kylin access, request #{}, time={}", currNum, time);
                        }
                    });
                }
    
                try {
                    Thread.sleep(1L * 60L * 1000L);
                } catch (InterruptedException e) {
                    LOGGER.error("Thread interrupted", e);
                }
            }
    
            // Keep the JVM alive until the user types "exit"
            waitingUserInputExit();
        }
    
        private void waitingUserInputExit() {
            Scanner scanner = null;
            try {
                scanner = new Scanner(System.in);
                while (true) {
                    String input = scanner.next();
                    if ("exit".equals(input)) {
                        break;
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Failed to read input", e);
            } finally {
                if (null != scanner) {
                    scanner.close();
                }
            }
        }
    
        private int generateGarbage() {
            int index = 0;
            byte[] array1 = new byte[100 * 1024 * 1024]; // 100 MB, unreachable once this method returns
            // byte[] array2 = new byte[100 * 1024 * 1024]; // 100 MB
    
            array1[index++] = (byte) 1;
            // array2[index++] = (byte) 2;
    
            return index;
        }
    }
    

    VM arguments used for the run:

    -Xmx1024m -Xms1024m -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -Xloggc:mygc.log
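
    Besides watching jvisualvm, the same indicators (heap usage, thread count, GC activity) can also be sampled from inside the test and written to the log. The sketch below is my own addition, not part of the original test; the class and method names are made up, and it only uses the standard java.lang.management API.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class JvmSnapshot {
    /** Returns a one-line summary of heap usage, live threads and accumulated GC activity. */
    static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        int threads = ManagementFactory.getThreadMXBean().getThreadCount();
        long gcCount = 0;
        long gcTimeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the value is undefined for a collector
            gcCount += Math.max(0, gc.getCollectionCount());
            gcTimeMs += Math.max(0, gc.getCollectionTime());
        }
        return String.format("heapUsedMB=%d threads=%d gcCount=%d gcTimeMs=%d",
                heap.getUsed() / (1024 * 1024), threads, gcCount, gcTimeMs);
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

    Logging snapshot() once per round makes a steadily growing thread count or heap floor visible in the log file even when nobody is watching the GUI.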
    

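    Result chart:

    Judging from the test results, the connection pool looks fine.

    So could MyBatis work as well? We added the MyBatis configuration and tried again.

    Simple SQL statements verified without problems.

    Then we hit another problem: pagination.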

    MySQL pagination uses: LIMIT ${offset}, ${pageSize}

    Kylin pagination uses: LIMIT ${pageSize} OFFSET ${offset}

    The difference is small, so the Kylin version can be built by adapting our existing MySQL Dialect code.
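
    To make the difference concrete, here is a small sketch that builds both clauses from the same page number and page size. The class, the method names and the sample query are hypothetical; only the two LIMIT forms come from the comparison above.

```java
public class LimitClauseSketch {
    /** Zero-based row offset of a 1-based page number. */
    static int offsetOf(int currentPage, int pageSize) {
        return (currentPage - 1) * pageSize;
    }

    /** MySQL form: LIMIT offset, pageSize. */
    static String mysqlLimit(String sql, int offset, int pageSize) {
        return sql + " limit " + offset + ", " + pageSize;
    }

    /** Kylin form: LIMIT pageSize OFFSET offset. */
    static String kylinLimit(String sql, int offset, int pageSize) {
        return sql + " limit " + pageSize + " offset " + offset;
    }

    public static void main(String[] args) {
        String sql = "select id, name from shop order by id"; // hypothetical query
        int offset = offsetOf(3, 20); // page 3 with 20 rows per page
        System.out.println(mysqlLimit(sql, offset, 20));
        System.out.println(kylinLimit(sql, offset, 20));
    }
}
```

    Note that the two numbers swap positions between the dialects, which is easy to get wrong when porting.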

    <!-- Kylin jdbcTemplate(OEM) -->
    	<!-- <bean id="kylinOemJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    		<property name="dataSource" ref="kylinOemDataSource"></property>
    	</bean> -->
    	
    	<!-- No unified transaction manager is configured for Kylin (OEM) -->
    	
    	<!-- Kylin sqlSessionFactory(OEM) -->
    	<bean id="kylinOemSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    		<property name="dataSource" ref="kylinOemDataSource" />
    		<property name="plugins">
    			<list>
    				<bean class="yunnex.shop.bigdata.common.kylin.KylinMybatisLogInterceptor"></bean>
    				<bean class="yunnex.shop.bigdata.common.kylin.KylinMybatisPaginationInterceptor"></bean>
    			</list>
    		</property>
    		<property name="mapperLocations">
    			<array>
    				<value>classpath*:/yunnex/shop/bigdata/oem/**/*.xml</value>
    			</array>
    		</property>
    	</bean>
    
    	<!-- Kylin MapperScannerConfigurer(OEM) -->
    	<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
    		<property name="basePackage" value="yunnex.shop.bigdata.oem.**.mapper,yunnex.shop.bigdata.oem.**.dao" />
    		<property name="sqlSessionFactoryBeanName" value="kylinOemSqlSessionFactory"></property>
    	</bean>
    
    /**
     * Pagination interceptor
     * 
     * @author zhangjl
     * @date 2018-03-09
     */
    @Intercepts({@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})})
    public class KylinMybatisPaginationInterceptor implements Interceptor {
        private Logger LOGGER = LoggerFactory.getLogger(KylinMybatisPaginationInterceptor.class);
    
        Dialect dialect = new KylinDialect();
    
        @Value("${sql.rows.max.return}")
        private Integer rowlimit;
    
        @SuppressWarnings("unchecked")
        public Object intercept(Invocation invocation) throws Throwable {
            MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
            Object parameter = invocation.getArgs()[1];
            BoundSql boundSql = mappedStatement.getBoundSql(parameter);
            // Check for missing SQL before trimming; otherwise trim() would throw on null first
            if (boundSql.getSql() == null || "".equals(boundSql.getSql())) {
                return null;
            }
            String sqlId = mappedStatement.getId();
            String originalSql = boundSql.getSql().trim();
            RowBounds rowBounds = (RowBounds) invocation.getArgs()[2];
    
            Object parameterObject = boundSql.getParameterObject();
    
            // Paging parameter, passed in via the context
            PageInfo page = null;
    
            // Map parameters reset currentPage on every call, so check the map before the context
            if (parameterObject instanceof PageInfo) {
                page = (PageInfo) parameterObject;
            } else if (parameterObject instanceof Map) {
                Map<String, Object> map = (Map<String, Object>) parameterObject;
                if (map.containsKey("page")) {
                    page = (PageInfo) map.get("page");
                }
            } else if (null != parameterObject) {
                Field pageField = ReflectionUtil.getFieldByFieldName(parameterObject, "page");
                if (pageField != null) {
                    page = (PageInfo) ReflectionUtil.getValueByFieldName(parameterObject, "page");
                }
            }
    
            // The paging context is used from here on
            if (page != null && page.isPagination() == true) {
                if (page.getPageSize() > rowlimit) {
                    LOGGER.warn("[toolarge_pagesize] page size greater than {},#sqlid:{}#,#pagesize:{}#,#sql:{}#", rowlimit, sqlId, page.getPageSize(),
                                    originalSql);
                    page.setPageSize(rowlimit);
                }
    
                int totalRows = page.getTotalRows();
                // Fetch the total row count
                if (totalRows == 0 && page.isNeedCount()) {
                    String countSql = KylinPageHepler.getCountString(originalSql);
                    BoundSql countBS = new BoundSql(mappedStatement.getConfiguration(), countSql, boundSql.getParameterMappings(),
                                    parameterObject);
                    Field metaParamsField = ReflectionUtil.getFieldByFieldName(boundSql, "metaParameters");
                    if (metaParamsField != null) {
                        MetaObject mo = (MetaObject) ReflectionUtil.getValueByFieldName(boundSql, "metaParameters");
                        ReflectionUtil.setValueByFieldName(countBS, "metaParameters", mo);
                    }
                    // try-with-resources returns the connection to the pool even if the count query fails
                    try (Connection connection = mappedStatement.getConfiguration().getEnvironment().getDataSource().getConnection();
                         PreparedStatement countStmt = connection.prepareStatement(countSql)) {
                        setParameters(countStmt, mappedStatement, countBS, parameterObject);
                        try (ResultSet rs = countStmt.executeQuery()) {
                            if (rs.next()) {
                                totalRows = rs.getInt(1);
                            }
                        }
                    }
                }
    
                // Compute the paging values
                page.init(totalRows, page.getPageSize(), page.getCurrentPage());
    
                if (rowBounds == null || rowBounds == RowBounds.DEFAULT) {
                    rowBounds = new RowBounds(page.getPageSize() * (page.getCurrentPage() - 1), page.getPageSize());
                }
    
                // Paged query; the dialect is database-specific, so swap the implementation when changing databases
                String pagesql = dialect.getLimitString(originalSql, rowBounds.getOffset(), rowBounds.getLimit());
                invocation.getArgs()[2] = new RowBounds(RowBounds.NO_ROW_OFFSET, RowBounds.NO_ROW_LIMIT);
                BoundSql newBoundSql = new BoundSql(mappedStatement.getConfiguration(), pagesql, boundSql.getParameterMappings(),
                                boundSql.getParameterObject());
                Field metaParamsField = ReflectionUtil.getFieldByFieldName(boundSql, "metaParameters");
                if (metaParamsField != null) {
                    MetaObject mo = (MetaObject) ReflectionUtil.getValueByFieldName(boundSql, "metaParameters");
                    ReflectionUtil.setValueByFieldName(newBoundSql, "metaParameters", mo);
                }
                MappedStatement newMs = copyFromMappedStatement(mappedStatement, new BoundSqlSqlSource(newBoundSql));
    
                invocation.getArgs()[0] = newMs;
            }
    
            return invocation.proceed();
        }
    
        public static class BoundSqlSqlSource implements SqlSource {
            BoundSql boundSql;
    
            public BoundSqlSqlSource(BoundSql boundSql) {
                this.boundSql = boundSql;
            }
    
            public BoundSql getBoundSql(Object parameterObject) {
                return boundSql;
            }
        }
    
        public Object plugin(Object arg0) {
            return Plugin.wrap(arg0, this);
        }
    
        public void setProperties(Properties arg0) {
    
        }
    
        /**
         * Binds values to the SQL parameters (?); modeled on org.apache.ibatis.executor.parameter.DefaultParameterHandler
         * 
         * @param ps
         * @param mappedStatement
         * @param boundSql
         * @param parameterObject
         * @throws SQLException
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private void setParameters(PreparedStatement ps, MappedStatement mappedStatement, BoundSql boundSql, Object parameterObject) throws SQLException {
            ErrorContext.instance().activity("setting parameters").object(mappedStatement.getParameterMap().getId());
            List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
            if (parameterMappings != null) {
                Configuration configuration = mappedStatement.getConfiguration();
                TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
                MetaObject metaObject = parameterObject == null ? null : configuration.newMetaObject(parameterObject);
                for (int i = 0; i < parameterMappings.size(); i++) {
                    ParameterMapping parameterMapping = parameterMappings.get(i);
                    if (parameterMapping.getMode() != ParameterMode.OUT) {
                        Object value;
                        String propertyName = parameterMapping.getProperty();
                        PropertyTokenizer prop = new PropertyTokenizer(propertyName);
                        if (parameterObject == null) {
                            value = null;
                        } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
                            value = parameterObject;
                        } else if (boundSql.hasAdditionalParameter(propertyName)) {
                            value = boundSql.getAdditionalParameter(propertyName);
                        } else if (propertyName.startsWith(ForEachSqlNode.ITEM_PREFIX) && boundSql.hasAdditionalParameter(prop.getName())) {
                            value = boundSql.getAdditionalParameter(prop.getName());
                            if (value != null) {
                                value = configuration.newMetaObject(value).getValue(propertyName.substring(prop.getName().length()));
                            }
                        } else {
                            value = metaObject == null ? null : metaObject.getValue(propertyName);
                        }
                        TypeHandler typeHandler = parameterMapping.getTypeHandler();
                        if (typeHandler == null) {
                            throw new ExecutorException(
                                            "There was no TypeHandler found for parameter " + propertyName + " of statement " + mappedStatement.getId());
                        }
                        typeHandler.setParameter(ps, i + 1, value, parameterMapping.getJdbcType());
                    }
                }
            }
        }
    
        private MappedStatement copyFromMappedStatement(MappedStatement ms, SqlSource newSqlSource) {
            Builder builder = new MappedStatement.Builder(ms.getConfiguration(), ms.getId(), newSqlSource, ms.getSqlCommandType());
            builder.resource(ms.getResource());
            builder.fetchSize(ms.getFetchSize());
            builder.statementType(ms.getStatementType());
            builder.keyGenerator(ms.getKeyGenerator());
            builder.keyProperty(buildKeyProperty(ms.getKeyProperties()));
            builder.timeout(ms.getTimeout());
            builder.parameterMap(ms.getParameterMap());
            builder.resultMaps(ms.getResultMaps());
            builder.useCache(ms.isUseCache());
            builder.cache(ms.getCache());
            MappedStatement newMs = builder.build();
            return newMs;
        }
    
        private static String buildKeyProperty(String[] props) {
            if (null != props && props.length > 0) {
                StringBuilder sb = new StringBuilder();
                for (String p : props) {
                    sb.append(p).append(",");
                }
    
                return sb.substring(0, sb.length() - 1);
            }
            return null;
        }
    }
    

      

    /**
     * Kylin Dialect
     * 
     * @author zhangjl
     * @date 2018-03-09
     */
    public class KylinDialect extends Dialect {
        protected static final String SQL_END_DELIMITER = ";";
    
        public String getLimitString(String sql, boolean hasOffset) {
            return KylinPageHepler.getLimitString(sql, -1, -1);
        }
    
        @Override
        public String getLimitString(String sql, int offset, int limit) {
            return KylinPageHepler.getLimitString(sql, offset, limit);
        }
    
        @Override
        public boolean supportsLimit() {
            return true;
        }
    }
    

      

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /**
     * Pagination helper
     * 
     * @author zhangjl
     * @date 2018-03-09
     */
    public class KylinPageHepler {
        /**
         * Builds the SQL that counts the total number of rows.
         */
        public static String getCountString(String querySelect) {
            querySelect = getLineSql(querySelect);
            int orderIndex = getLastOrderInsertPoint(querySelect);
    
            int formIndex = getAfterFormInsertPoint(querySelect);
            String select = querySelect.substring(0, formIndex);
    
            // With SELECT DISTINCT or GROUP BY, COUNT can only be applied in an outer query
            if (select.toLowerCase().indexOf("select distinct") != -1 || querySelect.toLowerCase().indexOf("group by") != -1) {
                return new StringBuilder(querySelect.length()).append("select count(1) from (").append(querySelect.substring(0, orderIndex))
                                .append(" ) t").toString();
            } else {
                return new StringBuilder(querySelect.length()).append("select count(1) ").append(querySelect.substring(formIndex, orderIndex))
                                .toString();
            }
        }
    
        /**
         * Finds the position of the last ORDER BY.
         * 
         * @return the index where the last ORDER BY starts
         */
        private static int getLastOrderInsertPoint(String querySelect) {
            int orderIndex = querySelect.toLowerCase().lastIndexOf("order by");
            if (orderIndex == -1 || !isBracketCanPartnership(querySelect.substring(orderIndex, querySelect.length()))) {
                throw new RuntimeException("Kylin SQL pagination requires an ORDER BY clause!");
            }
            return orderIndex;
        }
    
        /**
         * Builds the paged SQL.
         * 
         * @param offset number of rows to skip
         * @param limit maximum number of rows to return
         * @return the paged SQL
         */
        public static String getLimitString(String querySelect, int offset, int limit) {
            querySelect = getLineSql(querySelect);
    
            // MySQL form: querySelect + " limit " + offset + " ," + limit;
            String sql = querySelect + " limit " + limit + " offset " + offset;
            return sql;
        }
    
        /**
         * Collapses the SQL into a single line with exactly one space between words.
         * 
         * @param sql the SQL statement, must not be null
         * @return the normalized SQL
         */
        private static String getLineSql(String sql) {
            return sql.replaceAll("[\r\n]", " ").replaceAll("\\s{2,}", " ");
        }
    
        /**
         * Finds the position of the first top-level FROM in the SQL.
         */
        private static int getAfterFormInsertPoint(String querySelect) {
            String regex = "\\s+FROM\\s+";
            Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
            Matcher matcher = pattern.matcher(querySelect);
            while (matcher.find()) {
                int fromStartIndex = matcher.start(0);
                String text = querySelect.substring(0, fromStartIndex);
                // A FROM preceded by balanced brackets is not inside a subquery
                if (isBracketCanPartnership(text)) {
                    return fromStartIndex;
                }
            }
            return 0;
        }
    
        /**
         * Checks whether the brackets "()" are balanced by count; the ordering is not checked.
         * 
         * @param text the text to check
         * @return true if the counts match, otherwise false
         */
        private static boolean isBracketCanPartnership(String text) {
            if (text == null || (getIndexOfCount(text, '(') != getIndexOfCount(text, ')'))) {
                return false;
            }
            return true;
        }
    
        /**
         * Counts how many times a character occurs in a string.
         * 
         * @param text the text
         * @param ch the character
         */
        private static int getIndexOfCount(String text, char ch) {
            int count = 0;
            for (int i = 0; i < text.length(); i++) {
                count = (text.charAt(i) == ch) ? count + 1 : count;
            }
            return count;
        }
    }
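
    To show what the count rewrite does to a concrete query, here is a simplified, stand-alone variant. It is not the helper above: it always wraps the query in an outer count (the branch the helper takes for DISTINCT and GROUP BY) and merely drops a trailing ORDER BY instead of requiring one. The class name and the sample query are hypothetical.

```java
import java.util.Locale;

public class CountSqlSketch {
    /** Wraps a query in select count(1), dropping a trailing top-level ORDER BY. */
    static String toCountSql(String query) {
        // Normalize to one line with single spaces
        String sql = query.replaceAll("[\r\n]", " ").replaceAll("\\s{2,}", " ").trim();
        int orderIndex = sql.toLowerCase(Locale.ROOT).lastIndexOf("order by");
        // Only cut the ORDER BY when it is not inside a subquery (brackets after it are balanced)
        if (orderIndex != -1 && balanced(sql.substring(orderIndex))) {
            sql = sql.substring(0, orderIndex).trim();
        }
        return "select count(1) from (" + sql + ") t";
    }

    /** True when the text contains as many '(' as ')'. */
    static boolean balanced(String text) {
        int depth = 0;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            }
        }
        return depth == 0;
    }

    public static void main(String[] args) {
        System.out.println(toCountSql("select id, name from shop\n where gender = 1\n order by id desc"));
    }
}
```

    Sorting is irrelevant for a row count, so removing the ORDER BY keeps the count query as cheap as possible.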
    

    OK, the rework is done, which means another round of verification and testing.

    Solution 2: FreeMarker

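    Solution 1 checked out, but there is actually a simpler approach:

    1 Use FreeMarker's rendering to render SQL templates (giving MyBatis-like dynamic SQL);

    2 Store the SQL templates in a database;

    3 Execute the rendered SQL with Spring's JdbcTemplate.

    I think this has an advantage over MyBatis: when a SQL statement is wrong, the template can be fixed with a DML statement, with no code change at all.

    Here is sample code that renders a SQL template with FreeMarker: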

    import java.io.StringWriter;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import freemarker.cache.StringTemplateLoader;
    import freemarker.template.Configuration;
    import freemarker.template.Template;
    
    /**
     * FreeMarker test
     * @author zhangjl
     * @date 2018-03-09
     */
    public class FreemarkerTest {
        private static final Logger LOG = LoggerFactory.getLogger(FreemarkerTest.class);
    
        @Test
        public void test1() {
            try {
                /**
                 * <pre>
                 *  SELECT *
                    FROM shop
                    WHERE 1 = 1
                    <#if id??>AND id = ${id?c} </#if>
                    <#if order??>ORDER BY ${order} <#if sort??> ${sort} </#if> </#if>
                 * </pre>
                 */
                String sqlTemplate = "SELECT * FROM shop WHERE 1 = 1 <#if id??>AND id = ${id?c} </#if> <#if order??>ORDER BY ${order} <#if sort??> ${sort} </#if> </#if>";
                String sqlId = "shop.list";
                Map<String, Object> paramMap = new HashMap<>();
                paramMap.put("id", 1L);
    
                String sql = getSql(sqlId, sqlTemplate, paramMap);
                LOG.info("Rendered SQL={}", sql);
            } catch (Exception e) {
                LOG.error("Failed to render SQL", e);
            }
        }
    
        public String getSql(String sqlId, String sqlTemplate, Map<String, Object> paramMap) throws Exception {
            StringTemplateLoader loader = new StringTemplateLoader();
            loader.putTemplate(sqlId, sqlTemplate);
    
            Configuration conf = new Configuration();
            conf.setTemplateLoader(loader);
    
            Template template = conf.getTemplate(sqlId, "utf-8");
    
            StringWriter writer = new StringWriter();
            template.process(paramMap, writer);
            String sql = writer.toString();
            writer.close();
    
            return sql;
        }
    }
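
    One caveat with rendering SQL from a template: ${order} and ${sort} are substituted into the statement as raw text, so values that come from a request should be validated before they go into the parameter map. The sketch below is my own addition under that assumption; the class name, the method name and the column whitelist are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public class SortParamGuard {
    // Hypothetical whitelist of sortable columns; a real one would match the report's columns
    private static final Set<String> SORTABLE = new HashSet<>(Arrays.asList("id", "name", "create_time"));

    /** Adds "order" and "sort" to the template parameters only if they are known-safe values. */
    static void putSort(Map<String, Object> params, String order, String sort) {
        if (order == null) {
            return; // nothing to sort by; the template then omits ORDER BY
        }
        if (!SORTABLE.contains(order)) {
            throw new IllegalArgumentException("Unsupported order column: " + order);
        }
        params.put("order", order);
        if (sort != null) {
            if (!"asc".equalsIgnoreCase(sort) && !"desc".equalsIgnoreCase(sort)) {
                throw new IllegalArgumentException("Unsupported sort direction: " + sort);
            }
            params.put("sort", sort.toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        putSort(params, "id", "desc");
        System.out.println(params);
    }
}
```

    Plain values such as id are better bound as JDBC parameters (a ? placeholder passed to JdbcTemplate) than rendered into the SQL text, but column names and sort directions cannot be bound that way, hence the whitelist.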
    

      

  • Original post: https://www.cnblogs.com/ken-jl/p/8681347.html