zoukankan      html  css  js  c++  java
  • Data Flow ->> Script Component

    和Control Flow中的Script Task非常类似,不同的是Script Component是Per-Row的执行类型。打个比方,在Script Component中加入两个Output的字段,Script中针对每一行可以输出不同的值给这两个Output字段。Script Component要求你指定它是Source、Destination和Transformation中的哪一种。Source只有Output(没有Input),Destination和Transformation可以有Input和Output,不同的是Destination的Input不能同时作为Output,而Transformation两者都可以有。

    --------------------------------------- Update 10/13/2015 --------------------------------------------------------------------------------------------

    今天刚好有一个场景需要实现,具体就是要更新某张表的字段,因为表名是动态的,希望用Data Flow数据库上游生成的值更新。这个实现过程其实可以通过把数据保存到一个文件或者数据库表中再join的,但是因为表名是不确定的,如果用SQL实现就必须用动态SQL。还有另一个办法就是用Script Component。就像下图。通过加载、筛选再输出表字段的结果集给Fuzzy lookup,生成similarity和confidence给下游作为输入更新表的字段。这里有两个Script Component。

    input component

    #region Help:  Introduction to the Script Component
    /* The Script Component allows you to perform virtually any operation that can be accomplished in
     * a .Net application within the context of an Integration Services data flow.
     *
     * Expand the other regions which have "Help" prefixes for examples of specific ways to use
     * Integration Services features within this script component. */
    #endregion
    
    #region Namespaces
    using System;
    using System.Data;
    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    using Microsoft.SqlServer.Dts.Runtime.Wrapper;
    using System.Data.SqlClient;
    #endregion
    
    
    /// <summary>
    /// Source script component. Reads <c>src_column_name</c> values from the staging table
    /// named by the <c>varvalidtablename</c> package variable (rows where
    /// <c>src_column_name</c> is not null and <c>trg_column_name</c> is null) and emits one
    /// output row per value on <c>MyOutputBuffer</c>.
    /// This is the class to which to add your code.  Do not change the name, attributes, or parent
    /// of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
    public class ScriptMain : UserComponent
    {
        #region Help:  Using Integration Services variables and parameters
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script component, according to whether or not your
         * code needs to write into the variable.  To do so, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and ReadWriteVariables properties in the
         * Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable or parameter:
         *  DateTime startTime = Variables.MyStartTime;
         *
         * Example of writing to a variable:
         *  Variables.myStringVariable = "new value";
         */
        #endregion
    
        #region Help:  Using Integration Services Connnection Managers
        /* Some types of connection managers can be used in this script component.  See the help topic
         * "Working with Connection Managers Programatically" for details.
         *
         * To use a connection manager in this script, first ensure that the connection manager has
         * been added to either the list of connection managers on the Connection Managers page of the
         * script component editor.  To add the connection manager, save this script, close this instance of
         * Visual Studio, and add the Connection Manager to the list.
         *
         * If the component needs to hold a connection open while processing rows, override the
         * AcquireConnections and ReleaseConnections methods.
         * 
         * Example of using an ADO.Net connection manager to acquire a SqlConnection:
         *  object rawConnection = Connections.SalesDB.AcquireConnection(transaction);
         *  SqlConnection salesDBConn = (SqlConnection)rawConnection;
         *
         * Example of using a File connection manager to acquire a file path:
         *  object rawConnection = Connections.Prices_zip.AcquireConnection(transaction);
         *  string filePath = (string)rawConnection;
         *
         * Example of releasing a connection manager:
         *  Connections.SalesDB.ReleaseConnection(rawConnection);
         */
        #endregion
    
        #region Help:  Firing Integration Services Events
        /* This script component can fire events.
         *
         * Example of firing an error event:
         *  ComponentMetaData.FireError(10, "Process Values", "Bad value", "", 0, out cancel);
         *
         * Example of firing an information event:
         *  ComponentMetaData.FireInformation(10, "Process Values", "Processing has started", "", 0, fireAgain);
         *
         * Example of firing a warning event:
         *  ComponentMetaData.FireWarning(10, "Process Values", "No rows were received", "", 0);
         */
        #endregion
    
        // Connection manager and the SqlConnection obtained from it; held for the whole execution.
        IDTSConnectionManager100 connMgr;
        SqlConnection sqlConn;
    
        // Command and reader opened in PreExecute, drained in CreateNewOutputRows,
        // and cleaned up in PostExecute.
        SqlCommand sqlCmd;
        SqlDataReader sqlReader;
    
        /// <summary>
        /// Acquires the SqlConnection from the StageDB connection manager.
        /// </summary>
        /// <param name="Transaction">Transaction handed in by the runtime (not used; the connection is acquired with null).</param>
        public override void AcquireConnections(object Transaction)
        {
            connMgr = this.Connections.StageDB;
            sqlConn = (SqlConnection)connMgr.AcquireConnection(null);
        }
    
        /// <summary>
        /// This method is called once, before rows begin to be processed in the data flow.
        /// Builds the SELECT against the dynamically named staging table and opens the reader.
        /// </summary>
        public override void PreExecute()
        {
            string query =
                "SELECT src_column_name FROM Stage.dbo." + QuoteIdentifier(Convert.ToString(this.Variables.varvalidtablename)) +
                " Where src_column_name is not null and trg_column_name is null";
    
            sqlCmd = new SqlCommand(query, sqlConn);
            sqlReader = sqlCmd.ExecuteReader();
        }
    
        /// <summary>
        /// This method is called after all the rows have passed through this component.
        /// Closes the reader and disposes the command; both steps are skipped when
        /// PreExecute never got far enough to create them.
        /// </summary>
        public override void PostExecute()
        {
            if (sqlReader != null)
            {
                sqlReader.Close();
                sqlReader = null;
            }
    
            if (sqlCmd != null)
            {
                sqlCmd.Dispose();
                sqlCmd = null;
            }
        }
    
        /// <summary>
        /// Adds one row to MyOutputBuffer for every row returned by the reader,
        /// copying column 0 into SrcColumnName.
        /// </summary>
        public override void CreateNewOutputRows()
        {
            // The query filters out NULL src_column_name values, so GetString(0) is safe here.
            while (sqlReader.Read())
            {
                MyOutputBuffer.AddRow();
                MyOutputBuffer.SrcColumnName = sqlReader.GetString(0);
            }
        }
    
        /// <summary>
        /// Hands the SqlConnection back to the connection manager, if one was acquired.
        /// </summary>
        public override void ReleaseConnections()
        {
            if (connMgr != null && sqlConn != null)
            {
                connMgr.ReleaseConnection(sqlConn);
                sqlConn = null;
            }
        }
    
        /// <summary>
        /// Wraps an identifier in square brackets, doubling any closing bracket so the
        /// name cannot terminate the delimited identifier early.
        /// </summary>
        /// <param name="name">Raw identifier; null is treated as an empty string.</param>
        /// <returns>The bracket-delimited identifier.</returns>
        private static string QuoteIdentifier(string name)
        {
            return "[" + (name ?? string.Empty).Replace("]", "]]") + "]";
        }
    
    }

    destination script component

    #region Help:  Introduction to the Script Component
    /* The Script Component allows you to perform virtually any operation that can be accomplished in
     * a .Net application within the context of an Integration Services data flow.
     *
     * Expand the other regions which have "Help" prefixes for examples of specific ways to use
     * Integration Services features within this script component. */
    #endregion
    
    #region Namespaces
    using System;
    using System.Data;
    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    using Microsoft.SqlServer.Dts.Runtime.Wrapper;
    using System.Data.SqlClient;
    #endregion
    
    /// <summary>
    /// Destination script component. For every input row it runs a parameterized UPDATE
    /// against the staging table named by the <c>varvalidtablename</c> package variable,
    /// appending <c>{trg:similarity:confidence}</c> to
    /// <c>src_similarity_confidence_with_trg</c> for the matching <c>src_column_name</c>.
    /// This is the class to which to add your code.  Do not change the name, attributes, or parent
    /// of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
    public class ScriptMain : UserComponent
    {
        #region Help:  Using Integration Services variables and parameters
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script component, according to whether or not your
         * code needs to write into the variable.  To do so, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and ReadWriteVariables properties in the
         * Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable or parameter:
         *  DateTime startTime = Variables.MyStartTime;
         *
         * Example of writing to a variable:
         *  Variables.myStringVariable = "new value";
         */
        #endregion
    
        #region Help:  Using Integration Services Connnection Managers
        /* Some types of connection managers can be used in this script component.  See the help topic
         * "Working with Connection Managers Programatically" for details.
         *
         * To use a connection manager in this script, first ensure that the connection manager has
         * been added to either the list of connection managers on the Connection Managers page of the
         * script component editor.  To add the connection manager, save this script, close this instance of
         * Visual Studio, and add the Connection Manager to the list.
         *
         * If the component needs to hold a connection open while processing rows, override the
         * AcquireConnections and ReleaseConnections methods.
         * 
         * Example of using an ADO.Net connection manager to acquire a SqlConnection:
         *  object rawConnection = Connections.SalesDB.AcquireConnection(transaction);
         *  SqlConnection salesDBConn = (SqlConnection)rawConnection;
         *
         * Example of using a File connection manager to acquire a file path:
         *  object rawConnection = Connections.Prices_zip.AcquireConnection(transaction);
         *  string filePath = (string)rawConnection;
         *
         * Example of releasing a connection manager:
         *  Connections.SalesDB.ReleaseConnection(rawConnection);
         */
        #endregion
    
        #region Help:  Firing Integration Services Events
        /* This script component can fire events.
         *
         * Example of firing an error event:
         *  ComponentMetaData.FireError(10, "Process Values", "Bad value", "", 0, out cancel);
         *
         * Example of firing an information event:
         *  ComponentMetaData.FireInformation(10, "Process Values", "Processing has started", "", 0, fireAgain);
         *
         * Example of firing a warning event:
         *  ComponentMetaData.FireWarning(10, "Process Values", "No rows were received", "", 0);
         */
        #endregion
    
        // Connection manager and the SqlConnection obtained from it; held for the whole execution.
        IDTSConnectionManager100 connMgr;
        SqlConnection sqlConn;
    
        // UPDATE command built once in PreExecute and re-executed for every input row.
        SqlCommand sqlCmd;
    
        /// <summary>
        /// Acquires the SqlConnection from the StageDB connection manager.
        /// </summary>
        /// <param name="Transaction">Transaction handed in by the runtime (not used; the connection is acquired with null).</param>
        public override void AcquireConnections(object Transaction)
        {
            connMgr = this.Connections.StageDB;
            sqlConn = (SqlConnection)connMgr.AcquireConnection(null);
        }
    
        /// <summary>
        /// This method is called once, before rows begin to be processed in the data flow.
        /// Builds the parameterized UPDATE statement and registers its four parameters.
        /// </summary>
        public override void PreExecute()
        {
            // @str must hold trg_column_name (up to 128 chars) + ':' + similarity (up to 10)
            // + ':' + confidence (up to 10) = 150 chars. T-SQL truncates silently when a
            // longer value is assigned to a variable, so a narrower declaration would
            // drop the tail of the composed string without any error.
            string updateSql =
                "DECLARE @str NVARCHAR(150); SET @str = @trg_column_name + ':' + CAST(@similarity AS NVARCHAR(10)) + ':' + CAST(@confidence AS NVARCHAR(10));" +
                " UPDATE Stage.dbo." + QuoteIdentifier(Convert.ToString(this.Variables.varvalidtablename)) +
                " SET src_similarity_confidence_with_trg = ISNULL(src_similarity_confidence_with_trg,'') + '{' + @str + '}' WHERE src_column_name = @src_column_name";
    
            sqlCmd = new SqlCommand(updateSql, sqlConn);
            sqlCmd.Parameters.Add("@similarity", SqlDbType.Float);
            sqlCmd.Parameters.Add("@confidence", SqlDbType.Float);
            sqlCmd.Parameters.Add("@src_column_name", SqlDbType.NVarChar, 128);
            sqlCmd.Parameters.Add("@trg_column_name", SqlDbType.NVarChar, 128);
        }
    
        /// <summary>
        /// This method is called after all the rows have passed through this component.
        /// Disposes the UPDATE command created in PreExecute.
        /// </summary>
        public override void PostExecute()
        {
            base.PostExecute();
    
            if (sqlCmd != null)
            {
                sqlCmd.Dispose();
                sqlCmd = null;
            }
        }
    
        /// <summary>
        /// This method is called once for every row that passes through the component from Input0.
        /// Copies the row's values into the command parameters and executes the UPDATE.
        /// </summary>
        /// <param name="Row">The row that is currently passing through the component</param>
        public override void Input0_ProcessInputRow(Input0Buffer Row)
        {
            sqlCmd.Parameters["@similarity"].Value = Row.Similarity;
            sqlCmd.Parameters["@confidence"].Value = Row.Confidence;
            sqlCmd.Parameters["@src_column_name"].Value = Row.SrcColumnName;
            // NOTE(review): @trg_column_name is filled from the SOURCE column name, so the
            // stored text becomes "{src:similarity:confidence}". This looks like a copy-paste
            // slip; it presumably should read the matched target column from the input buffer.
            // TODO confirm the input column's name and switch this assignment.
            sqlCmd.Parameters["@trg_column_name"].Value = Row.SrcColumnName;
    
            sqlCmd.ExecuteNonQuery();
        }
    
        /// <summary>
        /// Hands the SqlConnection back to the connection manager, if one was acquired.
        /// </summary>
        public override void ReleaseConnections()
        {
            if (connMgr != null && sqlConn != null)
            {
                connMgr.ReleaseConnection(sqlConn);
                sqlConn = null;
            }
        }
    
        /// <summary>
        /// Wraps an identifier in square brackets, doubling any closing bracket so the
        /// name cannot terminate the delimited identifier early.
        /// </summary>
        /// <param name="name">Raw identifier; null is treated as an empty string.</param>
        /// <returns>The bracket-delimited identifier.</returns>
        private static string QuoteIdentifier(string name)
        {
            return "[" + (name ?? string.Empty).Replace("]", "]]") + "]";
        }
    
    }
  • 相关阅读:
    POI做报表
    (一) DB2的备份和恢复:准备
    西天取经为节约成本该裁掉哪位?
    python中configpraser模块
    python中subprocess模块
    python中os模块
    python中random模块
    python中time模块和datetime模块
    python中序列化json模块和pickle模块
    迭代器生成器函数的递归调用与二分法
  • 原文地址:https://www.cnblogs.com/jenrrychen/p/4557166.html
Copyright © 2011-2022 走看看