您的位置:首页 > 数据库

Sql Server2008 ServiceBroker发布/订阅模式实例

2013-01-25 11:32 274 查看
应用实例操作步骤如下:

一:建立用于消息交换的数据库(本例只使用一个数据库SSB_Book),并在其中启用Service Broker活动,T-SQL如下:

-- Step 1: make sure the SSB_Book database exists and is enabled for
-- Service Broker activity.
USE master;
GO

-- Create the database only when sys.databases has no entry with this name.
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'SSB_Book')
    CREATE DATABASE SSB_Book;
GO

-- Switch on Service Broker for the database.
ALTER DATABASE SSB_Book SET ENABLE_BROKER;
GO

-- Set the TRUSTWORTHY database property.
ALTER DATABASE SSB_Book SET TRUSTWORTHY ON;
GO

USE SSB_Book;
GO

-- One row per stored publication: a uniqueidentifier key, the subject text and
-- the XML document that was stored with it.
CREATE TABLE Publications
(
    Publication UNIQUEIDENTIFIER NOT NULL,
    Subject     NVARCHAR(MAX)    NOT NULL,
    OriginalXml XML              NOT NULL,
    PRIMARY KEY (Publication)
);
GO

-- One row per stored subscription: a uniqueidentifier key, the subject the
-- subscriber asked for and the XML document that was stored with it.
CREATE TABLE Subscriptions
(
    Subscriber  UNIQUEIDENTIFIER NOT NULL,
    Subject     NVARCHAR(MAX)    NOT NULL,
    OriginalXml XML              NOT NULL,
    PRIMARY KEY (Subscriber)
);
GO

二:建立消息类型、契约(Contract)、队列、服务,并把指定的队列绑定到契约

-- Example of a message type whose body is validated against an XML schema.
-- The schema collection below defines an ExpenseReport element made of an
-- employee name, an employee id and one or more item details.
CREATE XML SCHEMA COLLECTION ExpenseReportSchema
AS
N'<?xml version="1.0" encoding="UTF-16" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://Adventure-Works.com/schemas/expenseReport"
xmlns:expense="http://Adventure-Works.com/schemas/expenseReport"
elementFormDefault="qualified"
>
<xsd:complexType name="expenseReportType">
<xsd:sequence>
<xsd:element name="EmployeeName" type="xsd:string"/>
<xsd:element name="EmployeeID" type="xsd:string"/>
<xsd:element name="ItemDetail"
type="expense:ItemDetailType" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:complexType>

<xsd:complexType name="ItemDetailType">
<xsd:sequence>
<xsd:element name="Date" type="xsd:date"/>
<xsd:element name="CostCenter" type="xsd:string"/>
<xsd:element name="Total" type="xsd:decimal"/>
<xsd:element name="Currency" type="xsd:string"/>
<xsd:element name="Description" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>

<xsd:element name="ExpenseReport" type="expense:expenseReportType"/>

</xsd:schema>';

-- Message type whose bodies must be valid against ExpenseReportSchema.
CREATE MESSAGE TYPE [//Adventure-Works.com/Expenses/SubmitExpense]
    VALIDATION = VALID_XML WITH SCHEMA COLLECTION ExpenseReportSchema;
------------------------

USE SSB_Book;
GO

-- Service Broker infrastructure, part 1: message types.

-- Publish request; the body has to be well-formed XML.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    VALIDATION = WELL_FORMED_XML;
GO

-- Article payload; the body is not validated.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
    VALIDATION = NONE;
GO

-- Subscribe request; the body has to be well-formed XML.
CREATE MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    VALIDATION = WELL_FORMED_XML;
GO

-- Service Broker infrastructure, part 2: contracts.

-- Publish contract: the initiator sends both the publish request and the
-- articles.
CREATE CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
(
    [http://ssb.csharp.at/SSB_Book/c10/PublishMessage] SENT BY INITIATOR,
    [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] SENT BY INITIATOR
);
GO

-- Subscribe contract: the initiator sends the subscribe request and the target
-- sends articles back.
CREATE CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
(
    [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage] SENT BY INITIATOR,
    [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]   SENT BY TARGET
);
GO

---------队列QUEUE和服务SERVICE

-- Publisher side: its queue and the service bound to that queue.
CREATE QUEUE PublisherQueue;
GO

-- The publisher service accepts conversations on both contracts.
CREATE SERVICE PublisherService
    ON QUEUE PublisherQueue
    (
        [http://ssb.csharp.at/SSB_Book/c10/PublishContract],
        [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    );
GO

-- Subscriber side: two subscribers, each with its own queue and service.
-- No contract list is given for these services.
CREATE QUEUE [SubscriberQueue1];
GO

CREATE SERVICE [SubscriberService1] ON QUEUE [SubscriberQueue1];
GO

CREATE QUEUE [SubscriberQueue2];
GO

CREATE SERVICE [SubscriberService2] ON QUEUE [SubscriberQueue2];
GO

-- Author side: queue and service used by the scripts that publish articles.
CREATE QUEUE [AuthorQueue];
GO

CREATE SERVICE [AuthorService] ON QUEUE [AuthorQueue];
GO

三:编写存储过程

------------------三:创建存储过程如下

-- Procedure 1: sp_PublishPublication
-- Stores a publication by inserting one row into Publications.
--   @Publication  key of the new row
--   @Subject      subject text stored with the publication
--   @OriginalXml  XML document stored alongside it
CREATE PROCEDURE sp_PublishPublication
    @Publication UNIQUEIDENTIFIER,
    @Subject     NVARCHAR(MAX),
    @OriginalXml XML
AS
BEGIN
    INSERT INTO Publications (Publication, Subject, OriginalXml)
    SELECT @Publication, @Subject, @OriginalXml;
END
GO

-- Procedure 2: sp_RemovePublication
-- Deletes the Publications row with the given key; nothing happens when no
-- such row exists.
--   @Publication  key of the row to delete
CREATE PROCEDURE sp_RemovePublication
    @Publication UNIQUEIDENTIFIER
AS
BEGIN
    DELETE p
    FROM Publications AS p
    WHERE p.Publication = @Publication;
END
GO

-- Procedure 3: sp_ProcessPublicationRequest
-- Handles one publish request that arrived on a conversation.
--   @Conversation  handle of the conversation the request arrived on; it is
--                  also used as the key of the Publications row
--   @Message       raw message body; it is cast to XML and searched for
--                  Publish/Subject in the .../c10/PublishSubscribe namespace
-- When a subject is found the publication is stored through
-- sp_PublishPublication. Otherwise the conversation is ended with error code 1
-- and any publication stored under this handle is removed.
CREATE PROCEDURE sp_ProcessPublicationRequest
    @Conversation UNIQUEIDENTIFIER,
    @Message      VARBINARY(MAX)
AS
BEGIN
    DECLARE @Request XML;
    DECLARE @Subject NVARCHAR(MAX);

    SELECT @Request = CAST(@Message AS XML);

    -- The preceding statement must end with ';' for WITH XMLNAMESPACES to parse.
    WITH XMLNAMESPACES (DEFAULT 'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
    SELECT @Subject = @Request.value(N'(//Publish/Subject)[1]', N'NVARCHAR(MAX)');

    IF (@Subject IS NOT NULL)
    BEGIN
        -- A space is required between the procedure name and its first
        -- argument: '@' is a legal identifier character, so without it the
        -- argument becomes part of the procedure name. The already-parsed
        -- document is passed so the body is not converted to XML twice.
        EXEC sp_PublishPublication @Conversation, @Subject, @Request;
    END
    ELSE
    BEGIN
        END CONVERSATION @Conversation
            WITH ERROR = 1
            DESCRIPTION = N'The publication is missing a subject';

        EXEC sp_RemovePublication @Conversation;
    END
END
GO

-- Procedure 4: sp_SendOnPublication
-- Forwards an article to every subscriber whose subject equals the subject of
-- the given publication.
--   @Publication  key of the Publications row whose subject selects the
--                 matching Subscriptions rows
--   @Article      article body; when NULL an ArticleMessage without a body is
--                 sent instead
-- All sends happen inside one transaction.
CREATE PROCEDURE sp_SendOnPublication
    @Publication UNIQUEIDENTIFIER,
    @Article     VARBINARY(MAX)
AS
BEGIN
    DECLARE @Subscription UNIQUEIDENTIFIER;
    DECLARE @cursorSubscriptions CURSOR;

    -- Each Subscriber value is used as a conversation handle for SEND below,
    -- so the matching rows are walked one at a time.
    SET @cursorSubscriptions = CURSOR LOCAL SCROLL FOR
        SELECT s.Subscriber
        FROM Subscriptions AS s
        INNER JOIN Publications AS p
            ON s.Subject = p.Subject
        WHERE p.Publication = @Publication;

    BEGIN TRANSACTION;

    OPEN @cursorSubscriptions;

    FETCH NEXT FROM @cursorSubscriptions INTO @Subscription;

    WHILE (@@fetch_status = 0)
    BEGIN
        IF (@Article IS NOT NULL)
        BEGIN
            -- Send the article received from the author to this matching
            -- subscriber.
            SEND ON CONVERSATION @Subscription
                MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
                (@Article);
        END
        ELSE
        BEGIN
            -- No body available: send the message type on its own.
            SEND ON CONVERSATION @Subscription
                MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage];
        END

        FETCH NEXT FROM @cursorSubscriptions INTO @Subscription;
    END

    CLOSE @cursorSubscriptions;
    DEALLOCATE @cursorSubscriptions;

    COMMIT;
END
GO

-- Procedure 5: sp_RemoveSubscriber
-- Deletes the Subscriptions row with the given key; nothing happens when no
-- such row exists.
--   @Subscriber  key of the row to delete
CREATE PROCEDURE sp_RemoveSubscriber
    @Subscriber UNIQUEIDENTIFIER
AS
BEGIN
    DELETE s
    FROM Subscriptions AS s
    WHERE s.Subscriber = @Subscriber;
END
GO

-- Procedure 6: sp_PublishSubscriber
-- Stores a subscription by inserting one row into Subscriptions.
--   @Subscriber   key of the new row
--   @Subject      subject the subscriber asked for
--   @OriginalXml  XML document stored alongside it
CREATE PROCEDURE sp_PublishSubscriber
    @Subscriber  UNIQUEIDENTIFIER,
    @Subject     NVARCHAR(MAX),
    @OriginalXml XML
AS
BEGIN
    INSERT INTO Subscriptions (Subscriber, Subject, OriginalXml)
    SELECT @Subscriber, @Subject, @OriginalXml;
END
GO

-- Procedure 7: sp_ProcessSubscriptionRequest
-- Handles one subscribe request that arrived on a conversation.
--   @Conversation  handle of the conversation the request arrived on; it is
--                  also used as the key of the Subscriptions row
--   @Message       raw message body; it is cast to XML and searched for
--                  Request/Subject in the .../c10/PublishSubscribe namespace
-- When a subject is found the subscription is stored through
-- sp_PublishSubscriber. Otherwise the conversation is ended with error code 1
-- and any subscription stored under this handle is removed.
CREATE PROCEDURE sp_ProcessSubscriptionRequest
    @Conversation UNIQUEIDENTIFIER,
    @Message      VARBINARY(MAX)
AS
BEGIN
    DECLARE @Request XML;
    DECLARE @Subject NVARCHAR(MAX);

    SELECT @Request = CAST(@Message AS XML);

    -- The preceding statement must end with ';' for WITH XMLNAMESPACES to parse.
    WITH XMLNAMESPACES (DEFAULT 'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
    SELECT @Subject = @Request.value(N'(//Request/Subject)[1]', N'NVARCHAR(MAX)');

    IF (@Subject IS NOT NULL)
    BEGIN
        -- A space is required between the procedure name and its first
        -- argument: '@' is a legal identifier character, so without it the
        -- argument becomes part of the procedure name. The already-parsed
        -- document is passed so the body is not converted to XML twice.
        EXEC sp_PublishSubscriber @Conversation, @Subject, @Request;
    END
    ELSE
    BEGIN
        END CONVERSATION @Conversation
            WITH ERROR = 1
            DESCRIPTION = N'The subscriber is missing a subject';

        EXEC sp_RemoveSubscriber @Conversation;
    END
END
GO

-- Procedure 8: sp_PublisherService
-- Drains PublisherQueue one message at a time, each in its own transaction,
-- and dispatches on the message type:
--   PublishMessage    -> sp_ProcessPublicationRequest
--   SubscribeMessage  -> sp_ProcessSubscriptionRequest
--   ArticleMessage    -> sp_SendOnPublication
--   Error / EndDialog -> end the conversation and delete the Publications and
--                        Subscriptions rows keyed by its handle
-- Any other message type raises an error, rolls the transaction back and
-- leaves the procedure. The procedure returns once a RECEIVE finds no message
-- within the WAITFOR timeout (1000, which WAITFOR expresses in milliseconds).
CREATE PROCEDURE sp_PublisherService
AS
BEGIN
    DECLARE @Conversation    UNIQUEIDENTIFIER;
    DECLARE @Message         VARBINARY(MAX);
    DECLARE @MessageTypeName SYSNAME;

    WHILE (1 = 1)
    BEGIN
        -- Clear the handle first: a RECEIVE that times out assigns nothing,
        -- so a NULL handle afterwards means the queue was empty.
        SELECT @Conversation = NULL;

        BEGIN TRANSACTION;

        WAITFOR
        (
            RECEIVE TOP(1)
                @Conversation    = conversation_handle,
                @Message         = message_body,
                @MessageTypeName = message_type_name
            FROM PublisherQueue
        ), TIMEOUT 1000;

        IF (@Conversation IS NULL)
        BEGIN
            -- Nothing was received: close the empty transaction and stop.
            COMMIT;
            BREAK;
        END

        IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/PublishMessage')
        BEGIN
            EXEC sp_ProcessPublicationRequest @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage')
        BEGIN
            EXEC sp_ProcessSubscriptionRequest @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/ArticleMessage')
        BEGIN
            EXEC sp_SendOnPublication @Conversation, @Message;
        END
        ELSE IF (@MessageTypeName IN (
                    N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
                    N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'))
        BEGIN
            END CONVERSATION @Conversation;

            IF (EXISTS (SELECT * FROM Publications
                        WHERE Publication = @Conversation))
            BEGIN
                EXEC sp_RemovePublication @Conversation;
            END

            -- The subscription rows live in Subscriptions (there is no
            -- Subscribers table), and only the row of this conversation is
            -- relevant.
            IF (EXISTS (SELECT * FROM Subscriptions
                        WHERE Subscriber = @Conversation))
            BEGIN
                EXEC sp_RemoveSubscriber @Conversation;
            END
        END
        ELSE
        BEGIN
            -- Unexpected message
            RAISERROR (N'Received unexpected message type: %s', 16, 1,
                       @MessageTypeName);

            ROLLBACK;

            RETURN;
        END

        COMMIT;
    END
END
GO

-- Script 9: subscription request issued by a subscriber.
-- SubscriberService2 opens a dialog to PublisherService on the subscribe
-- contract and sends a SubscribeMessage asking for Subject2.
DECLARE @dialog  UNIQUEIDENTIFIER;
DECLARE @request NVARCHAR(MAX);

SET @request =
N'<?xml version="1.0"?>

<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">

<Subject>Subject2</Subject>

</Request>';

BEGIN DIALOG CONVERSATION @dialog
    FROM SERVICE [SubscriberService2]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request);
GO

四:发布消息:

--------执行AuthorService服务就可以发送article消息给PublisherService进行分发;

-- AuthorService opens a dialog to PublisherService on the publish contract and
-- sends a PublishMessage that announces Subject1.
DECLARE @ch UNIQUEIDENTIFIER;

BEGIN DIALOG CONVERSATION @ch
    FROM SERVICE [AuthorService]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    (
N'<?xml version="1.0"?>

<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">

<Subject>Subject1</Subject>

</Publish>'
    );

-- The send adds a row to PublisherQueue, the queue of 'PublisherService'.
-- Running dbo.sp_PublisherService receives it and calls
-- sp_ProcessPublicationRequest, which persists the publication in Publications.
SELECT * FROM dbo.PublisherQueue;

SELECT * FROM Publications;

-- End the batch so that @ch can be declared again by the next script.
GO



五:订阅消息

-- 1: a subscriber receives articles by subscribing at PublisherService.
-- Here SubscriberService1 subscribes to Subject1.
DECLARE @dialog  UNIQUEIDENTIFIER;
DECLARE @request NVARCHAR(MAX);

SET @request =
N'<?xml version="1.0"?>

<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">

<Subject>Subject1</Subject>

</Request>';

BEGIN DIALOG CONVERSATION @dialog
    FROM SERVICE [SubscriberService1]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request);
GO

-- Alternatively, SubscriberService2 subscribes to Subject2 at PublisherService.
DECLARE @dialog  UNIQUEIDENTIFIER;
DECLARE @request NVARCHAR(MAX);

SET @request =
N'<?xml version="1.0"?>

<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">

<Subject>Subject2</Subject>

</Request>';

BEGIN DIALOG CONVERSATION @dialog
    FROM SERVICE [SubscriberService2]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @dialog
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
    (@request);
GO

-- Each subscribe request adds a row to PublisherQueue, the queue of
-- 'PublisherService'. Running dbo.sp_PublisherService receives it and calls
-- sp_ProcessSubscriptionRequest, which persists the request in Subscriptions.
SELECT * FROM dbo.PublisherQueue;

SELECT * FROM Subscriptions;



六:

-----执行AuthorService服务就可以发送article消息给PublisherService进行分发

-- Send an article on a specific subject.
-- sp_SendOnPublication looks up the subject through the Publications row keyed
-- by the conversation the article arrives on, so the subject has to be
-- published on the SAME conversation before the article is sent. An article
-- sent on a fresh dialog without a preceding PublishMessage matches no
-- publication and therefore reaches no subscriber.
DECLARE @ch UNIQUEIDENTIFIER;

BEGIN DIALOG CONVERSATION @ch
    FROM SERVICE [AuthorService]
    TO SERVICE 'PublisherService'
    ON CONTRACT [http://ssb.csharp.at/SSB_Book/c10/PublishContract]
    WITH ENCRYPTION = OFF;

-- Register this conversation as a publication for Subject2.
SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
    (
N'<?xml version="1.0"?>
<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Publish>'
    );

-- Send the article itself on the same conversation.
SEND ON CONVERSATION @ch
    MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
    (
        N'This is an article on Subject2'
    );

-- The sends add rows to PublisherQueue, the queue of 'PublisherService'.
-- Running dbo.sp_PublisherService receives them and calls
-- sp_SendOnPublication, which distributes the article to every matching
-- subscriber.
SELECT * FROM dbo.PublisherQueue;
GO

六:应用服务JDBC调用“发送消息存储过程”、“接收消息存储过程”,供参考:

// Invoke the publisher procedure through the JDBC call escape. The escape has
// to be closed with '}' and the bracketed procedure name must not start with a
// space, otherwise a different (non-existent) procedure name is requested.
cStmt = con.prepareCall("{call [dbo].[sp_PublisherService]}");

七:测试响应时间间隔
1:发送消息测试

通过登录数据库验证sql如下:

2:接收消息测试

3:涉及SQL验证:
-- Look at the content of every queue used by the example.
SELECT * FROM SubscriberQueue1;
SELECT * FROM SubscriberQueue2;

SELECT * FROM dbo.PublisherQueue;
SELECT * FROM dbo.AuthorQueue;

-- Publications and subscriptions are persisted in tables once processed;
-- inspect them here.
SELECT * FROM dbo.Publications;
SELECT * FROM dbo.Subscriptions;

4:原理架构图

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: