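Goal: use Talend to call a SQL Server stored procedure that takes a table-valued parameter (a list of rows with a table structure).
Implementation: Talend's current components do not appear to support this, and I did not want to write a custom component, so I implemented the call in code with tJavaRow.
1. First, download a reasonably recent mssql-jdbc.jar through Maven. I tested with the following jar: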
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <version>6.1.0.jre8</version>
</dependency>
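2. Create the test Job. I built a simple example in which a RESTful endpoint calls a SQL Server stored procedure.
In this example:
tLibraryLoad: loads the jar downloaded in step 1;
tRESTRequest: set the output body to Document;
tJavaRow: holds the hand-written code that calls the stored procedure with the table-valued parameter. The Basic settings code is as follows (this is only test sample code; adapt and optimize it for your own needs):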
java.sql.Connection mssql_conn = null;
com.microsoft.sqlserver.jdbc.SQLServerCallableStatement mssql_statement = null;
String runPlan = null;
String runStatus = "";
try {
    // Read the XML body that tRESTRequest passes in as a Document.
    org.dom4j.Document doc = (org.dom4j.Document) ((routines.system.Document) inputdata.body).getDocument();
    org.dom4j.Element root = doc.getRootElement();
    System.out.println("Root: " + root.getName());

    // The first <Plan> element becomes the scalar parameter.
    org.dom4j.Element firstPlanElement = root.element("Plan");
    if (firstPlanElement != null) {
        System.out.println("Plan: " + firstPlanElement.getText());
        runPlan = firstPlanElement.getText();
    } else {
        System.out.println("Plan is null!");
    }

    java.util.List<org.dom4j.Element> itemListElements = root.elements("ItemList");
    System.out.println("itemListElements: " + itemListElements.size());

    // Column order and types must match the table type [WDPM].[MaterialItem].
    com.microsoft.sqlserver.jdbc.SQLServerDataTable sourceTable = new com.microsoft.sqlserver.jdbc.SQLServerDataTable();
    sourceTable.addColumnMetadata("ItemName", java.sql.Types.VARCHAR);
    sourceTable.addColumnMetadata("Weight", java.sql.Types.DECIMAL);
    sourceTable.addColumnMetadata("Quantity", java.sql.Types.DECIMAL);
    sourceTable.addColumnMetadata("Location", java.sql.Types.VARCHAR);
    sourceTable.addColumnMetadata("OrderNo", java.sql.Types.VARCHAR);
    sourceTable.addColumnMetadata("Operator", java.sql.Types.VARCHAR);

    // One table row per <ItemList> element.
    for (org.dom4j.Element itemElement : itemListElements) {
        org.dom4j.Element firstItemNameElement = itemElement.element("ItemName");
        org.dom4j.Element firstWeightElement = itemElement.element("Weight");
        org.dom4j.Element firstQuantityElement = itemElement.element("Quantity");
        org.dom4j.Element firstLocationElement = itemElement.element("Location");
        org.dom4j.Element firstOrderNoElement = itemElement.element("OrderNo");
        org.dom4j.Element firstOperatorElement = itemElement.element("Operator");
        sourceTable.addRow(
            firstItemNameElement.getText(),
            Double.parseDouble(firstWeightElement.getText()),
            Double.parseDouble(firstQuantityElement.getText()),
            firstLocationElement.getText(),
            firstOrderNoElement.getText(),
            firstOperatorElement.getText());
    }

    String driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
    java.lang.Class.forName(driverClass);

    // Build the JDBC URL from the Job's context variables.
    String mssqlurl = "jdbc:sqlserver://" + context.MSDB_Host;
    if (!"".equals("" + context.MSDB_Port)) {
        mssqlurl += ":" + context.MSDB_Port;
    }
    // Always end the host[:port] part with ';' so the properties that follow stay separated,
    // even when no port is configured.
    mssqlurl += ";";
    if (!"".equals(context.MSDB_Database)) {
        mssqlurl += "DatabaseName=" + context.MSDB_Database + ";";
    }
    mssqlurl += "appName=" + projectName + ";";

    mssql_conn = java.sql.DriverManager.getConnection(mssqlurl, context.MSDB_Username, context.MSDB_Password);
    mssql_statement = (com.microsoft.sqlserver.jdbc.SQLServerCallableStatement) mssql_conn.prepareCall("{call [WDPM].[usp_SaveStockItems](?,?)}");

    // Parameter 1: the plan; parameter 2: the table-valued parameter.
    if (runPlan == null) {
        mssql_statement.setNull(1, java.sql.Types.VARCHAR);
    } else {
        mssql_statement.setString(1, runPlan);
    }
    mssql_statement.setStructured(2, "[WDPM].[MaterialItem]", sourceTable);
    mssql_statement.execute();
    runStatus = "Success";
} catch (Exception e) {
    // Report the failure through the output row instead of aborting the Job.
    runStatus = "Error:" + e.getMessage();
    //throw new RuntimeException(e);
} finally {
    if (mssql_statement != null) {
        try {
            mssql_statement.close();
        } catch (java.sql.SQLException e) {
            e.printStackTrace();
        }
    }
    if (mssql_conn != null) {
        try {
            mssql_conn.close();
        } catch (java.sql.SQLException e) {
            e.printStackTrace();
        }
    }
}
row1.Plan = runPlan;
row1.Status = runStatus;
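The tJavaRow code assembles the JDBC URL by string concatenation, where it is easy to lose a `;` separator when the port is left empty. As a rough sketch only, the same assembly could be moved into a small helper that can be checked outside Talend. The class and method names here are hypothetical (not part of Talend or mssql-jdbc), and the property names simply mirror the ones used in the code above.

```java
// Hypothetical helper for illustration; not part of Talend or mssql-jdbc.
final class MssqlUrlSketch {
    private MssqlUrlSketch() {}

    // Builds "jdbc:sqlserver://host[:port];[DatabaseName=...;][appName=...;]".
    static String build(String host, String port, String database, String appName) {
        StringBuilder url = new StringBuilder("jdbc:sqlserver://").append(host);
        if (port != null && !port.isEmpty()) {
            url.append(':').append(port);
        }
        // The host[:port] part is always closed with ';' before any property.
        url.append(';');
        if (database != null && !database.isEmpty()) {
            url.append("DatabaseName=").append(database).append(';');
        }
        if (appName != null && !appName.isEmpty()) {
            url.append("appName=").append(appName).append(';');
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Sample values are made up.
        System.out.println(build("localhost", "1433", "TestDb", "DemoJob"));
        System.out.println(build("localhost", "", "TestDb", "DemoJob"));
    }
}
```

In a Job, such a helper would most likely live in a Talend routine and be called with the context variables, but that placement is a design choice rather than a requirement.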
The Advanced settings code is as follows:
import com.microsoft.sqlserver.jdbc.SQLServerDriver;
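With that, the call to a SQL Server stored procedure with a table-valued parameter is essentially complete. The RESTful response and similar details are not covered here.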
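The post does not show a request body, but the tJavaRow code reads one Plan element and repeated ItemList elements under the root. The sketch below parses a made-up body of that shape. It uses the JDK's built-in DOM parser instead of dom4j only so that it runs without extra jars. The root element name StockRequest, the class name, and all sample values are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

// Hypothetical sketch of the request shape the tJavaRow code appears to expect.
final class RequestBodySketch {
    private RequestBodySketch() {}

    // Made-up sample body: one <Plan>, two <ItemList> rows.
    static final String SAMPLE =
        "<StockRequest>"
      + "<Plan>TEST000001</Plan>"
      + "<ItemList><ItemName>F-001</ItemName><Weight>12.5</Weight><Quantity>3</Quantity>"
      + "<Location>Test</Location><OrderNo>N-100</OrderNo><Operator>demo</Operator></ItemList>"
      + "<ItemList><ItemName>F-002</ItemName><Weight>7.25</Weight><Quantity>1</Quantity>"
      + "<Location>Test</Location><OrderNo>N-101</OrderNo><Operator>demo</Operator></ItemList>"
      + "</StockRequest>";

    static final String[] COLUMNS =
        {"ItemName", "Weight", "Quantity", "Location", "OrderNo", "Operator"};

    static Element parseRoot(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Body is not well-formed XML", e);
        }
    }

    // Text of the first direct child with the given name, or null if absent.
    static String childText(Element parent, String name) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n instanceof Element && name.equals(n.getNodeName())) {
                return n.getTextContent();
            }
        }
        return null;
    }

    static String plan(Element root) {
        return childText(root, "Plan");
    }

    // One String[] per <ItemList>, in the column order of COLUMNS.
    static List<String[]> items(Element root) {
        List<String[]> rows = new ArrayList<>();
        for (Node n = root.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n instanceof Element && "ItemList".equals(n.getNodeName())) {
                String[] row = new String[COLUMNS.length];
                for (int i = 0; i < COLUMNS.length; i++) {
                    row[i] = childText((Element) n, COLUMNS[i]);
                }
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Element root = parseRoot(SAMPLE);
        System.out.println("Plan: " + plan(root) + ", rows: " + items(root).size());
    }
}
```

Unlike the tJavaRow code, this version returns null for a missing child element instead of failing on it, which makes it easier to report which field is absent.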
The SQL Server scripts used in the example are shown below, for reference only:
Create the user-defined table type:
CREATE TYPE [WDPM].[MaterialItem] AS TABLE(
    [ItemName] [varchar](50) NULL,
    [Weight] [decimal](12, 4) NULL,
    [Quantity] [decimal](12, 4) NULL,
    [Location] [varchar](30) NULL,
    [OrderNo] [varchar](30) NULL,
    [Operator] [varchar](30) NULL
)
GO
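The column definitions above (decimal(12, 4), varchar(50), varchar(30)) limit what the Java side can usefully send. As a sketch under stated assumptions, values could be checked before they are added to the SQLServerDataTable, so that a bad row produces a clear message on the Java side. The class and method names are hypothetical, HALF_UP rounding is an arbitrary choice, and the length check counts characters, which equals the byte length only for single-byte data.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical pre-checks mirroring the limits of [WDPM].[MaterialItem].
final class MaterialItemLimitsSketch {
    private MaterialItemLimitsSketch() {}

    // decimal(12, 4): 4 fractional digits, so at most 8 integer digits.
    static BigDecimal toDecimal12_4(String text) {
        BigDecimal value = new BigDecimal(text.trim()).setScale(4, RoundingMode.HALF_UP);
        if (value.precision() - value.scale() > 8) {
            throw new IllegalArgumentException("Value does not fit decimal(12,4): " + text);
        }
        return value;
    }

    // varchar(n): reject text longer than n characters (assumes single-byte data).
    static String checkLength(String text, int maxLength) {
        if (text != null && text.length() > maxLength) {
            throw new IllegalArgumentException("Text longer than " + maxLength + " characters");
        }
        return text;
    }

    public static void main(String[] args) {
        // Sample values are made up.
        System.out.println(toDecimal12_4("12.5"));
        System.out.println(checkLength("F-001", 50));
    }
}
```

Passing the resulting BigDecimal to addRow instead of a Double would also avoid binary floating-point values on the way to a decimal column.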
Create the stored procedure:
create procedure [WDPM].[usp_SaveStockItems]
(
    @Plan varchar(36),
    @ItemList [WDPM].[MaterialItem] readonly
)
as
begin
    declare @WarehouseCode char(2) = 'FP'

    -- Reject the call if the plan is unknown.
    if not exists(select 1 from [WDPM].[testSnapshot] where [Plan] = @Plan)
    begin
        raiserror('Plan %s does not exist!', 16, 1, @Plan)
        return
    end

    -- Copy every row of the table-valued parameter into the target table.
    insert [WDPM].[testTaking]
        ([Plan], [ItemName], [Weight], [Quantity], [Location]
        , [WarehouseCode], [Remark], [Operator]
        , [CreatedDate], [CreatedBy], [ModifiedDate], [ModifiedBy])
    select @Plan, [ItemName], [Weight], [Quantity], [Location]
        , @WarehouseCode, '', [Operator]
        , getdate(), 0, getdate(), 0
    from @ItemList
end
Sample SQL call:
declare @ItemList [WDPM].[MaterialItem]
declare @Plan varchar(36) = 'TEST000001'

insert @ItemList (ItemName, Weight, Quantity, Location, OrderNo, Operator)
select Fabric_No, Weight, Quantity, Location, Note_No, 'AngusYang'
from [fpStore] s with(nolock)
where [Location] = 'Test'

exec [WDPM].[usp_SaveStockItems] @Plan, @ItemList