Sql Server2008 ServiceBroker发布/订阅模式实例
2013-01-25 11:32
274 查看
应用实例操作步骤如下:
一:建立两个用来交换的数据库实例,并在这些实例中启用Service Broker活动,T-SQL如下:
------(一)、启用数据库的Service Broker活动
-- Enabling Databases for Service Broker Activity
USE master
GO
IF NOT
EXISTS(SELECT
name FROM
sys.databases
WHERE name
= 'SSB_Book')
CREATE DATABASE
SSB_Book
GO
ALTER DATABASE
SSB_Book SET
ENABLE_BROKER
GO
ALTER DATABASE
SSB_Book SET
TRUSTWORTHY ON
GO
USE SSB_Book
GO
CREATE TABLE
Publications
(
Publication UNIQUEIDENTIFIER
NOT NULL
PRIMARY KEY,
Subject NVARCHAR(MAX)
NOT NULL,
OriginalXml XML
NOT NULL
)
GO
CREATE TABLE
Subscriptions
(
Subscriber UNIQUEIDENTIFIER
NOT NULL
PRIMARY KEY,
Subject NVARCHAR(MAX)
NOT NULL,
OriginalXml XML
NOT NULL
)
GO
二:建立消息类型、契约(Contract)、队列、服务,并把指定的队列绑定到契约
----创建具体消息XML格式的实例
CREATE XML
SCHEMA COLLECTION
ExpenseReportSchema AS
N'<?xml version="1.0" encoding="UTF-16" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://Adventure-Works.com/schemas/expenseReport"
xmlns:expense="http://Adventure-Works.com/schemas/expenseReport"
elementFormDefault="qualified"
>
<xsd:complexType name="expenseReportType">
<xsd:sequence>
<xsd:element name="EmployeeName" type="xsd:string"/>
<xsd:element name="EmployeeID" type="xsd:string"/>
<xsd:element name="ItemDetail"
type="expense:ItemDetailType" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="ItemDetailType">
<xsd:sequence>
<xsd:element name="Date" type="xsd:date"/>
<xsd:element name="CostCenter" type="xsd:string"/>
<xsd:element name="Total" type="xsd:decimal"/>
<xsd:element name="Currency" type="xsd:string"/>
<xsd:element name="Description" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="ExpenseReport" type="expense:expenseReportType"/>
</xsd:schema>'
;
CREATE
MESSAGE TYPE
[//Adventure-Works.com/Expenses/SubmitExpense]
VALIDATION
= VALID_XML WITH
SCHEMA COLLECTION
ExpenseReportSchema ;
------------------------
USE SSB_Book
GO
--------------一:service Broker基础架构
----------消息类型Message Type
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
VALIDATION =
WELL_FORMED_XML;
GO
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
VALIDATION =
NONE;
GO
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
VALIDATION =
WELL_FORMED_XML;
GO
-------------契约Contract
CREATE CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
(
[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
SENT BY
INITIATOR,
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
SENT BY
INITIATOR
)
GO
CREATE CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
(
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
SENT BY
INITIATOR,
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
SENT BY
TARGET
)
GO
---------队列QUEUE
和服务SERVICE
-----发布者队列及其服务
CREATE QUEUE
[PublisherQueue]
GO
CREATE SERVICE
[PublisherService] ON
QUEUE [PublisherQueue]
(
[http://ssb.csharp.at/SSB_Book/c10/PublishContract],
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
)
GO
---------订阅者队列及其服务
CREATE QUEUE
SubscriberQueue1;
GO
CREATE SERVICE
SubscriberService1 ON
QUEUE SubscriberQueue1;
GO
CREATE QUEUE
SubscriberQueue2;
GO
CREATE SERVICE
SubscriberService2 ON
QUEUE SubscriberQueue2;
GO
----------Author队列及其服务
CREATE QUEUE
AuthorQueue;
GO
CREATE SERVICE
AuthorService ON
QUEUE AuthorQueue;
GO
三:编写存储过程
------------------三:执行存储过程如下
---------1:
CREATE PROCEDURE
sp_PublishPublication
@Publication UNIQUEIDENTIFIER,
@Subject NVARCHAR(MAX),
@OriginalXml XML
AS
BEGIN
INSERT INTO
Publications(Publication,
Subject,
OriginalXml)
VALUES
(
@Publication,
@Subject,
@OriginalXml
)
END
GO
--------------2:
CREATE PROCEDURE
sp_RemovePublication
@Publication UNIQUEIDENTIFIER
AS
BEGIN
DELETE FROM
Publications
WHERE Publication
= @Publication
END
GO
-----------------3:
CREATE PROCEDURE
sp_ProcessPublicationRequest
@Conversation UNIQUEIDENTIFIER,
@Message VARBINARY(MAX)
AS
BEGIN
DECLARE @Request
XML;
DECLARE @Subject
NVARCHAR(MAX);
SELECT @Request
= CAST(@Message
AS XML);
WITH XMLNAMESPACES(DEFAULT
'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
SELECT @Subject
= @Request.value(N'(//Publish/Subject)[1]',
N'NVARCHAR(MAX)');
IF (@Subject
IS NOT
NULL)
BEGIN
EXEC sp_PublishPublication@Conversation,
@Subject,
@Message;
END
ELSE
BEGIN
END CONVERSATION
@Conversation
WITH ERROR
= 1
DESCRIPTION =
N'The publication is missing a subject';
EXEC sp_RemovePublication@Conversation;
END
END
GO
-------------------4:
CREATE PROCEDURE
sp_SendOnPublication
@Publication UNIQUEIDENTIFIER,
@Article VARBINARY(MAX)
AS
BEGIN
DECLARE @Subscription
UNIQUEIDENTIFIER;
DECLARE @cursorSubscriptions
CURSOR;
SET @cursorSubscriptions
= CURSOR
LOCAL SCROLL FOR
SELECT Subscriber
FROM Subscriptions
s
JOIN Publications
p ON
s.Subject
= p.Subject
WHERE p.Publication
= @Publication;
BEGIN TRANSACTION;
OPEN @cursorSubscriptions;
FETCH NEXT
FROM @cursorSubscriptions
INTO @Subscription;
WHILE (@@fetch_status
= 0)
BEGIN
IF (@Article
IS NOT
NULL)
BEGIN
-----------------使用游标发送用户从AuthorService
服务中匹配的消息
SEND ON
CONVERSATION @Subscription
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage](@Article);
END
ELSE
BEGIN
SEND ON
CONVERSATION @Subscription
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage];
END
FETCH NEXT
FROM @cursorSubscriptions
INTO @Subscription;
END
CLOSE @cursorSubscriptions;
DEALLOCATE @cursorSubscriptions;
COMMIT;
END
GO
------------------5:
CREATE PROCEDURE
sp_RemoveSubscriber
@Subscriber UNIQUEIDENTIFIER
AS
BEGIN
DELETE FROM
Subscriptions
WHERE Subscriber=
@Subscriber
END
GO
-------------6:
CREATE PROCEDURE
sp_PublishSubscriber
@Subscriber UNIQUEIDENTIFIER,
@Subject NVARCHAR(MAX),
@OriginalXml XML
AS
BEGIN
INSERT INTO
Subscriptions(Subscriber,
Subject,
OriginalXml)
VALUES
(
@Subscriber,
@Subject,
@OriginalXml
)
END
GO
------------7:
CREATE PROCEDURE
sp_ProcessSubscriptionRequest
@Conversation UNIQUEIDENTIFIER,
@Message VARBINARY(MAX)
AS
BEGIN
DECLARE @Request
XML;
DECLARE @Subject
NVARCHAR(MAX);
SELECT @Request
= CAST(@Message
AS XML);
WITH XMLNAMESPACES(DEFAULT
'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
SELECT @Subject
= @Request.value(N'(//Request/Subject)[1]',
N'NVARCHAR(MAX)');
IF (@Subject
IS NOT
NULL)
BEGIN
EXEC sp_PublishSubscriber@Conversation,
@Subject,
@Message;
END
ELSE
BEGIN
END CONVERSATION
@Conversation
WITH ERROR
= 1
DESCRIPTION =
N'The subscriber is missing a subject';
EXEC sp_RemoveSubscriber@Conversation;
END
END
GO
---------------------8:
CREATE PROCEDURE
sp_PublisherService
AS
BEGIN
DECLARE @Conversation
UNIQUEIDENTIFIER;
DECLARE @Message
VARBINARY(MAX);
DECLARE @MessageTypeName
SYSNAME;
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP(1)
@Conversation =
conversation_handle,
@Message =
message_body,
@MessageTypeName =
message_type_name
FROM PublisherQueue
), TIMEOUT 1000;
WHILE (@Conversation
IS NOT
NULL)
BEGIN
IF (@MessageTypeName
= 'http://ssb.csharp.at/SSB_Book/c10/PublishMessage')
BEGIN
EXEC sp_ProcessPublicationRequest
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
= 'http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage')
BEGIN
EXEC sp_ProcessSubscriptionRequest
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
=
'http://ssb.csharp.at/SSB_Book/c10/ArticleMessage')
BEGIN
EXEC sp_SendOnPublication
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
IN(
N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'))
BEGIN
END CONVERSATION
@Conversation;
IF (EXISTS(SELECT
* FROM
Publications
WHERE Publication
= @Conversation))
BEGIN
EXEC sp_RemovePublication
@Conversation;
END
IF (EXISTS(SELECT
* FROM
Subscribers))
BEGIN
EXEC sp_RemoveSubscriber
@Conversation;
END
END
ELSE
BEGIN
-- Unexpected message
RAISERROR (N'Received unexpected message type: %s', 16, 1,
@MessageTypeName);
ROLLBACK;
RETURN;
END
COMMIT;
SELECT @Conversation
= NULL;
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP(1)
@Conversation =
conversation_handle,
@Message =
message_body,
@MessageTypeName =
message_type_name
FROM PublisherQueue
), TIMEOUT 1000;
END
COMMIT;
END
GO
----------------9:
--------------订阅方请求订阅脚本:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService2]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>'
);
GO
四:发布消息:
--------执行AuthorService服务就可以发送article消息给PublisherService进行并发;
---------[AuthorService] 发送PublishMessage消息:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[AuthorService]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
(
N'<?xml version="1.0"?>
<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Publish>'
);
--------向'PublisherService'服务所在队列PublisherQueue中新增一条记录,执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_ProcessPublicationRequest
存储过程,并持久化到Publications中
select *
from dbo.PublisherQueue;
select *
from Publications;
![](http://img.my.csdn.net/uploads/201301/25/1359084989_5839.png)
五:订阅消息
-----------1:订阅者通过订阅PublisherService服务接收消息,
----------------SubscriberService1订购主题Subject1的消息----》PublisherService
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService1]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Request>'
);
GO
----------------或者SubscriberService2订购主题Subject2的消息----》PublisherService
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService2]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>'
);
GO
-----向'PublisherService'服务所在队列PublisherQueue中新增一条记录,执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_ProcessSubscriptionRequest存储过程,并持久化到Subscriptions中
select *
from dbo.PublisherQueue;
select *
from Subscriptions;
![](http://img.my.csdn.net/uploads/201301/25/1359085006_7372.png)
六:
-----执行AuthorService服务就可以发送article消息给PublisherService进行并发
----发送特定主题的article消息:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[AuthorService]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
(
N'This is an article on Subject2'
);
----向'PublisherService'服务所在队列PublisherQueue中新增一条记录,
----执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_SendOnPublication存储过程分发该消息给所有匹配的订阅者
select *
from dbo.PublisherQueue;
六:应用服务JDBC调用“发送消息存储过程”、“接收消息存储过程”,供参考:
cStmt = con.prepareCall("{call [dbo].[ sp_PublisherService]");
七:测试响应时间间隔
1:发送消息测试
通过登录数据库验证sql如下:
2:接收消息测试
3:涉及SQL验证:
select *
from SubscriberQueue1;
select *
from SubscriberQueue2;
select *
from dbo.PublisherQueue;
select *
from dbo.AuthorQueue;
----发布后,订阅后都持久到表中,查阅如下
select *
from dbo.Publications;
select *
from dbo.Subscriptions;
4:原理架构图
一:建立两个用来交换的数据库实例,并在这些实例中启用Service Broker活动,T-SQL如下:
------(一)、启用数据库的Service Broker活动
-- Enabling Databases for Service Broker Activity
USE master
GO
IF NOT
EXISTS(SELECT
name FROM
sys.databases
WHERE name
= 'SSB_Book')
CREATE DATABASE
SSB_Book
GO
ALTER DATABASE
SSB_Book SET
ENABLE_BROKER
GO
ALTER DATABASE
SSB_Book SET
TRUSTWORTHY ON
GO
USE SSB_Book
GO
CREATE TABLE
Publications
(
Publication UNIQUEIDENTIFIER
NOT NULL
PRIMARY KEY,
Subject NVARCHAR(MAX)
NOT NULL,
OriginalXml XML
NOT NULL
)
GO
CREATE TABLE
Subscriptions
(
Subscriber UNIQUEIDENTIFIER
NOT NULL
PRIMARY KEY,
Subject NVARCHAR(MAX)
NOT NULL,
OriginalXml XML
NOT NULL
)
GO
二:建立消息类型、契约(Contract)、队列、服务,并把指定的队列绑定到契约
----创建具体消息XML格式的实例
CREATE XML
SCHEMA COLLECTION
ExpenseReportSchema AS
N'<?xml version="1.0" encoding="UTF-16" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://Adventure-Works.com/schemas/expenseReport"
xmlns:expense="http://Adventure-Works.com/schemas/expenseReport"
elementFormDefault="qualified"
>
<xsd:complexType name="expenseReportType">
<xsd:sequence>
<xsd:element name="EmployeeName" type="xsd:string"/>
<xsd:element name="EmployeeID" type="xsd:string"/>
<xsd:element name="ItemDetail"
type="expense:ItemDetailType" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="ItemDetailType">
<xsd:sequence>
<xsd:element name="Date" type="xsd:date"/>
<xsd:element name="CostCenter" type="xsd:string"/>
<xsd:element name="Total" type="xsd:decimal"/>
<xsd:element name="Currency" type="xsd:string"/>
<xsd:element name="Description" type="xsd:string"/>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="ExpenseReport" type="expense:expenseReportType"/>
</xsd:schema>'
;
CREATE
MESSAGE TYPE
[//Adventure-Works.com/Expenses/SubmitExpense]
VALIDATION
= VALID_XML WITH
SCHEMA COLLECTION
ExpenseReportSchema ;
------------------------
USE SSB_Book
GO
--------------一:service Broker基础架构
----------消息类型Message Type
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
VALIDATION =
WELL_FORMED_XML;
GO
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
VALIDATION =
NONE;
GO
CREATE MESSAGE
TYPE [http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
VALIDATION =
WELL_FORMED_XML;
GO
-------------契约Contract
CREATE CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
(
[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
SENT BY
INITIATOR,
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
SENT BY
INITIATOR
)
GO
CREATE CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
(
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
SENT BY
INITIATOR,
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
SENT BY
TARGET
)
GO
---------队列QUEUE
和服务SERVICE
-----发布者队列及其服务
CREATE QUEUE
[PublisherQueue]
GO
CREATE SERVICE
[PublisherService] ON
QUEUE [PublisherQueue]
(
[http://ssb.csharp.at/SSB_Book/c10/PublishContract],
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
)
GO
---------订阅者队列及其服务
CREATE QUEUE
SubscriberQueue1;
GO
CREATE SERVICE
SubscriberService1 ON
QUEUE SubscriberQueue1;
GO
CREATE QUEUE
SubscriberQueue2;
GO
CREATE SERVICE
SubscriberService2 ON
QUEUE SubscriberQueue2;
GO
----------Author队列及其服务
CREATE QUEUE
AuthorQueue;
GO
CREATE SERVICE
AuthorService ON
QUEUE AuthorQueue;
GO
三:编写存储过程
------------------三:执行存储过程如下
---------1:
CREATE PROCEDURE
sp_PublishPublication
@Publication UNIQUEIDENTIFIER,
@Subject NVARCHAR(MAX),
@OriginalXml XML
AS
BEGIN
INSERT INTO
Publications(Publication,
Subject,
OriginalXml)
VALUES
(
@Publication,
@Subject,
@OriginalXml
)
END
GO
--------------2:
CREATE PROCEDURE
sp_RemovePublication
@Publication UNIQUEIDENTIFIER
AS
BEGIN
DELETE FROM
Publications
WHERE Publication
= @Publication
END
GO
-----------------3:
CREATE PROCEDURE
sp_ProcessPublicationRequest
@Conversation UNIQUEIDENTIFIER,
@Message VARBINARY(MAX)
AS
BEGIN
DECLARE @Request
XML;
DECLARE @Subject
NVARCHAR(MAX);
SELECT @Request
= CAST(@Message
AS XML);
WITH XMLNAMESPACES(DEFAULT
'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
SELECT @Subject
= @Request.value(N'(//Publish/Subject)[1]',
N'NVARCHAR(MAX)');
IF (@Subject
IS NOT
NULL)
BEGIN
EXEC sp_PublishPublication@Conversation,
@Subject,
@Message;
END
ELSE
BEGIN
END CONVERSATION
@Conversation
WITH ERROR
= 1
DESCRIPTION =
N'The publication is missing a subject';
EXEC sp_RemovePublication@Conversation;
END
END
GO
-------------------4:
CREATE PROCEDURE
sp_SendOnPublication
@Publication UNIQUEIDENTIFIER,
@Article VARBINARY(MAX)
AS
BEGIN
DECLARE @Subscription
UNIQUEIDENTIFIER;
DECLARE @cursorSubscriptions
CURSOR;
SET @cursorSubscriptions
= CURSOR
LOCAL SCROLL FOR
SELECT Subscriber
FROM Subscriptions
s
JOIN Publications
p ON
s.Subject
= p.Subject
WHERE p.Publication
= @Publication;
BEGIN TRANSACTION;
OPEN @cursorSubscriptions;
FETCH NEXT
FROM @cursorSubscriptions
INTO @Subscription;
WHILE (@@fetch_status
= 0)
BEGIN
IF (@Article
IS NOT
NULL)
BEGIN
-----------------使用游标发送用户从AuthorService
服务中匹配的消息
SEND ON
CONVERSATION @Subscription
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage](@Article);
END
ELSE
BEGIN
SEND ON
CONVERSATION @Subscription
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage];
END
FETCH NEXT
FROM @cursorSubscriptions
INTO @Subscription;
END
CLOSE @cursorSubscriptions;
DEALLOCATE @cursorSubscriptions;
COMMIT;
END
GO
------------------5:
CREATE PROCEDURE
sp_RemoveSubscriber
@Subscriber UNIQUEIDENTIFIER
AS
BEGIN
DELETE FROM
Subscriptions
WHERE Subscriber=
@Subscriber
END
GO
-------------6:
CREATE PROCEDURE
sp_PublishSubscriber
@Subscriber UNIQUEIDENTIFIER,
@Subject NVARCHAR(MAX),
@OriginalXml XML
AS
BEGIN
INSERT INTO
Subscriptions(Subscriber,
Subject,
OriginalXml)
VALUES
(
@Subscriber,
@Subject,
@OriginalXml
)
END
GO
------------7:
CREATE PROCEDURE
sp_ProcessSubscriptionRequest
@Conversation UNIQUEIDENTIFIER,
@Message VARBINARY(MAX)
AS
BEGIN
DECLARE @Request
XML;
DECLARE @Subject
NVARCHAR(MAX);
SELECT @Request
= CAST(@Message
AS XML);
WITH XMLNAMESPACES(DEFAULT
'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')
SELECT @Subject
= @Request.value(N'(//Request/Subject)[1]',
N'NVARCHAR(MAX)');
IF (@Subject
IS NOT
NULL)
BEGIN
EXEC sp_PublishSubscriber@Conversation,
@Subject,
@Message;
END
ELSE
BEGIN
END CONVERSATION
@Conversation
WITH ERROR
= 1
DESCRIPTION =
N'The subscriber is missing a subject';
EXEC sp_RemoveSubscriber@Conversation;
END
END
GO
---------------------8:
CREATE PROCEDURE
sp_PublisherService
AS
BEGIN
DECLARE @Conversation
UNIQUEIDENTIFIER;
DECLARE @Message
VARBINARY(MAX);
DECLARE @MessageTypeName
SYSNAME;
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP(1)
@Conversation =
conversation_handle,
@Message =
message_body,
@MessageTypeName =
message_type_name
FROM PublisherQueue
), TIMEOUT 1000;
WHILE (@Conversation
IS NOT
NULL)
BEGIN
IF (@MessageTypeName
= 'http://ssb.csharp.at/SSB_Book/c10/PublishMessage')
BEGIN
EXEC sp_ProcessPublicationRequest
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
= 'http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage')
BEGIN
EXEC sp_ProcessSubscriptionRequest
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
=
'http://ssb.csharp.at/SSB_Book/c10/ArticleMessage')
BEGIN
EXEC sp_SendOnPublication
@Conversation,
@Message;
END
ELSE IF (@MessageTypeName
IN(
N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'))
BEGIN
END CONVERSATION
@Conversation;
IF (EXISTS(SELECT
* FROM
Publications
WHERE Publication
= @Conversation))
BEGIN
EXEC sp_RemovePublication
@Conversation;
END
IF (EXISTS(SELECT
* FROM
Subscribers))
BEGIN
EXEC sp_RemoveSubscriber
@Conversation;
END
END
ELSE
BEGIN
-- Unexpected message
RAISERROR (N'Received unexpected message type: %s', 16, 1,
@MessageTypeName);
ROLLBACK;
RETURN;
END
COMMIT;
SELECT @Conversation
= NULL;
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP(1)
@Conversation =
conversation_handle,
@Message =
message_body,
@MessageTypeName =
message_type_name
FROM PublisherQueue
), TIMEOUT 1000;
END
COMMIT;
END
GO
----------------9:
--------------订阅方请求订阅脚本:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService2]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>'
);
GO
四:发布消息:
--------执行AuthorService服务就可以发送article消息给PublisherService进行并发;
---------[AuthorService] 发送PublishMessage消息:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[AuthorService]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]
(
N'<?xml version="1.0"?>
<Publish xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Publish>'
);
--------向'PublisherService'服务所在队列PublisherQueue中新增一条记录,执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_ProcessPublicationRequest
存储过程,并持久化到Publications中
select *
from dbo.PublisherQueue;
select *
from Publications;
![](http://img.my.csdn.net/uploads/201301/25/1359084989_5839.png)
五:订阅消息
-----------1:订阅者通过订阅PublisherService服务接收消息,
----------------SubscriberService1订购主题Subject1的消息----》PublisherService
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService1]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject1</Subject>
</Request>'
);
GO
----------------或者SubscriberService2订购主题Subject2的消息----》PublisherService
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[SubscriberService2]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/SubscribeContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]
(
N'<?xml version="1.0"?>
<Request xmlns="http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe">
<Subject>Subject2</Subject>
</Request>'
);
GO
-----向'PublisherService'服务所在队列PublisherQueue中新增一条记录,执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_ProcessSubscriptionRequest存储过程,并持久化到Subscriptions中
select *
from dbo.PublisherQueue;
select *
from Subscriptions;
![](http://img.my.csdn.net/uploads/201301/25/1359085006_7372.png)
六:
-----执行AuthorService服务就可以发送article消息给PublisherService进行并发
----发送特定主题的article消息:
DECLARE @ch
UNIQUEIDENTIFIER;
BEGIN DIALOG
CONVERSATION @ch
FROM SERVICE
[AuthorService]
TO SERVICE
'PublisherService'
ON CONTRACT
[http://ssb.csharp.at/SSB_Book/c10/PublishContract]
WITH ENCRYPTION
= OFF;
SEND ON
CONVERSATION @ch
MESSAGE TYPE
[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]
(
N'This is an article on Subject2'
);
----向'PublisherService'服务所在队列PublisherQueue中新增一条记录,
----执行dbo.sp_PublisherService存储过程后接收消息,
-----调用sp_SendOnPublication存储过程分发该消息给所有匹配的订阅者
select *
from dbo.PublisherQueue;
六:应用服务JDBC调用“发送消息存储过程”、“接收消息存储过程”,供参考:
cStmt = con.prepareCall("{call [dbo].[ sp_PublisherService]");
七:测试响应时间间隔
1:发送消息测试
通过登录数据库验证sql如下:
2:接收消息测试
3:涉及SQL验证:
select *
from SubscriberQueue1;
select *
from SubscriberQueue2;
select *
from dbo.PublisherQueue;
select *
from dbo.AuthorQueue;
----发布后,订阅后都持久到表中,查阅如下
select *
from dbo.Publications;
select *
from dbo.Subscriptions;
4:原理架构图
![](http://img.my.csdn.net/uploads/201301/25/1359085027_4610.png)
相关文章推荐
- Shuttle ESB(四)——发布订阅模式实例介绍(1)
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- JMS发送和接收实例-发布/订阅模式
- JavaScript中发布/订阅模式的简单实例
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- JavaScript中发布/订阅模式的简单实例
- Shuttle ESB(五)——发布订阅模式实例实现(2)
- node.js 发布订阅模式的实例
- JavaScript中发布/订阅模式的简单实例
- js 发布订阅模式的实例讲解
- Spring基于事件驱动模型的订阅发布模式代码实例详解
- zeroMQ初体验-2.发布订阅模式(pub/sub)
- javascript设计模式——发布订阅模式
- Kafka下的生产消费者模式与订阅发布模式
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- 发布订阅模式
- 观察者(发布——订阅)模式
- redis的发布订阅模式