zoukankan      html  css  js  c++  java
  • 在SQL Server 2005中实现异步触发器架构

    在SQL Server 2005数据库中,通过新增的Service Broker可以实现异步触发器的处理功能。本文提供一种使用Service Broker实现的通用异步触发器方法。

         在本这个方法中,通过Service Broker构造异步触发器处理架构,对于要使用这种架构的表,只需要创建相应的触发器及处理触发器中数据的存储过程,并且在异步触发器架构中登记触发器和处理的存储过程即可。如果一个触发器中的数据要被多个表使用,只需要在dbo.tb_async_trigger_subscribtion中登记相应处理数据的存储过程即可,即一个表的数据变更可以被多个表订阅(使用)。
    架构的步骤如下:
    1. 数据库配置
    需要配置数据库以允许使用Service Broker。本文以tempdb库为例,故配置均在tempdb上下文中进行。
    USE tempdb
    GO

    -- 允许Service Broker
    ALTER DATABASE tempdb SET
    ENABLE_BROKER
    GO
    2. 构建异步触发器相关的对象

    下面的T-SQL创建异步触发器处理架构相关的对象

    -- =======================================
    -- 异步触发器对象
    -- 1. service broker 对象
    -- =======================================
    -- a. message type, 要求使用xml 传递数据
    CREATE MESSAGE TYPE MSGT_async_trigger
    VALIDATION = WELL_FORMED_XML
    GO

    -- b. 只需要发送消息
    CREATE CONTRACT CNT_async_trigger(
        MSGT_async_trigger SENT BY INITIATOR)
    GO

    -- c. 存储消息的队列
    CREATE QUEUE dbo.Q_async_trigger
    GO

    -- d. 用于消息处理的服务
    CREATE SERVICE SRV_async_trigger
        ON QUEUE dbo.Q_async_trigger(
            CNT_async_trigger)
    GO


    -- =======================================
    -- 异步触发器对象
    -- 2. 异步触发器处理的对象
    -- =======================================
    -- a. 登记异步触发器的表
    CREATE TABLE dbo.tb_async_trigger(
        ID int IDENTITY
            PRIMARY KEY,
        table_name sysname,
        trigger_name sysname
    )

    -- b. 登记订阅异步触发器的存储过程
    CREATE TABLE dbo.tb_async_trigger_subscriber(
        ID int IDENTITY
            PRIMARY KEY,
        procedure_name sysname
    )

    -- c. 异步触发器和存储过程之间的订阅关系
    CREATE TABLE dbo.tb_async_trigger_subscribtion(
        trigger_id int
            REFERENCES dbo.tb_async_trigger(
                ID),
        procedure_id int
            REFERENCES dbo.tb_async_trigger_subscriber(
                ID),
        PRIMARY KEY(
            trigger_id, procedure_id)
    )
    GO

    -- d. 发送消息的存储过程
    CREATE PROC dbo.p_async_trigger_send
        @message xml
    AS
    SET NOCOUNT ON
    DECLARE
        @handle uniqueidentifier
    BEGIN DIALOG CONVERSATION @handle
        FROM SERVICE [SRV_async_trigger]
        TO SERVICE N'SRV_async_trigger'
        ON CONTRACT CNT_async_trigger
        WITH
            ENCRYPTION = OFF;
    SEND
        ON CONVERSATION @handle
        MESSAGE TYPE MSGT_async_trigger(
            @message);
    -- 消息发出即可, 不需要回复, 因此发出后即可结束会话
    END CONVERSATION @handle
    GO

    -- e. 处理异步触发器发送的消息
    CREATE PROC dbo.p_async_trigger_process
    AS
    SET NOCOUNT ON
    DECLARE
        @handle uniqueidentifier,
        @message xml,
        @rows int
    SET @rows = 1
    WHILE @rows > 0
    BEGIN
        -- 处理已经收到的消息
        WAITFOR(
            RECEIVE TOP(1)
                @handle = conversation_handle,
                @message = CASE
                                WHEN message_type_name = N'MSGT_async_trigger'
                                    THEN CONVERT(xml, message_body)
                                ELSE NULL
                            END
            FROM dbo.Q_async_trigger
        ), TIMEOUT 10
        SET @rows = @@ROWCOUNT
        IF @rows > 0
        BEGIN
            -- 结束会话
            END CONVERSATION @handle;

            -- 处理消息
            -- a. 取发送者信息
            DECLARE
                @table_name sysname,
                @trigger_name sysname,
                @sql nvarchar(max)
            SELECT
                @table_name = @message.value('(/root/table_name)[1]', 'sysname'),
                @trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname')

            -- b. 调用异步触发器订阅的存储过程
            ;WITH
            SUB AS(
                SELECT
                    TR.table_name,
                    TR.trigger_name,
                    SUB.procedure_name
                FROM dbo.tb_async_trigger TR,
                    dbo.tb_async_trigger_subscriber SUB,
                    dbo.tb_async_trigger_subscribtion TRSUB
                WHERE TRSUB.trigger_id = TR.ID
                    AND TRSUB.procedure_id = SUB.ID
            )
            SELECT
                @sql = (
                        SELECT
                            N'
    EXEC ' + procedure_name + N'
        @message
    '
                        FROM SUB
                        WHERE table_name = @table_name
                            AND trigger_name = @trigger_name
                        FOR XML PATH(''), ROOT('r'), TYPE
                    ).value('(/r)[1]', 'nvarchar(max)')
            EXEC sp_executesql @sql, N'@message xml', @message
        END
    END
    GO

    -- f. 绑定处理的存储过程到队列
    ALTER QUEUE dbo.Q_async_trigger
        WITH ACTIVATION(
            STATUS = ON,
            PROCEDURE_NAME = dbo.p_async_trigger_process,
            MAX_QUEUE_READERS = 10,
            EXECUTE AS OWNER)
    GO

    3. 使用示例

    下面的T-SQL演示使用异步触发器构架。示例中创建了三个表:

    Dbo.t1 这个是源表,此表的数据变化将用于其他表

    Dbo.t2 这个表要求保持与dbo.t1同步

    Dbo.tb_log 这个表记录dbo.t1中的数据变化情况

    触发器 TR_async_trigger 用于将表Dbo.t1中的数据变化发送到异步触发器构架中。dbo.p_Sync_t1_t2和dbo.p_Record_log用于处理dbo.t1于中变化的数据。

    在处理时,需要把相关的信息登记到异步触发器架构的表中。

    -- =======================================
    -- 3. 使用示例
    -- =======================================
    -- ===============================
    -- 测试对象
    -- a. 源表
    CREATE TABLE dbo.t1(
        id int IDENTITY
            PRIMARY KEY,
        col int
    )
    -- b. 同步的目的表
    CREATE TABLE dbo.t2(
        id int IDENTITY
            PRIMARY KEY,
        col int
    )
    -- c. 记录操作的日志表
    CREATE TABLE dbo.tb_log(
        id int IDENTITY
            PRIMARY KEY,
        user_name sysname,
        operate_type varchar(10),
        inserted xml,
        deleted xml
    )
    GO

    -- a. 异步发送处理消息的触发器
    CREATE TRIGGER TR_async_trigger
    ON dbo.t1
    FOR INSERT, UPDATE, DELETE
    AS
    IF @@ROWCOUNT = 0
        RETURN

    SET NOCOUNT ON

    -- 将要发送的数据生成xml 数据
    DECLARE
        @message xml
    SELECT
        @message = (
                SELECT
                    table_name = (
                            SELECT TOP 1
                                OBJECT_NAME(parent_object_id)
                            FROM sys.objects
                            WHERE object_id = @@PROCID),
                    trigger_name = OBJECT_NAME(@@PROCID),
                    user_name = SUSER_SNAME(),
                    inserted = (
                            SELECT * FROM inserted FOR XML AUTO, TYPE),
                    deleted = (
                            SELECT * FROM deleted FOR XML AUTO, TYPE)
                FOR XML PATH(''), ROOT('root'), TYPE
            )
    -- 发送消息
    EXEC dbo.p_async_trigger_send
        @message = @message
    GO

    -- b. 处理异步触发器的存储过程
    -- b.1 同步到t2 的存储过程
    CREATE PROC dbo.p_Sync_t1_t2
        @message xml
    AS
    SET NOCOUNT ON
    DECLARE
        @inserted bit,
        @deleted bit
    SELECT
        @inserted = @message.exist('/root/inserted'),
        @deleted = @message.exist('/root/deleted')
    IF @inserted = 1
        IF @deleted = 1 -- 更新
        BEGIN
            ;WITH
            I AS(
                SELECT
                    id = T.c.value('@id[1]', 'int'),
                    col = T.c.value('@col[1]', 'int')
                FROM @message.nodes('/root/inserted/inserted') T(c)
            ),
            D AS(
                SELECT
                    id = T.c.value('@id[1]', 'int'),
                    col = T.c.value('@col[1]', 'int')
                FROM @message.nodes('/root/deleted/deleted') T(c)
            )
            UPDATE A SET
                col = I.col
            FROM dbo.t2 A, I, D
            WHERE A.ID = I.ID
                AND I.ID = D.ID
        END
        ELSE            -- 插入
        BEGIN
            SET IDENTITY_INSERT dbo.t2 ON
            ;WITH
            I AS(
                SELECT
                    id = T.c.value('@id[1]', 'int'),
                    col = T.c.value('@col[1]', 'int')
                FROM @message.nodes('/root/inserted/inserted') T(c)
            )
            INSERT dbo.t2(
                id, col)
            SELECT
                id, col
            FROM I
            SET IDENTITY_INSERT dbo.t2 OFF
        END
    ELSE                -- 删除
    BEGIN
        ;WITH
        D AS(
            SELECT
                id = T.c.value('@id[1]', 'int'),
                col = T.c.value('@col[1]', 'int')
            FROM @message.nodes('/root/deleted/deleted') T(c)
        )
        DELETE A
        FROM dbo.t2 A, D
        WHERE A.ID = D.ID
    END
    GO

    -- b.2 记录操作记录到dbo.tb_log 的存储过程
    CREATE PROC dbo.p_Record_log
        @message xml
    AS
    SET NOCOUNT ON
    DECLARE
        @inserted bit,
        @deleted bit
    SELECT
        @inserted = @message.exist('/root/inserted'),
        @deleted = @message.exist('/root/deleted')
    INSERT dbo.tb_log(
        user_name,
        operate_type,
        inserted,
        deleted)
    SELECT
        @message.value('(/root/user_name)[1]', 'sysname'),
        operate_type = CASE
                        WHEN @inserted = 1 AND @deleted = 1 THEN 'update'
                        WHEN @inserted = 1 THEN 'insert'
                        WHEN @deleted = 1 THEN 'delete'
                    END,
        @message.query('/root/inserted'),
        @message.query('/root/deleted')
    GO


    -- ===============================
    -- 在异步触发器处理系统中登记对象
    INSERT dbo.tb_async_trigger(
        table_name, trigger_name)
    VALUES(
        N't1', N'TR_async_trigger')

    INSERT dbo.tb_async_trigger_subscriber(
        procedure_name)
    SELECT N'dbo.p_Sync_t1_t2' UNION ALL
    SELECT N'dbo.p_Record_log'

    INSERT dbo.tb_async_trigger_subscribtion(
        trigger_id, procedure_id)
    SELECT 1, 1 UNION ALL
    SELECT 1, 2
    GO

    4.使用测试

    下面的T-SQL修改表dbo.t1中的数据,并检查dbo.t2、dbo.tb_log中的数据,以确定异步触发器架构的工作是否成功。

    执行完成后可以看到dbo.t2、dbo.tb_log中有相关的记录。



    -- ===============================
    -- 测试
    INSERT dbo.t1
    SELECT 1 UNION ALL
    SELECT 2

    UPDATE dbo.t1 SET
        col = 2
    WHERE id = 1

    DELETE dbo.t1
    WHERE id = 2

    -- 显示结果
    WAITFOR DELAY '00:00:05'
    -- 延迟5 分钟, 以便有时间处理消息(因为是异步的)
    SELECT * FROM dbo.t2
    SELECT * FROM dbo.tb_log
    GO







    5.使用测试

    下面的T-SQL删除本文中建立的所有对象。



    -- =======================================
    -- 5. 删除相关的对象
    -- =======================================
    -- a. 删除service broker 对象
    DROP SERVICE SRV_async_trigger
    DROP QUEUE dbo.Q_async_trigger
    DROP CONTRACT CNT_async_trigger
    DROP MESSAGE TYPE MSGT_async_trigger
    GO

    -- b. 删除异步触发器处理的相关对象
    DROP PROC dbo.p_async_trigger_process
    DROP PROC dbo.p_async_trigger_send
    DROP TABLE dbo.tb_async_trigger_subscribtion
    DROP TABLE dbo.tb_async_trigger_subscriber
    DROP TABLE dbo.tb_async_trigger
    GO

    -- c. 删除测试的对象
    DROP TABLE dbo.tb_log, dbo.t1, dbo.t2
    DROP PROC dbo.p_Sync_t1_t2, dbo.p_Record_log

  • 相关阅读:
    定点c程序之五:定点数的字长效应
    定点c程序之六(完):Q值的选取和动态Q值
    定点c程序之四:除法
    语音编解码器 3
    C#中强制转换、Convert转换 和Parse转换的差异(二)
    什么是AOP?我也来说说对AOP的理解
    SQL生成流水号
    sql得到中文首字母
    PowerDesigner从数据库生实体成到PD中,power designer逆向工程
    SQL解析IP地址的几种方式
  • 原文地址:https://www.cnblogs.com/qanholas/p/3036977.html
Copyright © 2011-2022 走看看