zoukankan      html  css  js  c++  java
  • datax中oracleWriter

    在使用datax的oraclewriter时,由于对oracle的不熟悉,以及c++编译的不熟悉,颇费了一些周折。在此,记录一下,供再次使用的人参考。

    1.oracleWriter :oracle提供了OCCI接口,便于直接往oracle里load数据,但是是c++的接口,所以,datax的oracleWriter通过对cpp代码的包装,使用JNI的方式去调用。

    2.oracleJdbcWriter使用起来就简单多了,后面附上代码,不再赘述。

    准备工作为:oracle客户端的安装和liboraclewriter.so的编译。

    一:oracle客户端安装。本位在redhat64上使用了oracle11g,以下为详细安装步骤。

      1.oracle官网下载rpm包,进行安装。

           官网地址:http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html  

            下载包:

          oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm
          oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm
          oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm

            安装命令:

          rpm -ivh oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm 
          rpm -ivh oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm
          rpm -ivh oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm  

       环境变量配置:

          /etc/profile,追加以下内容

          export ORACLE_HOME=/usr/lib/oracle/11.2/client64
          export TNS_ADMIN=$ORACLE_HOME/network/admin
          export NLS_LANG='AMERICAN_AMERICA.UTF8'
          export LD_LIBRARY_PATH=$ORACLE_HOME/lib
          export PATH=$ORACLE_HOME/bin:$PATH

          保存,source /etc/profile 使之生效。

        至此,oracle的安装完毕。

    /**       
    因为对oracle的不熟悉,补充几句,以免耽误时间找各种配置。 sqlplus 使用时,网上很多资料讲要配置$TNS_ADMIN下的tnsnames.org。 其实,OCCI使用时,oracleWriter的代码里,我们使用以下方式去连接,是不需要配置的。 不需配置tnsnames.ora的形式: java代码中: logon = username + "/" + password + "@//" + ip + ":" + port + "/" + dbname; 命令行中 : sqlplus user/pass@//ip:port/db 格式不对时,会报以下错误:    ERROR:     ORA-12541: TNS:no listener */

      

    二:liboraclewriter.so的编译及使用,以下为详细安装步骤。

    1. 将datax下oracledumper下的代码,下载到linux机上,准备编译。(在之前装好oracle客户端的机器上)
    2.  如果本地代码使用的包名不同于源码的,需要修改对应的文件。

    修改文件:include中:xx_OracleWriterJni.h(修改文件名和 OracleWriterJni.h中方法的包路径名。)

         src中:xx_OracleWriterJni.cpp (修改文件名)

    3.修改src下Makefile 文件(不熟c++编译的切记)。

      注意点:

       根据错误提示,有一个变量应该需要追加const

    -Wl,-rpath  so文件运行时,依赖的包路径。确保此路径准确,oracle安装后,确认路径正确对应。
    INCLUDE=-I../include -I$$JAVA_HOME/include/linux -I$$JAVA_HOME/include/
    LIBS=-lclntsh -liconv -L../lib -L${ORACLE_HOME}/lib -L../../../../libs/  -L/usr/lib/oracle/11.2/client64/lib/
    CC=g++
    OBJS=liboraclewriter.so
    CFLAGS=-shared -fPIC  -Wl,-rpath=/usr/lib/oracle/11.2/client64/lib/
    CPP=common.cpp dumper.cpp oradumper.cpp strsplit.cpp com_suning_dc_cybertron_datax_plugins_writer_oraclewriter_OracleWriterJni.cpp
    
    OBJS: $(CPP)
        $(CC) $(INCLUDE) -o $(OBJS) $(CPP) $(CFLAGS) $(LIBS) 
    clean:
        rm -rf $(OBJS)

    4.make 生成liboraclewriter.so。(make环境缺少g++等,自行安装,不赘述)。

    至此,可以happy的去代码中使用适合本地环境的occi接口的oraclewriter了,其他几个so包使用源码中自带的就好。

    System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                        + "/libcommon.so");
    System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                        + "/libcharset.so");
    System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                        + "/libiconv.so.2");
    System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                        + "/liboraclewriter.so");

    OCCI 和 jdbc writer的性能对比:

    occi :
    Total time costs          :                142s
    Average byte speed        :               1MB/s
    Average line speed        :            35211L/s
    Total transferred records :             5000000
    Total discarded records   :                   0
    jdbc:
    Total time costs          :               2229s
    Average byte speed        :              68KB/s
    Average line speed        :             2243L/s
    Total transferred records :             5000000

      

    问题及异常:

    1.OCI使用的是direct-insert的方式,需要锁表,有时报错如下:
    
    现象:
    OCI Error -1 occurred at File oradumper.cpp:1605
    Error[32661] - ORA-00604: error occurred at recursive SQL level 1
    ORA-01031: insufficient privileges
    
    status: -1, 1605, oradumper.cpp, init_load
    1605, oradumper.cpp, init_load
    
    对策:
    oracle上除了赋予用户insert select权限以外,还要
    grant lock any table to 你的用户。

    附:jdbcWriter代码

    public class OracleJdbcWriter extends Writer {
    
        private Logger logger = Logger.getLogger(OracleJdbcWriter.class);
    
        private String password;
    
        private String username;
    
        private String dbname;
    
        private String table;
    
        private String pre;
    
        private String post;
    
        private String encoding;
    
        private int limit;
    
        private int failCount;// count error lines
    
        private long concurrency;
    
        private int batchSize;
    
        private String sourceUniqKey = "";
    
        private String port;
    
        private String insert;
    
        private String host;
    
        private String DRIVER_NAME = "oracle.jdbc.driver.OracleDriver";
    
        private Connection connection;
    
        private String writeColumns;
    
        @Override
        public int init() {
            password = param.getValue(ParamKey.password, "");
            username = param.getValue(ParamKey.username, "");
            host = param.getValue(ParamKey.ip);
            port = param.getValue(ParamKey.port, "3306");
            dbname = param.getValue(ParamKey.dbname, "");
            table = param.getValue(ParamKey.table, "");
            pre = param.getValue(ParamKey.pre, "");
            post = param.getValue(ParamKey.post, "");
            insert = param.getValue(ParamKey.insert, "");
            encoding = param.getValue(ParamKey.encoding, "UTF-8");
            limit = param.getIntValue(ParamKey.limit, 1000);
            concurrency = param.getIntValue(ParamKey.concurrency, 1);
            writeColumns = param.getValue(ParamKey.writeColumns, "");
            batchSize = param.getIntValue(ParamKey.batchSize, 50000);
            this.sourceUniqKey = DBSource.genKey(this.getClass(), host, port,
                    dbname);
            this.host = param.getValue(ParamKey.ip);
            this.port = param.getValue(ParamKey.port, "3306");
            this.dbname = param.getValue(ParamKey.dbname);
    
            return PluginStatus.SUCCESS.value();
        }
    
        @Override
        public int prepare(PluginParam param) {
            this.init();
            this.setParam(param);
    
            DBSource.register(this.sourceUniqKey, this.genProperties());
    
            if (StringUtils.isBlank(this.pre))
                return PluginStatus.SUCCESS.value();
    
            Statement stmt = null;
            try {
                this.connection = DBSource.getConnection(this.sourceUniqKey);
    
                stmt = this.connection.createStatement(
                        ResultSet.TYPE_SCROLL_INSENSITIVE,
                        ResultSet.CONCUR_UPDATABLE);
    
                for (String subSql : this.pre.split(";")) {
                    this.logger.info(String.format("Excute prepare sql %s .",
                            subSql));
                    stmt.execute(subSql);
                }
                this.connection.commit();
                return PluginStatus.SUCCESS.value();
            } catch (Exception e) {
                throw new DataExchangeException(e.getCause());
            } finally {
                try {
                    if (null != stmt) {
                        stmt.close();
                    }
                    if (null != this.connection) {
                        this.connection.close();
                        this.connection = null;
                    }
                } catch (SQLException e) {
                }
            }
        }
    
        @Override
        public int connect() {
            return PluginStatus.SUCCESS.value();
        }
    
        @Override
        public int startWrite(LineReceiver receiver) {
            PreparedStatement ps = null;
            try {
                this.connection = DBSource.getConnection(this.sourceUniqKey);
                Line line = null;
                int lines = 0;
                line = receiver.getFromReader();
                if (line == null) {// 读取数据为空的情况。
                    return PluginStatus.SUCCESS.value();
                }
                this.insert = buildInsertStr(line);
                ps = this.connection.prepareStatement(this.insert,
                        ResultSet.TYPE_SCROLL_INSENSITIVE,
                        ResultSet.CONCUR_UPDATABLE);
                this.connection.setAutoCommit(false);
                while (line != null) {
                    try {
                        for (int i = 0; i < line.getFieldNum(); i++) {
                            ps.setObject(i + 1, line.getField(i));
                        }
                        ps.execute();
                    } catch (SQLException e) {
                        logger.error("Invalid Data: " + line.toString(','));
                        logger.error(e.getMessage());
                        failCount++;
                        if (failCount >= this.limit) {
                            logger.error("出错条数 (" + failCount + ") 超过限制条数。");
                            e.printStackTrace();
                            throw new DataExchangeException(e);
                        } else {
                            continue;
                        }
    
                    }
                    if (lines++ == this.batchSize) {
                        logger.info(lines + " committed by worker "
                                + Thread.currentThread().getName() + " .");
                        lines = 0;
                        this.connection.commit();
    
                    }
                    line = receiver.getFromReader();
                }
                this.connection.commit();
                this.connection.setAutoCommit(true);
                this.getMonitor().setFailedLines(this.failCount);
                return PluginStatus.SUCCESS.value();
            } catch (Exception e2) {
                throw new DataExchangeException(e2.getCause());
            } finally {
                if (null != ps)
                    try {
                        ps.close();
                    } catch (SQLException e3) {
                    }
            }
        }
    
        @Override
        public int post(PluginParam param) {
            if (StringUtils.isBlank(this.post))
                return PluginStatus.SUCCESS.value();
    
            Statement stmt = null;
            try {
                this.connection = DBSource.getConnection(this.sourceUniqKey);
    
                stmt = this.connection.createStatement(
                        ResultSet.TYPE_SCROLL_INSENSITIVE,
                        ResultSet.CONCUR_UPDATABLE);
    
                for (String subSql : this.post.split(";")) {
                    this.logger.info(String.format("Excute prepare sql %s .",
                            subSql));
                    stmt.execute(subSql);
                }
    
                return PluginStatus.SUCCESS.value();
            } catch (Exception e) {
                e.printStackTrace();
                throw new DataExchangeException(e.getCause());
            } finally {
                try {
                    if (null != stmt) {
                        stmt.close();
                    }
                    if (null != this.connection) {
                        this.connection.close();
                        this.connection = null;
                    }
                } catch (Exception e2) {
                }
    
            }
        }
    
        @Override
        public List<PluginParam> split(PluginParam param) {
            OracleJdbcWriterSplitter splitter = new OracleJdbcWriterSplitter();
            splitter.setParam(param);
            splitter.init();
            return splitter.split();
        }
    
        @Override
        public int commit() {
            return PluginStatus.SUCCESS.value();
        }
    
        @Override
        public int finish() {
            return PluginStatus.SUCCESS.value();
        }
    
        private Properties genProperties() {
            Properties p = new Properties();
            p.setProperty("driverClassName", this.DRIVER_NAME);
            String url = "jdbc:oracle:thin:@" + this.host + ":" + this.port + "/"
                    + this.dbname;
            p.setProperty("url", url);
            p.setProperty("username", this.username);
            p.setProperty("password", this.password);
            p.setProperty("maxActive", String.valueOf(this.concurrency + 2));
    
            return p;
        }
    
        private String buildInsertStr(Line line) {
            String sql = "";
            String[] subcos = null;
            // 判断到底是全部插入表还是部分插入,最终确定sql
            if (!writeColumns.equals("")) {
                // 如果是部分插入的话
                sql = "insert into " + table + "(";
                subcos = writeColumns.split(",");
                sql += writeColumns;
                sql += ") values(";
                for (int i = 0; i < subcos.length; i++) {
                    sql += "?";
                    sql += ",";
                }
                sql = sql.substring(0, sql.length() - 1);
                sql += ")";
            } else {
                sql = "insert into " + table + " values(";
                for (int i = 0; i < line.getFieldNum(); i++) {
                    sql += "?";
                    sql += ",";
                }
                sql = sql.substring(0, sql.length() - 1);
                sql += ")";
            }
            return sql;
        }
    
    }
  • 相关阅读:
    Group Normalization
    Resnet小记
    Mxnet 查看模型params的网络结构
    ResNets和Inception的理解
    基础 | batchnorm原理及代码详解
    交叉熵代价函数原理
    深度学习笔记:优化方法总结(BGD,SGD,Momentum,AdaGrad,RMSProp,Adam)
    机器学习 logistic分类
    ubuntu16.04 安装搜狗输入法
    工作5年半了,最近准备做一些工作的小结了
  • 原文地址:https://www.cnblogs.com/drawwindows/p/5091227.html
Copyright © 2011-2022 走看看