Spark: Writing a DataFrame to MySQL

After a series of transformations on a DataFrame in Spark, the result often needs to be written to MySQL. The implementation is below.

1. MySQL connection info

I keep the MySQL connection info in an external configuration file, which makes later configuration changes easier.

    # Example configuration file:
    [hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
    # MySQL database configuration
    mysql.driver=com.mysql.jdbc.Driver
    mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
    mysql.username=user
    mysql.password=123456


2. Required JAR dependencies (sbt shown; adapt the coordinates for Maven as needed)

    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
    libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
    libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

3. Full implementation

    import java.io.FileInputStream
    import java.sql.{Connection, DriverManager}
    import java.util.Properties

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * @author Administrator
      *         2018/10/16-10:15
      */
    object SaveDataFrameASMysql {
      var hdfsPath: String = ""
      var proPath: String = ""
      var DATE: String = ""

      val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
      val sc: SparkContext = new SparkContext(sparkConf)
      val sqlContext: SQLContext = new HiveContext(sc)

      def main(args: Array[String]): Unit = {
        hdfsPath = args(0)
        proPath = args(1)
        // read without a filter
        val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
        dim_sys_city_dict.show(10)

        // save to MySQL
        saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
      }

      /**
        * Save a DataFrame as a MySQL table.
        *
        * @param dataFrame the DataFrame to save
        * @param tableName the target MySQL table name
        * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
        * @param proPath   path of the configuration file
        */
      def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
        var table = tableName
        val properties: Properties = getProPerties(proPath)
        val prop = new Properties // the keys in the config file differ from the ones Spark expects, so build prop with Spark's key names
        prop.setProperty("user", properties.getProperty("mysql.username"))
        prop.setProperty("password", properties.getProperty("mysql.password"))
        prop.setProperty("driver", properties.getProperty("mysql.driver"))
        prop.setProperty("url", properties.getProperty("mysql.url"))
        if (saveMode == SaveMode.Overwrite) {
          var conn: Connection = null
          try {
            conn = DriverManager.getConnection(
              prop.getProperty("url"),
              prop.getProperty("user"),
              prop.getProperty("password")
            )
            val stmt = conn.createStatement
            table = table.toUpperCase
            stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved
            conn.close()
          }
          catch {
            case e: Exception =>
              println("MySQL Error:")
              e.printStackTrace()
          }
        }
        dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
      }

      /**
        * Read a MySQL table.
        *
        * @param sqlContext
        * @param tableName the MySQL table to read
        * @param proPath   path of the configuration file
        * @return the MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
        val properties: Properties = getProPerties(proPath)
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          //      .option("dbtable", tableName.toUpperCase)
          .option("dbtable", tableName)
          .load()
      }

      /**
        * Read a MySQL table with a filter condition.
        *
        * @param sqlContext
        * @param table           the MySQL table to read
        * @param filterCondition the filter condition
        * @param proPath         path of the configuration file
        * @return the MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
        val properties: Properties = getProPerties(proPath)
        var tableName = ""
        tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }

      /**
        * Load the configuration file.
        *
        * @param proPath path of the properties file
        * @return the loaded Properties
        */
      def getProPerties(proPath: String) = {
        val properties: Properties = new Properties()
        properties.load(new FileInputStream(proPath))
        properties
      }
    }
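
The filtered overload of readMysqlTable is defined above but never called in main. A minimal sketch of a call, where the filter condition "city_id >= 240" is illustrative rather than from the original job:

    // Hypothetical call to the filtered overload defined above.
    val filteredDict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id >= 240", proPath)
    filteredDict.show(10)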

4. Testing

    def main(args: Array[String]): Unit = {
      hdfsPath = args(0)
      proPath = args(1)
      // read without a filter
      val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
      dim_sys_city_dict.show(10)

      // save to MySQL
      saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
    }

5. Results (sensitive data has been masked)

    +-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
    |dict_id|city_id|city_name|city_code|group_id|group_name|area_code| bureau_id|sort|bureau_name|
    +-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
    |     1|    249|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 21|    张三公司|
    |     2|    240|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 21|    张三公司|
    |     3|    240|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 21|    张三公司|
    |     4|    242|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     5|    246|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     6|    246|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     7|    248|       **|    **_ab|     200|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     8|    242|       **|    **_ab|     400|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     9|    247|       **|    **_ab|     200|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    |     0|    243|       **|    **_ab|     400|      **按时|    **-查到|xcaasd...| 01|    张三公司|
    +-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

    mysql> desc TestMysqlTble1;
    +-------------+-------------+------+-----+---------+-------+
    | Field       | Type        | Null | Key | Default | Extra |
    +-------------+-------------+------+-----+---------+-------+
    | dict_id     | varchar(32) | YES  |     | NULL    |       |
    | city_id     | varchar(32) | YES  |     | NULL    |       |
    | city_name   | varchar(32) | YES  |     | NULL    |       |
    | city_code   | varchar(32) | YES  |     | NULL    |       |
    | group_id    | varchar(32) | YES  |     | NULL    |       |
    | group_name  | varchar(32) | YES  |     | NULL    |       |
    | area_code   | varchar(32) | YES  |     | NULL    |       |
    | bureau_id   | varchar(64) | YES  |     | NULL    |       |
    | sort        | varchar(32) | YES  |     | NULL    |       |
    | bureau_name | varchar(32) | YES  |     | NULL    |       |
    +-------------+-------------+------+-----+---------+-------+
    10 rows in set (0.00 sec)

    mysql> desc TestMysqlTble2;
    +-------------+------+------+-----+---------+-------+
    | Field       | Type | Null | Key | Default | Extra |
    +-------------+------+------+-----+---------+-------+
    | dict_id     | text | YES  |     | NULL    |       |
    | city_id     | text | YES  |     | NULL    |       |
    | city_name   | text | YES  |     | NULL    |       |
    | city_code   | text | YES  |     | NULL    |       |
    | group_id    | text | YES  |     | NULL    |       |
    | group_name  | text | YES  |     | NULL    |       |
    | area_code   | text | YES  |     | NULL    |       |
    | bureau_id   | text | YES  |     | NULL    |       |
    | sort        | text | YES  |     | NULL    |       |
    | bureau_name | text | YES  |     | NULL    |       |
    +-------------+------+------+-----+---------+-------+
    10 rows in set (0.00 sec)


    mysql> select count(1) from TestMysqlTble1;
    +----------+
    | count(1) |
    +----------+
    |       21 |
    +----------+
    1 row in set (0.00 sec)

    mysql> select count(1) from TestMysqlTble2;
    +----------+
    | count(1) |
    +----------+
    |       21 |
    +----------+
    1 row in set (0.00 sec)
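
The same comparison can also be made from Spark rather than the mysql shell; a small sketch reusing the helpers above:

    // Sanity check: row counts of source and target should match after the write.
    val srcCount = readMysqlTable(sqlContext, "TestMysqlTble1", proPath).count()
    val dstCount = readMysqlTable(sqlContext, "TestMysqlTble2", proPath).count()
    println(s"source=$srcCount target=$dstCount") // both 21 in the run above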

6. Performance

Used as-is, this was fine on small data sets, but once the volume grew the write became too slow. A few attempted tweaks made no visible difference, so I went into the source code and found two key passages.

    /**
     * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
     * table already exists in the external database, behavior of this function depends on the
     * save mode, specified by the `mode` function (default to throwing an exception).
     *
     * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
     * your external database systems.
     *
     * @param url JDBC database url of the form `jdbc:subprotocol:subname`
     * @param table Name of the table in the external database.
     * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
     *                             tag/value. Normally at least a "user" and "password" property
     *                             should be included.
     *
     * @since 1.4.0
     */
    def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
      val props = new Properties()
      extraOptions.foreach { case (key, value) =>
        props.put(key, value)
      }
      // connectionProperties should override settings in extraOptions
      props.putAll(connectionProperties)
      val conn = JdbcUtils.createConnectionFactory(url, props)()

      try {
        var tableExists = JdbcUtils.tableExists(conn, url, table)

        if (mode == SaveMode.Ignore && tableExists) {
          return
        }

        if (mode == SaveMode.ErrorIfExists && tableExists) {
          sys.error(s"Table $table already exists.")
        }

        if (mode == SaveMode.Overwrite && tableExists) {
          JdbcUtils.dropTable(conn, table)
          tableExists = false
        }

        // Create the table if the table didn't exist.
        if (!tableExists) {
          val schema = JdbcUtils.schemaString(df, url)
          val sql = s"CREATE TABLE $table ($schema)"
          val statement = conn.createStatement
          try {
            statement.executeUpdate(sql)
          } finally {
            statement.close()
          }
        }
      } finally {
        conn.close()
      }

      JdbcUtils.saveTable(df, url, table, props) // ----------------------------- key point 1
    }


    /**
     * Saves the RDD to the database in a single transaction.
     */
    def saveTable(
        df: DataFrame,
        url: String,
        table: String,
        properties: Properties) {
      val dialect = JdbcDialects.get(url)
      val nullTypes: Array[Int] = df.schema.fields.map { field =>
        getJdbcType(field.dataType, dialect).jdbcNullType
      }

      val rddSchema = df.schema
      val getConnection: () => Connection = createConnectionFactory(url, properties)
      val batchSize = properties.getProperty("batchsize", "1000").toInt
      df.foreachPartition { iterator => // ------------------------------------ key point 2
        savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
      }
    }

In other words, the built-in method already writes partition by partition, opening one MySQL connection per partition, so the simplest optimization is to repartition the DataFrame before saving. Watch out for data skew, though, otherwise the repartition may yield no speedup at all.
The fastest approach I have tested so far is to pull the data down as a file and import it with MySQL's LOAD DATA command, but that is more cumbersome (a rough sketch follows).
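
For reference, a rough sketch of that LOAD DATA route. The export path, delimiter, and file names are illustrative, and the file must first be pulled from HDFS to a host that can reach MySQL:

    // 1) Dump the DataFrame as tab-delimited text on HDFS (path is illustrative).
    dataFrame.rdd.map(_.mkString("\t")).saveAsTextFile("/tmp/tmp_lillcol/TestMysqlTble2_export")
    // 2) Pull it down (e.g. hdfs dfs -getmerge) and load it in one statement:
    //      LOAD DATA LOCAL INFILE '/tmp/TestMysqlTble2.txt'
    //      INTO TABLE TestMysqlTble2 FIELDS TERMINATED BY '\t';
    //    Note: LOAD DATA LOCAL needs local_infile enabled on the server and
    //    allowLoadLocalInfile=true on the Connector/J side.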

Below is a repartitioning example:

    def main(args: Array[String]): Unit = {
      hdfsPath = args(0)
      proPath = args(1)
      // read without a filter
      val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
      dim_sys_city_dict.show(10)

      // save to MySQL
      saveASMysqlTable(dim_sys_city_dict.repartition(10), "TestMysqlTble2", SaveMode.Append, proPath)
    }
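
Since saveTable reads the batch size from the connection properties (key point 2 hands it to savePartition; the default is 1000 rows), it can be tuned together with the repartitioning. A sketch inside saveASMysqlTable, where 5000 is an illustrative value:

    // Before the final write in saveASMysqlTable:
    prop.setProperty("batchsize", "5000") // default is 1000, see saveTable above
    // rewriteBatchedStatements=true is already set on the JDBC URL in job.properties,
    // which lets the driver collapse each batch into multi-row INSERTs.
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)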

7. Summary

A few things to watch when writing a DataFrame to MySQL:

• The target table should be created in advance; otherwise the columns fall back to the default types, and the text type is a real resource hog. Compare the two tables below: the source table TestMysqlTble1 versus TestMysqlTble2 as created by the DataFrame write (a pre-creation sketch follows the comparison).
    mysql> desc TestMysqlTble1;                
    +-------------+-------------+------+-----+---------+-------+
    | Field       | Type        | Null | Key | Default | Extra |
    +-------------+-------------+------+-----+---------+-------+
    | dict_id     | varchar(32) | YES  |     | NULL    |       |
    | city_id     | varchar(32) | YES  |     | NULL    |       |
    | city_name   | varchar(32) | YES  |     | NULL    |       |
    | city_code   | varchar(32) | YES  |     | NULL    |       |
    | group_id    | varchar(32) | YES  |     | NULL    |       |
    | group_name  | varchar(32) | YES  |     | NULL    |       |
    | area_code   | varchar(32) | YES  |     | NULL    |       |
    | bureau_id   | varchar(64) | YES  |     | NULL    |       |
    | sort        | varchar(32) | YES  |     | NULL    |       |
    | bureau_name | varchar(32) | YES  |     | NULL    |       |
    +-------------+-------------+------+-----+---------+-------+
    10 rows in set (0.00 sec)

    mysql> desc TestMysqlTble2;
    +-------------+------+------+-----+---------+-------+
    | Field       | Type | Null | Key | Default | Extra |
    +-------------+------+------+-----+---------+-------+
    | dict_id     | text | YES  |     | NULL    |       |
    | city_id     | text | YES  |     | NULL    |       |
    | city_name   | text | YES  |     | NULL    |       |
    | city_code   | text | YES  |     | NULL    |       |
    | group_id    | text | YES  |     | NULL    |       |
    | group_name  | text | YES  |     | NULL    |       |
    | area_code   | text | YES  |     | NULL    |       |
    | bureau_id   | text | YES  |     | NULL    |       |
    | sort        | text | YES  |     | NULL    |       |
    | bureau_name | text | YES  |     | NULL    |       |
    +-------------+------+------+-----+---------+-------+
    10 rows in set (0.00 sec)
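
Spark 1.6 offers no per-column type control on the JDBC write path, so the table has to be created outside Spark. A minimal sketch with plain JDBC, with column types copied from the desc TestMysqlTble1 output above (prop is the Properties object built in saveASMysqlTable):

    // Pre-create the target table so the write keeps varchar instead of text.
    val conn = DriverManager.getConnection(
      prop.getProperty("url"), prop.getProperty("user"), prop.getProperty("password"))
    val stmt = conn.createStatement
    stmt.execute(
      """CREATE TABLE IF NOT EXISTS TestMysqlTble2 (
        |  dict_id varchar(32), city_id varchar(32), city_name varchar(32),
        |  city_code varchar(32), group_id varchar(32), group_name varchar(32),
        |  area_code varchar(32), bureau_id varchar(64), sort varchar(32),
        |  bureau_name varchar(32)
        |)""".stripMargin)
    conn.close()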
• About SaveMode.Overwrite
    def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
      val props = new Properties()
      extraOptions.foreach { case (key, value) =>
        props.put(key, value)
      }
      // connectionProperties should override settings in extraOptions
      props.putAll(connectionProperties)
      val conn = JdbcUtils.createConnectionFactory(url, props)()

      try {
        var tableExists = JdbcUtils.tableExists(conn, url, table)

        if (mode == SaveMode.Ignore && tableExists) {
          return
        }

        if (mode == SaveMode.ErrorIfExists && tableExists) {
          sys.error(s"Table $table already exists.")
        }

        if (mode == SaveMode.Overwrite && tableExists) {
          JdbcUtils.dropTable(conn, table) // ---------------------------------------- key point 1
          tableExists = false
        }

        // Create the table if the table didn't exist.
        if (!tableExists) {
          val schema = JdbcUtils.schemaString(df, url)
          val sql = s"CREATE TABLE $table ($schema)"
          val statement = conn.createStatement
          try {
            statement.executeUpdate(sql)
          } finally {
            statement.close()
          }
        }
      } finally {
        conn.close()
      }

      JdbcUtils.saveTable(df, url, table, props)
    }

    /**
     * Drops a table from the JDBC database.
     */
    def dropTable(conn: Connection, table: String): Unit = {
      val statement = conn.createStatement
      try {
        statement.executeUpdate(s"DROP TABLE $table") // ------------------------------------- key point 2
      } finally {
        statement.close()
      }
    }

As these two key passages show, the write first checks whether the table exists. With SaveMode.Overwrite, dropTable(conn: Connection, table: String) deletes the existing table outright, which means you lose its structure; the recreated table then falls back to default types, the same problem as above. That is why my save method adds the following:

    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved
        conn.close()
      }
      catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
truncate only removes the data; it does not drop the table structure.

If the table does not exist initially

Two cases need to be distinguished when the target table does not exist yet:

1. Any mode other than SaveMode.Overwrite

No problem: the table is created automatically, with the default column types.

2. SaveMode.Overwrite

This fails. Below is the error from running with SaveMode.Overwrite while TestMysqlTble2 does not exist:

    com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'iptv.TESTMYSQLTBLE2' doesn't exist
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
            at com.mysql.jdbc.Util.getInstance(Util.java:387)
            at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
            at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
            at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
            at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
            at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
            at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
            at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2505)
            at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
            at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
            at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
            at com.iptv.job.basedata.SaveDataFrameASMysql$.main(SaveDataFrameASMysql.scala:33)
            at com.iptv.job.basedata.SaveDataFrameASMysql.main(SaveDataFrameASMysql.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Error details: the frame

    at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)

corresponds to this line in the code above:

    stmt.execute(s"truncate table $table") // truncate first, then Append, so the table structure is preserved

That is, truncate requires the table to already exist.
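
One way to make the Overwrite branch tolerate a missing table is to check for it via JDBC metadata before truncating. A sketch (not from the original post) that could replace the bare truncate call; note that table was uppercased earlier, which matters on case-sensitive servers:

    // Guard the truncate: only run it when the table already exists.
    val tables = conn.getMetaData.getTables(null, null, table, null)
    if (tables.next()) {
      stmt.execute(s"truncate table $table") // keeps the structure, removes the rows
    }
    // Otherwise skip it and let the Append write create the table
    // (with default column types, as described above).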

With that, writing a DataFrame to MySQL is fully implemented.

This article is a personal work summary; please credit the source when reposting!

Original article: https://www.cnblogs.com/lillcol/p/9796935.html