您的位置:首页 > 数据库 > SQL

SQL Server 2005 中实现通用的异步触发器架构收藏

2012-01-09 13:44 309 查看
在SQL Server 2005中,通过新增的Service Broker可以实现异步触发器的处理功能。本文提供一种使用Service Broker实现的通用异步触发器方法。

在本方法中,通过Service Broker构造异步触发器处理架构,对于要使用这种架构的表,只需要创建相应的触发器及处理触发器中数据的存储过程,并且在异步触发器架构中登记触发器和处理的存储过程即可。如果一个触发器中的数据要被多个表使用,只需要在dbo.tb_async_trigger_subscribtion中登记相应处理数据的存储过程即可,即一个表的数据变更可以被多个表订阅(使用)。

架构的步骤如下:

1. 数据库配置

需要配置数据库以允许使用Service Broker。本文以tempdb库为例,故配置均在tempdb上下文中进行。

USE tempdb

GO

-- 允许Service Broker

ALTER DATABASE tempdb SET

ENABLE_BROKER

GO

2. 构建异步触发器相关的对象

下面的T-SQL创建异步触发器处理架构相关的对象。

-- =======================================

-- 异步触发器对象

-- 1. service broker 对象

-- =======================================

-- a. message type, 要求使用xml 传递数据

CREATE MESSAGE TYPE MSGT_async_trigger

VALIDATION = WELL_FORMED_XML

GO

-- b. 只需要发送消息

CREATE CONTRACT CNT_async_trigger(

MSGT_async_trigger SENT BY INITIATOR)

GO

-- c. 存储消息的队列

CREATE QUEUE dbo.Q_async_trigger

GO

-- d. 用于消息处理的服务

CREATE SERVICE SRV_async_trigger

ON QUEUE dbo.Q_async_trigger(

CNT_async_trigger)

GO

-- =======================================

-- 异步触发器对象

-- 2. 异步触发器处理的对象

-- =======================================

-- a. 登记异步触发器的表

CREATE TABLE dbo.tb_async_trigger(

ID int IDENTITY

PRIMARY KEY,

table_name sysname,

trigger_name sysname

)

-- b. 登记订阅异步触发器的存储过程

CREATE TABLE dbo.tb_async_trigger_subscriber(

ID int IDENTITY

PRIMARY KEY,

procedure_name sysname

)

-- c. 异步触发器和存储过程之间的订阅关系

CREATE TABLE dbo.tb_async_trigger_subscribtion(

trigger_id int

REFERENCES dbo.tb_async_trigger(

ID),

procedure_id int

REFERENCES dbo.tb_async_trigger_subscriber(

ID),

PRIMARY KEY(

trigger_id, procedure_id)

)

GO

-- d. 发送消息的存储过程

CREATE PROC dbo.p_async_trigger_send

@message xml

AS

SET NOCOUNT ON

DECLARE

@handle uniqueidentifier

BEGIN DIALOG CONVERSATION @handle

FROM SERVICE [SRV_async_trigger]

TO SERVICE N'SRV_async_trigger'

ON CONTRACT CNT_async_trigger

WITH

ENCRYPTION = OFF;

SEND

ON CONVERSATION @handle

MESSAGE TYPE MSGT_async_trigger(

@message);

-- 消息发出即可, 不需要回复, 因此发出后即可结束会话

END CONVERSATION @handle

GO

-- e. 处理异步触发器发送的消息

CREATE PROC dbo.p_async_trigger_process

AS

SET NOCOUNT ON

DECLARE

@handle uniqueidentifier,

@message xml,

@rows int

SET @rows = 1

WHILE @rows > 0

BEGIN

-- 处理已经收到的消息

WAITFOR(

RECEIVE TOP(1)

@handle = conversation_handle,

@message = CASE

WHEN message_type_name = N'MSGT_async_trigger'

THEN CONVERT(xml, message_body)

ELSE NULL

END

FROM dbo.Q_async_trigger

), TIMEOUT 10

SET @rows = @@ROWCOUNT

IF @rows > 0

BEGIN

-- 结束会话

END CONVERSATION @handle;

-- 处理消息

-- a. 取发送者信息

DECLARE

@table_name sysname,

@trigger_name sysname,

@sql nvarchar(max)

SELECT

@table_name = @message.value('(/root/table_name)[1]', 'sysname'),

@trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname')

-- b. 调用异步触发器订阅的存储过程

;WITH

SUB AS(

SELECT

TR.table_name,

TR.trigger_name,

SUB.procedure_name

FROM dbo.tb_async_trigger TR,

dbo.tb_async_trigger_subscriber SUB,

dbo.tb_async_trigger_subscribtion TRSUB

WHERE TRSUB.trigger_id = TR.ID

AND TRSUB.procedure_id = SUB.ID

)

SELECT

@sql = (

SELECT

N'

EXEC ' + procedure_name + N'

@message

'

FROM SUB

WHERE table_name = @table_name

AND trigger_name = @trigger_name

FOR XML PATH(''), ROOT('r'), TYPE

).value('(/r)[1]', 'nvarchar(max)')

EXEC sp_executesql @sql, N'@message xml', @message

END

END

GO

-- f. 绑定处理的存储过程到队列

ALTER QUEUE dbo.Q_async_trigger

WITH ACTIVATION(

STATUS = ON,

PROCEDURE_NAME = dbo.p_async_trigger_process,

MAX_QUEUE_READERS = 10,

EXECUTE AS OWNER)

GO

3. 使用示例

下面的T-SQL演示使用异步触发器构架。示例中创建了三个表:

Dbo.t1 这个是源表,此表的数据变化将用于其他表

Dbo.t2 这个表要求保持与dbo.t1同步

Dbo.tb_log 这个表记录dbo.t1中的数据变化情况

触发器 TR_async_trigger 用于将表Dbo.t1中的数据变化发送到异步触发器构架中。dbo.p_Sync_t1_t2和dbo.p_Record_log用于处理dbo.t1于中变化的数据。

在处理时,需要把相关的信息登记到异步触发器架构的表中。

-- =======================================

-- 3. 使用示例

-- =======================================

-- ===============================

-- 测试对象

-- a. 源表

CREATE TABLE dbo.t1(

id int IDENTITY

PRIMARY KEY,

col int

)

-- b. 同步的目的表

CREATE TABLE dbo.t2(

id int IDENTITY

PRIMARY KEY,

col int

)

-- c. 记录操作的日志表

CREATE TABLE dbo.tb_log(

id int IDENTITY

PRIMARY KEY,

user_name sysname,

operate_type varchar(10),

inserted xml,

deleted xml

)

GO

-- a. 异步发送处理消息的触发器

CREATE TRIGGER TR_async_trigger

ON dbo.t1

FOR INSERT, UPDATE, DELETE

AS

IF @@ROWCOUNT = 0

RETURN

SET NOCOUNT ON

-- 将要发送的数据生成xml 数据

DECLARE

@message xml

SELECT

@message = (

SELECT

table_name = (

SELECT TOP 1

OBJECT_NAME(parent_object_id)

FROM sys.objects

WHERE object_id = @@PROCID),

trigger_name = OBJECT_NAME(@@PROCID),

user_name = SUSER_SNAME(),

inserted = (

SELECT * FROM inserted FOR XML AUTO, TYPE),

deleted = (

SELECT * FROM deleted FOR XML AUTO, TYPE)

FOR XML PATH(''), ROOT('root'), TYPE

)

-- 发送消息

EXEC dbo.p_async_trigger_send

@message = @message

GO

-- b. 处理异步触发器的存储过程

-- b.1 同步到t2 的存储过程

CREATE PROC dbo.p_Sync_t1_t2

@message xml

AS

SET NOCOUNT ON

DECLARE

@inserted bit,

@deleted bit

SELECT

@inserted = @message.exist('/root/inserted'),

@deleted = @message.exist('/root/deleted')

IF @inserted = 1

IF @deleted = 1 -- 更新

BEGIN

;WITH

I AS(

SELECT

id = T.c.value('@id[1]', 'int'),

col = T.c.value('@col[1]', 'int')

FROM @message.nodes('/root/inserted/inserted') T(c)

),

D AS(

SELECT

id = T.c.value('@id[1]', 'int'),

col = T.c.value('@col[1]', 'int')

FROM @message.nodes('/root/deleted/deleted') T(c)

)

UPDATE A SET

col = I.col

FROM dbo.t2 A, I, D

WHERE A.ID = I.ID

AND I.ID = D.ID

END

ELSE -- 插入

BEGIN

SET IDENTITY_INSERT dbo.t2 ON

;WITH

I AS(

SELECT

id = T.c.value('@id[1]', 'int'),

col = T.c.value('@col[1]', 'int')

FROM @message.nodes('/root/inserted/inserted') T(c)

)

INSERT dbo.t2(

id, col)

SELECT

id, col

FROM I

SET IDENTITY_INSERT dbo.t2 OFF

END

ELSE -- 删除

BEGIN

;WITH

D AS(

SELECT

id = T.c.value('@id[1]', 'int'),

col = T.c.value('@col[1]', 'int')

FROM @message.nodes('/root/deleted/deleted') T(c)

)

DELETE A

FROM dbo.t2 A, D

WHERE A.ID = D.ID

END

GO

-- b.2 记录操作记录到dbo.tb_log 的存储过程

CREATE PROC dbo.p_Record_log

@message xml

AS

SET NOCOUNT ON

DECLARE

@inserted bit,

@deleted bit

SELECT

@inserted = @message.exist('/root/inserted'),

@deleted = @message.exist('/root/deleted')

INSERT dbo.tb_log(

user_name,

operate_type,

inserted,

deleted)

SELECT

@message.value('(/root/user_name)[1]', 'sysname'),

operate_type = CASE

WHEN @inserted = 1 AND @deleted = 1 THEN 'update'

WHEN @inserted = 1 THEN 'insert'

WHEN @deleted = 1 THEN 'delete'

END,

@message.query('/root/inserted'),

@message.query('/root/deleted')

GO

-- ===============================

-- 在异步触发器处理系统中登记对象

INSERT dbo.tb_async_trigger(

table_name, trigger_name)

VALUES(

N't1', N'TR_async_trigger')

INSERT dbo.tb_async_trigger_subscriber(

procedure_name)

SELECT N'dbo.p_Sync_t1_t2' UNION ALL

SELECT N'dbo.p_Record_log'

INSERT dbo.tb_async_trigger_subscribtion(

trigger_id, procedure_id)

SELECT 1, 1 UNION ALL

SELECT 1, 2

GO

4. 使用测试

下面的T-SQL修改表dbo.t1中的数据,并检查dbo.t2、dbo.tb_log中的数据,以确定异步触发器架构的工作是否成功。

执行完成后可以看到dbo.t2、dbo.tb_log中有相关的记录。

-- ===============================

-- 测试

INSERT dbo.t1

SELECT 1 UNION ALL

SELECT 2

UPDATE dbo.t1 SET

col = 2

WHERE id = 1

DELETE dbo.t1

WHERE id = 2

-- 显示结果

WAITFOR DELAY '00:00:05' -- 延迟5 分钟, 以便有时间处理消息(因为是异步的)

SELECT * FROM dbo.t2

SELECT * FROM dbo.tb_log

GO

5. 使用测试

下面的T-SQL删除本文中建立的所有对象。

-- =======================================

-- 5. 删除相关的对象

-- =======================================

-- a. 删除service broker 对象

DROP SERVICE SRV_async_trigger

DROP QUEUE dbo.Q_async_trigger

DROP CONTRACT CNT_async_trigger

DROP MESSAGE TYPE MSGT_async_trigger

GO

-- b. 删除异步触发器处理的相关对象

DROP PROC dbo.p_async_trigger_process

DROP PROC dbo.p_async_trigger_send

DROP TABLE dbo.tb_async_trigger_subscribtion

DROP TABLE dbo.tb_async_trigger_subscriber

DROP TABLE dbo.tb_async_trigger

GO

-- c. 删除测试的对象

DROP TABLE dbo.tb_log, dbo.t1, dbo.t2

DROP PROC dbo.p_Sync_t1_t2, dbo.p_Record_log
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: