zoukankan      html  css  js  c++  java
  • 从Oracle到Elasticsearch

    自己写的数据交换工具——从Oracle到Elasticsearch

     

    先说说需求的背景,由于业务数据都在Oracle数据库中,想要对它进行数据的分析会非常非常慢,用传统的数据仓库-->数据集市这种方式,集市层表会非常大,查询的时候如果再做一些group的操作,一个访问需要一分钟甚至更久才能响应。

    为了解决这个问题,就想把业务库的数据迁移到Elasticsearch中,然后针对es再去做聚合查询。

    问题来了,数据库中的数据量很大,如何导入到ES中呢?

    Logstash JDBC

    Logstash提供了一款JDBC的插件,可以在里面写sql语句,自动查询然后导入到ES中。这种方式比较简单,需要注意的就是需要用户自己下载jdbc的驱动jar包。

    input {
        jdbc {
            jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar"
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test"
            jdbc_user => "test"
            jdbc_password => "test123"
            schedule => "* * * * *"
            statement => "select * from TARGET_TABLE"
            add_field => ["type","a"]
        }
    }
    output{
        elasticsearch {
            hosts =>["10.10.1.205:9200"]
            index => "product"
            document_type => "%{type}"
        }
    }

    不过,它的性能实在是太差了!我导了一天,才导了两百多万的数据。

    因此,就考虑自己来导。

    自己的数据交换工具

    思路:

    最后使用发现,自己写的导入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!

    遇到的问题

    • 1 JDBC需要采用分页的方式读取全量数据
    • 2 要模仿bulk文件进行存储
    • 3 由于bulk文件过大,导致curl内存溢出

    程序开源

    下面的代码需要注意的就是

    public class JDBCUtil {
        private static Connection conn = null;
        private static PreparedStatement sta=null;
        static{
            try {
                Class.forName("oracle.jdbc.driver.OracleDriver");
                conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            System.out.println("Database connection established");
        }
        /**
        * 把查到的数据格式化写入到文件
        *
        * @param list 需要存储的数据
        * @param index 索引的名称
        * @param type 类型的名称
        * @param path 文件存储的路径
        **/
        public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException {
            System.out.println("开始写文件");
            File file = new File(path);
            int count = 0;
            int size = list.size();
            for(Map map : list){
                FileUtils.write(file,  "{ "index" : { "_index" : ""+index+"", "_type" : ""+type+"" } }
    ","UTF-8",true);
                FileUtils.write(file, JSON.toJSONString(map)+"
    ","UTF-8",true);
    //            System.out.println("写入了" + ((count++)+1) + "[" + size + "]");
            }
            System.out.println("写入完成");
        }
    
        /**
         * 读取数据
         * @param sql
         * @return
         * @throws SQLException
         */
        public static List<Map> readTable(String tablename,int start,int end) throws SQLException {
            System.out.println("开始读数据库");
            //执行查询
            sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end);
            ResultSet rs = sta.executeQuery();
    
            //获取数据列表
            List<Map> data = new ArrayList();
            List<String> columnLabels = getColumnLabels(rs);
    
            Map<String, Object> map = null;
            while(rs.next()){
                map = new HashMap<String, Object>();
    
                for (String columnLabel : columnLabels) {
                    Object value = rs.getObject(columnLabel);
                    map.put(columnLabel.toLowerCase(), value);
                }
                data.add(map);
            }
            sta.close();
            System.out.println("数据读取完毕");
            return data;
        }
        /**
         * 获得列名
         * @param resultSet
         * @return
         * @throws SQLException
         */
        private static List<String> getColumnLabels(ResultSet resultSet)
                throws SQLException {
            List<String> labels = new ArrayList<String>();
    
            ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData();
            for (int i = 0; i < rsmd.getColumnCount(); i++) {
                labels.add(rsmd.getColumnLabel(i + 1));
            }
    
            return labels;
        }
        /**
        * 获得数据库表的总数,方便进行分页
        *
        * @param tablename 表名
        */
        public static int count(String tablename) throws SQLException {
            int count = 0;
            Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
            ResultSet rs = stmt.executeQuery("select count(1) from "+tablename);
            while (rs.next()) {
                count = rs.getInt(1);
            }
            System.out.println("Total Size = " + count);
            rs.close();
            stmt.close();
            return count;
        }
        /**
         * 执行查询,并持久化文件
         * 
         * @param tablename 导出的表明
         * @param page 分页的大小
         * @param path 文件的路径
         * @param index 索引的名称
         * @param type 类型的名称
         * @return
         * @throws SQLException
         */
        public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException {
            int count = count(tablename);
            int i =0;
            for(i =0;i<count;){
                List<Map> map = JDBCUtil.readTable(tablename,i,i+page);
                JDBCUtil.writeTable(map,index,type,path);
                i+=page;
            }
        }
    }

    在main方法中传入必要的参数即可:

    public class Main {
        public static void main(String[] args) {
            try {
                JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type");
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    这样得到bulk的数据后,就可以运行脚本分批导入了。

    下面脚本的思路,就是每100000行左右的数据导入到一个目标文件,使用bulk命令导入到es中。注意一个细节就是不能随意的切分文件,因为bulk的文件是两行为一条数据的。

    #!/bin/bash
    
    count=0
    rm target.json
    touch target.json
    
    
    while read line;do
    
    ((count++))
    
    {
            echo $line >> target.json
    
            if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then
                    count=0
                    curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
                    rm target.json
                    touch target.json
            fi
    
    }
    
    done < $1
    echo 'last submit'
    curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null

    最后执行脚本:

    sh auto_bulk.sh data.json

    自己测试最后要比logstasj jdbc快5-6倍。

  • 相关阅读:
    APMServ5.2.6 无法启动Apache的一个问题
    【转】流媒体技术笔记(视频编码相关)
    用APMServ一键快速搭建Apache+PHP+MySQL+Nginx+Memcached+ASP运行平台
    java swing 基础
    python class 类
    python 经验
    python 导入(转)
    kernel ipv4/ip_output.c
    python+正则表达式(转)
    Eclipse中如何快速添加、删除jar包
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/7683273.html
Copyright © 2011-2022 走看看