您的位置:首页 > 其它

Comet主动推送技术在项目中的使用

2015-03-22 18:27 197 查看
原文地址:/article/3848641.html

Comet是一种服务器端推的技术,所谓服务器端推也就是当有事件要通知给某个用户的时候,是由服务器端直接发送到用户的浏览器。

服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology
有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?

比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。

jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。

当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。

Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。

jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。

下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。



在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),

我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。

这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。

换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。

DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。

环境配置方法:

服务器端:

类库清单:WEB-INF/lib

jetty-6.1.9.jar

jetty-util-6.1.9.jar

servlet-api-2.5-6.1.9.jar

(以上Jetty服务器自带)

cometd-api-0.9.20080221.jar

cometd-bayeux-6.1.9.jar

web.xml配置:

<!--
配置ContinuationCometdServlet, 这个是必须的。配置后就可以支持comted -->

<
servlet >

<
servlet-name>
cometd </
servlet-name >

<
servlet-class>
org.mortbay.cometd.continuation.ContinuationCometdServlet</
servlet-class>

<!--
对队列的内容进行过滤 -->

<
init-param>

<
param-name>
filters </
param-name >

<
param-value>
/WEB-INF/filters.json</
param-value>

</
init-param>

<!--
超时设置The server side poll timeout in milliseconds (default 250000). This is how long the server will

hold a reconnect request before responding. -->

<
init-param>

<
param-name>
timeout </
param-name >

<
param-value>
120000 </
param-value >

</
init-param>

<!--
The client side poll timeout in milliseconds (default 0). How long a client will wait between

reconnects -->

<
init-param>

<
param-name>
interval </
param-name >

<
param-value>
0 </
param-value >

</
init-param>

<!--
the client side poll timeout if multiple connections are detected from the same browser

(default 1500). -->

<
init-param>

<
param-name>
multiFrameInterval</
param-name>

<
param-value>
1500 </
param-value >

</
init-param>

<!--
0=none, 1=info, 2=debug -->

<
init-param>

<
param-name>
logLevel </
param-name >

<
param-value>
0 </
param-value >

</
init-param>

<!--
If "true" then the server will accept JSON wrapped in a comment and will generate JSON wrapped

in a comment. This is a defence against Ajax Hijacking.
-->

<
init-param>

<
param-name>
JSONCommented</
param-name>

<
param-value>
true </
param-value >

</
init-param>

<
init-param>

<
param-name>
alwaysResumePoll</
param-name>

<
param-value>
false </
param-value >
<!-- use true for x-site cometd
-->

</
init-param>

<
load-on-startup>
1 </
load-on-startup >

</
servlet >

<
servlet-mapping>

<
servlet-name>
cometd </
servlet-name >

<
url-pattern>
/cometd/*</
url-pattern>

</
servlet-mapping>

filters.json内容如下:

格式如下:

{

"channels": "/**", --要过滤的队列(支持通配符)

"filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter

"init" : {} --初始化的值,调用 DataFilter.init方法传入

}

示例内容如下:

[

{

" channels"
: "
/** ",

"
filter "
: "org.mortbay.cometd.filter.NoMarkupFilter
" ,

"
init "
: {}

} ,

{

" channels
" : "/chat/*
" ,

"
filter "
: "org.mortbay.cometd.filter.RegexFilter
" ,

"
init "
: [

[ "[fF
] .ck ",
" dang
" ],

[ "
teh " ,"
the "
]

]

},

{

" channels"
: "
/chat/** " ,

" filter"
: "
org.mortbay.cometd.filter.RegexFilter"
,

" init"
: [

[ "
[ Mm
] icrosoft
" , "Micro\\$oft
" ],

[ "
.*tomcat.* " , null ]

]

}

]

这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。

客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。

代码开发:

接下来,我们要准备客户端(使用dojo来实现)

一共三个文件

index.html

chat.js

chat.css(不是必须)

下面来看一下这两个文件的内容(加入注释)

index.html

<html
>

<head
>

<
title >
Cometd chat </
title >

<
script type
="text/javascript"
src ="../dojo/dojo/dojo.js"
></ script
> <!-- dojo类库 -->

<
script type
="text/javascript"
src ="../dojo/dojox/cometd.js.uncompressed.js"></
script >
<!-- dojo-cometd类库 -->

<
script type
="text/javascript"
src ="chat.js"
></ script
> <!-- chat js文件,控制cometd的连接,消息的发送与接收 -->

<
link rel
="stylesheet" type
="text/css" href
="chat.css" >

</head
>

<body
>

<h1
> Cometd Chat
</ h1
>

<div
id ="chatroom"
>

<
div id
="chat" ></
div >

<
div id
="input" >

<
div id
="join" >
<!-- 未登录时,显示的登录名和登录按钮
-->

Username: 
< input
id ="username" type
="text"/>

< input id="joinB"
class="button"
type="submit"
name="join"
value="Join"
/>

</
div >

<
div id
="joined" class
="hidden" >
<!-- 登录后,显示的消息框和发送,退出按钮(默认为隐藏) -->

Chat: 
< input
id ="phrase" type
="text"></
input >

<
input id
="sendB" class
="button" type
="submit" name
="join" value
="Send" />

<
input id
="leaveB" class
="button" type
="submit" name
="join" value
="Leave" />

</
div >

</
div >

</
div >

</body
>

chat.js文件

1 //
引入所需要的类

2 dojo.require(
"dojox.cometd
");

3 dojo.require("
dojox.cometd.timestamp"
);

4

5 //
定义一个room类

6 var
room =
{

7 //
定义属性

8 _last:
"",
// 最后发送消息的人员(如果不是本人,则显示为空)

9 _username:
null,
// 当前的用户名

10 _connected:
true,
// 当前的连接状态 true已经连接, false表示未连接

11 groupName:
"whimsical
", //
组名(未知)

12

13
// 登录操作

14 join:
function(name){

15

16
if(name
== null
||
name.length==
0 ){

17 alert('Please enter a username!
');

18 }else
{

19

20 dojox.cometd.init(

new String(document.location).replace(/
http:\ /
\ /[
^ \
/ ]*/
,'').replace(/
\ /
examples\ /
. *$
/ ,'')
+ "/cometd
" );

21 //
dojox.cometd.init("http://127.0.0.2:8080/cometd");

22
this._connected
=
true ;

23

24
this._username
= name;

25 dojo.byId('join').className=
'hidden';

26 dojo.byId('joined').className=
'';

27 dojo.byId('phrase').focus();

28

29
// subscribe and join

30 dojox.cometd.startBatch();

31 dojox.cometd.subscribe("
/chat/demo"
, room, "
_chat ", { groupName:
this.groupName});

32 dojox.cometd.publish("
/chat/demo"
, {

33 user: room._username,

34 join: true
,

35 chat : room._username+
" has joined
"

36 }, { groupName:
this .groupName });

37 dojox.cometd.endBatch();

38

39
// handle cometd failures while in the room

40 room._meta
= dojo.subscribe(
" /cometd/meta
" ,
this, function(event){

41 console.debug(event);

42 if
(event.action ==
"
handshake " ){

43 room._chat({ data: {

44 join: true
,

45 user:"
SERVER "
,

46 chat:"
reinitialized"

47 } });

48 dojox.cometd.subscribe("
/chat/demo"
, room, "
_chat ", { groupName:
this.groupName });

49 }else
if
(event.action ==
"connect
" ){

50 if
(event.successful &&
!
this ._connected){

51 room._chat({ data: {

52 leave: true
,

53 user: "
SERVER "
,

54 chat: "
reconnected!"

55 } });

56 }

57 if
( !
event.successful &&
this._connected){

58 room._chat({ data: {

59 leave: true
,

60 user: "
SERVER "
,

61 chat: "
disconnected!"

62 } });

63 }

64 this
._connected =
event.successful;

65 }

66 }, {groupName: this
.groupName });

67 }

68 },

69

70
// 离开操作

71 leave:
function(){

72 if
( !
room._username){

73 return
;

74 }

75

76
if(room._meta){

77 dojo.unsubscribe(room._meta, null
, null
, { groupName: this .groupName });

78 }

79 room._meta=
null ;

80

81 dojox.cometd.startBatch();

82 dojox.cometd.unsubscribe("
/chat/demo"
, room, "
_chat ", { groupName:
this.groupName });

83 dojox.cometd.publish("
/chat/demo"
, {

84 user: room._username,

85 leave: true
,

86 chat : room._username+
" has left
"

87 }, { groupName:
this .groupName });

88 dojox.cometd.endBatch();

89

90
// switch the input form

91 dojo.byId('join').className
= '';

92 dojo.byId('joined').className=
'hidden';

93 dojo.byId('username').focus();

94 room._username =
null
;

95 dojox.cometd.disconnect();

96 },

97

98
// 发送消息

99 chat:
function(text){

100 if
( !
text ||
! text.length){

101 return
false
;

102 }

103 dojox.cometd.publish("
/chat/demo"
, { user: room._username, chat: text}, { groupName: this
.groupName });

104 },

105

106
// 从服务器收到消息后,回调的方法

107 _chat:
function(message){

108 var
chat =
dojo.byId('chat');

109 if
( !
message.data){

110 console.debug("
bad message format "
+ message);

111 return
;

112 }

113 var
from =
message.data.user;

114 var
special=
message.data.join || message.data.leave;

115 var
text =
message.data.chat;

116 if
( !
text){ return; }

117

118
if(
! special
&& from ==
room._last ){

119 from=
"


" ;

120 }else
{

121 room._last=
from;

122 from+=
" :
" ;

123 }

124

125
if(special){

126 chat.innerHTML +=
"
<span class=\ "
alert\ "><span class=\
"from\
" > "
+from
+ "  

</span><span class=\ "
text\ " >"
+ text
+ "</span></span><br/>
" ;

127 room._last=
"" ;

128 }else
{

129 chat.innerHTML +=
"
<span class=\ "
from\ ">
" +
from +"
 </span><span class=\"
text\ "
> "+
text +
" </span><br/>"
;

130 }

131 chat.scrollTop =
chat.scrollHeight -
chat.clientHeight;

132 },

133

134 //
初始操作

135 _init:
function(){

136 dojo.byId('join').className=
'';

137 dojo.byId('joined').className=
'hidden';

138 dojo.byId('username').focus();

139

140
var element
= dojo.byId('username');

141 element.setAttribute("
autocomplete"
, "
OFF ");

142 dojo.connect(element, "
onkeyup "
, function(e){
// 支持回车,登录

143
if(e.keyCode
== dojo.keys.ENTER){

144 room.join(dojo.byId('username').value);

145 return
false
;

146 }

147 return
true
;

148 });

149

150 dojo.connect(dojo.byId('joinB'),
" onclick
", function(e){
// 绑定 room.join方法到 Join按扭

151 room.join(dojo.byId('username').value);

152 e.preventDefault();

153 });

154

155 element
=dojo.byId('phrase');
// 取得消息框

156 element.setAttribute(
" autocomplete
" ,
"OFF "
);

157 dojo.connect(element, "
onkeyup "
, function(e){
// 支持回车发送消息

158
if(e.keyCode
== dojo.keys.ENTER){

159 room.chat(dojo.byId('phrase').value);

160 dojo.byId('phrase').value=
'';

161 e.preventDefault();

162 }

163 });

164

165 dojo.connect(dojo.byId('sendB'),
" onclick
", function(e){
// 绑定 room.chat方法到 sendB按扭

166 room.chat(dojo.byId('phrase').value);

167 dojo.byId('phrase').value=
'';

168 });

169 dojo.connect(dojo.byId('leaveB'), "
onclick "
, room, "
leave ");
// 绑定 room.leave方法到 leaveB按扭

170 }

171 };

172

173 //
页面装载时,调用room._init方法

174 dojo.addOnLoad(room,
" _init
");

175 //页面关闭时,调用 room.leave方法

176 dojo.addOnUnload(room,
" leave
");

177

178 //
vim:ts=4:noet:

补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作

要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例

可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入

实现方法如下:

1 public
class
BayeuxStartupListener implements
ServletContextAttributeListener

2 {

3 public
void
initialize(Bayeux bayeux)

4 {

5 synchronized
(bayeux)

6 {

7 if
( !
bayeux.hasChannel( "
/service/echo " ))

8 {

9 //
取得 bayeux实例

10 }

11 }

12 }

13

14 public
void
attributeAdded(ServletContextAttributeEvent scab)

15 {

16 if
(scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))

17 {

18 Bayeux bayeux=
(Bayeux) scab.getValue();

19 initialize(bayeux);

20 }

21 }

22

23
public
void attributeRemoved(ServletContextAttributeEvent scab)

24 {

25

26 }

27

28
public
void attributeReplaced(ServletContextAttributeEvent scab)

29 {

30

31 }

32 }

取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息

1
public
void initialize(Bayeux bayeux)

2 {

3 synchronized
(bayeux)

4 {

5 if
( !
bayeux.hasChannel( "
/service/echo " ))

6 {

7 //
取得 bayeux实例

8
new ChatService(bayeux);

9 }

10 }

11 }

具体方法请看下面这段代码:

1 //
定义 ChatService类,继承 BayeuxService

2 public
static
class ChatService
extends BayeuxService {

3

4 ConcurrentMap
< String,Set
< String
>> _members
= new
ConcurrentHashMap
< String,Set
< String >>
();

5

6 public
ChatService(Bayeux bayeux)

7 {

8 super
(bayeux, "
chat " );
//必须,把 Bayeux传入到 BayeuxService对象中

9 subscribe(
" /chat/**
" , "trackMembers
");
// 订阅队列,收到消息后,会回调trackMembers方法

10
/*

11 subscribe支持回调的方法如下:

12 # myMethod(Client fromClient, Object data)

13 # myMethod(Client fromClient, Object data, String id)

14 # myMethod(Client fromClient, String channel, Object data,String id)

15 # myMethod(Client fromClient, Message message)

16

17 参数:

18 Client fromClient 发送消息的客户端

19 Object data 消息内容

20 id The id of the message

21 channel 队列名称

22 Message message 消息对象。继承于Map

23

24 */

25 }

26

27 //
发布消息到队列

28
public
void sendMessage(String message) {

29 Map<
String,Object>
mydata =
new HashMap
< String, Object
> ();

30 mydata.put("
chat "
, message);

31

32 Client sender =
getBayeux().newClient("
server "
);

33

34 getBayeux().getChannel("
/chat/demo"
, false
).publish(sender, mydata, "
0 "/*
null */
);

35

36 }

37

38 //
发送消息给指定的client(非广播方式)

39
public
void sendMessageToClient(Client joiner, String message) {

40 Map<
String,Object>
mydata =
new HashMap
< String, Object
> ();

41 mydata.put("
chat "
, message);

42

43 send(joiner, "
/chat/demo"
, mydata, "
0 "
/* null*/
);

44 }

45

46 //
订阅消息回调方法

47
public
void trackMembers(Client joiner, String channel, Map
< String,Object
> data, String id)

48 {

49 //
解释消息内容,如果消息内容中 有 join这个字段且值为true

50
if (Boolean.TRUE.equals(data.get(
" join
")))

51 {

52 //
根据队列,取得当前登录的人员

53 Set
<String
> m =
_members.get(channel);

54 if
(m ==
null )

55 {

56 //
如果为空,则创建一个新的Set实现

57 Set
< String
> new_list
=new CopyOnWriteArraySet<
String >
();

58 m=
_members.putIfAbsent(channel,new_list);

59 if
(m ==
null )

60 m=
new_list;

61 }

62

63 final
Set <
String > members
= m;

64 final
String username=
(String)data.get("
user "
);

65

66 members.add(username);

67 //
为该client增加事件,Remove事件。当用户退出时,触发该方法。

68 joiner.addListener(
new RemoveListener(){

69 public
void
removed(String clientId, boolean
timeout)

70 {

71 members.remove(username);

72 }

73 });

74

75
// 为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。

76 joiner.addListener(
new MessageListener() {

77 public
void
deliver(Client fromClient, Client toClient, Message message) {

78 System.out.println("
message from "
+
fromClient.getId() +
" to
"

79
+ toClient.getId()
+
" message is
" +
message.getData());

80 }

81 });

82

83 Map
<String,Object
> mydata
= new
HashMap<
String, Object> ();

84 mydata.put("
chat "
, "members=
"
+ members);

85 //
把已经登录的人员信息列表,发送回给消息发送者

86 send(joiner,channel,mydata,id);

87

88 }

89 }

90
}

参考:http://www.ibm.com/developerworks/cn/web/wa-lo-w2fpak-comet/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: