Comet技术在项目中的使用
2014-12-10 15:24
141 查看
Comet是一种服务器端推的技术,所谓服务器端推也就是当有事件要通知给某个用户的时候,是由服务器端直接发送到用户的浏览器。
服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology
有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?
比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。
jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。
当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。
Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。
jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。
下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。
![](http://geeklu.com/wp-content/uploads/2010/07/jetty-server.jpg)
在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),
我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。
这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。
换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。
DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。
环境配置方法:
服务器端:
类库清单:WEB-INF/lib
jetty-6.1.9.jar
jetty-util-6.1.9.jar
servlet-api-2.5-6.1.9.jar
(以上Jetty服务器自带)
cometd-api-0.9.20080221.jar
cometd-bayeux-6.1.9.jar
web.xml配置:
<!-- 配置ContinuationCometdServlet, 这个是必须的。配置后就可以支持comted
-->
< servlet
>
< servlet-name
> cometd
</ servlet-name
>
< servlet-class
> org.mortbay.cometd.continuation.ContinuationCometdServlet
</ servlet-class
>
<!-- 对队列的内容进行过滤
-->
< init-param
>
< param-name
> filters
</ param-name
>
< param-value
> /WEB-INF/filters.json
</ param-value
>
</ init-param
>
<!-- 超时设置The server side poll timeout in milliseconds (default 250000). This is how long the server will
hold a reconnect request before responding. -->
< init-param
>
< param-name
> timeout
</ param-name
>
< param-value
> 120000
</ param-value
>
</ init-param
>
<!-- The client side poll timeout in milliseconds (default 0). How long a client will wait between
reconnects -->
< init-param
>
< param-name
> interval
</ param-name
>
< param-value
> 0
</ param-value
>
</ init-param
>
<!-- the client side poll timeout if multiple connections are detected from the same browser
(default 1500). -->
< init-param
>
< param-name
> multiFrameInterval
</ param-name
>
< param-value
> 1500
</ param-value
>
</ init-param
>
<!-- 0=none, 1=info, 2=debug
-->
< init-param
>
< param-name
> logLevel
</ param-name
>
< param-value
> 0
</ param-value
>
</ init-param
>
<!-- If "true" then the server will accept JSON wrapped in a comment and will generate JSON wrapped
in a comment. This is a defence against Ajax Hijacking.
-->
< init-param
>
< param-name
> JSONCommented
</ param-name
>
< param-value
> true
</ param-value
>
</ init-param
>
< init-param
>
< param-name
> alwaysResumePoll
</ param-name
>
< param-value
> false
</ param-value
> <!--
use true for x-site cometd
-->
</ init-param
>
< load-on-startup
> 1
</ load-on-startup
>
</ servlet
>
< servlet-mapping
>
< servlet-name
> cometd
</ servlet-name
>
< url-pattern
> /cometd/*
</ url-pattern
>
</ servlet-mapping
>
filters.json内容如下:
格式如下:
{
"channels": "/**", --要过滤的队列(支持通配符)
"filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter
"init" : {} --初始化的值,调用 DataFilter.init方法传入
}
示例内容如下:
[
{
" channels
" :
" /** "
,
" filter
" : "
org.mortbay.cometd.filter.NoMarkupFilter
" ,
" init
" : {}
} ,
{
" channels
" : "
/chat/* "
,
" filter
" : "
org.mortbay.cometd.filter.RegexFilter
" ,
" init
" : [
[ "[fF
] .ck "
, "
dang " ],
[ "
teh " ,
" the
" ]
]
},
{
" channels
" :
" /chat/**
" ,
" filter
" :
" org.mortbay.cometd.filter.RegexFilter
" ,
" init
" : [
[ "
[ Mm
] icrosoft
" , "
Micro\\$oft "
],
[ "
.*tomcat.* " , null ]
]
}
]
这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。
客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。
代码开发:
接下来,我们要准备客户端(使用dojo来实现)
一共三个文件
index.html
chat.js
chat.css(不是必须)
下面来看一下这两个文件的内容(加入注释)
index.html
<
html >
<
head >
< title
> Cometd chat
</ title >
< script
type ="text/javascript"
src ="../dojo/dojo/dojo.js"
></ script
> <!-- dojo类库
-->
< script
type ="text/javascript"
src ="../dojo/dojox/cometd.js.uncompressed.js"
></ script
> <!-- dojo-cometd类库
-->
< script
type ="text/javascript"
src ="chat.js"
></ script
> <!-- chat js文件,控制cometd的连接,消息的发送与接收
-->
< link
rel ="stylesheet"
type ="text/css"
href ="chat.css"
>
</
head >
<
body >
<
h1 >
Cometd Chat </
h1 >
<
div id
="chatroom" >
< div
id ="chat"
></ div >
< div
id ="input"
>
< div
id ="join"
> <!--
未登录时,显示的登录名和登录按钮
-->
Username:
<
input id ="username"
type ="text"
/>
< input id
="joinB" class
="button" type
="submit" name
="join" value
="Join" />
</ div
>
< div
id ="joined"
class ="hidden"
> <!-- 登录后,显示的消息框和发送,退出按钮(默认为隐藏)
-->
Chat:
<
input id ="phrase"
type ="text"
></ input
>
< input
id ="sendB"
class ="button"
type ="submit"
name ="join"
value ="Send"
/>
< input
id ="leaveB"
class ="button"
type ="submit"
name ="join"
value ="Leave"
/>
</ div
>
</ div
>
</ div
>
</
body >
chat.js文件
1
// 引入所需要的类
2
dojo.require( "
dojox.cometd "
);
3 dojo.require(
" dojox.cometd.timestamp
" );
4
5
// 定义一个room类
6
var room
= {
7
// 定义属性
8
_last: ""
, //
最后发送消息的人员(如果不是本人,则显示为空)
9
_username: null
, //
当前的用户名
10
_connected: true
, //
当前的连接状态 true已经连接, false表示未连接
11
groupName: "
whimsical "
, //
组名(未知)
12
13
//
登录操作
14
join: function
(name){
15
16
if
(name ==
null
|| name.length
== 0
){
17 alert('Please enter a username
! ');
18 }
else {
19
20
dojox.cometd.init(
new String(document.location).replace(
/ http:\
/ \ /
[ ^
\ / ]
*/ ,'').replace(
/ \
/ examples\
/ . *
$ /
,'') + "
/cometd "
);
21
// dojox.cometd.init("http://127.0.0.2:8080/cometd");
22
this
._connected =
true
;
23
24
this
._username =
name;
25 dojo.byId('join').className
= 'hidden';
26 dojo.byId('joined').className
= '';
27 dojo.byId('phrase').focus();
28
29
//
subscribe and join
30
dojox.cometd.startBatch();
31 dojox.cometd.subscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName});
32 dojox.cometd.publish(
" /chat/demo
" , {
33 user: room._username,
34 join:
true ,
35 chat : room._username
+ "
has joined "
36
}, { groupName:
this .groupName });
37 dojox.cometd.endBatch();
38
39
//
handle cometd failures while in the room
40
room._meta
= dojo.subscribe(
" /cometd/meta
" , this
, function
(event){
41 console.debug(event);
42
if (event.action
==
" handshake
" ){
43 room._chat({ data: {
44 join:
true ,
45 user:
" SERVER
" ,
46 chat:
" reinitialized
"
47
} });
48 dojox.cometd.subscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName });
49 }
else
if (event.action
== "
connect "
){
50
if (event.successful
&&
! this ._connected){
51 room._chat({ data: {
52 leave:
true ,
53 user:
" SERVER
" ,
54 chat:
" reconnected!
"
55
} });
56 }
57
if (
! event.successful
&& this
._connected){
58 room._chat({ data: {
59 leave:
true ,
60 user:
" SERVER
" ,
61 chat:
" disconnected!
"
62
} });
63 }
64
this ._connected
= event.successful;
65 }
66 }, {groupName:
this .groupName });
67 }
68 },
69
70
//
离开操作
71
leave: function
(){
72
if (
! room._username){
73
return ;
74 }
75
76
if
(room._meta){
77 dojo.unsubscribe(room._meta,
null ,
null , { groupName:
this .groupName });
78 }
79 room._meta
= null
;
80
81
dojox.cometd.startBatch();
82 dojox.cometd.unsubscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName });
83 dojox.cometd.publish(
" /chat/demo
" , {
84 user: room._username,
85 leave:
true ,
86 chat : room._username
+ "
has left "
87
}, { groupName:
this .groupName });
88 dojox.cometd.endBatch();
89
90
//
switch the input form
91
dojo.byId('join').className
= '';
92 dojo.byId('joined').className
= 'hidden';
93 dojo.byId('username').focus();
94 room._username
=
null ;
95 dojox.cometd.disconnect();
96 },
97
98
//
发送消息
99
chat: function
(text){
100
if (
! text ||
!
text.length){
101
return
false ;
102 }
103 dojox.cometd.publish(
" /chat/demo
" , { user: room._username, chat: text}, { groupName:
this .groupName });
104 },
105
106
//
从服务器收到消息后,回调的方法
107
_chat: function
(message){
108
var chat
= dojo.byId('chat');
109
if (
! message.data){
110 console.debug(
" bad message format
" +
message);
111
return ;
112 }
113
var from
= message.data.user;
114
var special
= message.data.join
|| message.data.leave;
115
var text
= message.data.chat;
116
if (
! text){ return
; }
117
118
if
( !
special &&
from == room._last ){
119 from
= "
![](http://www.blogjava.net/Images/dot.gif)
" ;
120 }
else {
121 room._last
= from;
122 from
+= "
: " ;
123 }
124
125
if
(special){
126 chat.innerHTML
+=
" <span class=\
" alert\ "
><span class=\ "
from\ "
> " +
from +
"
</span><span class=\ "
text\ " >
" +
text + "
</span></span><br/>
" ;
127 room._last
= ""
;
128 }
else {
129 chat.innerHTML
+=
" <span class=\
" from\ "
> "
+ from +
" </span><span class=\
" text\
" > "
+ text
+ " </span><br/>
" ;
130 }
131 chat.scrollTop
= chat.scrollHeight
- chat.clientHeight;
132 },
133
134
// 初始操作
135
_init: function
(){
136 dojo.byId('join').className
= '';
137 dojo.byId('joined').className
= 'hidden';
138 dojo.byId('username').focus();
139
140
var
element =
dojo.byId('username');
141 element.setAttribute(
" autocomplete
" ,
" OFF "
);
142 dojo.connect(element,
" onkeyup
" , function
(e){ //
支持回车,登录
143
if
(e.keyCode ==
dojo.keys.ENTER){
144 room.join(dojo.byId('username').value);
145
return
false ;
146 }
147
return
true ;
148 });
149
150
dojo.connect(dojo.byId('joinB'),
" onclick "
, function
(e){ //
绑定 room.join方法到 Join按扭
151
room.join(dojo.byId('username').value);
152 e.preventDefault();
153 });
154
155
element =
dojo.byId('phrase');
// 取得消息框
156
element.setAttribute(
" autocomplete
" , "
OFF "
);
157 dojo.connect(element,
" onkeyup
" , function
(e){ //
支持回车发送消息
158
if
(e.keyCode ==
dojo.keys.ENTER){
159 room.chat(dojo.byId('phrase').value);
160 dojo.byId('phrase').value
= '';
161 e.preventDefault();
162 }
163 });
164
165
dojo.connect(dojo.byId('sendB'),
" onclick "
, function
(e){ //
绑定 room.chat方法到 sendB按扭
166
room.chat(dojo.byId('phrase').value);
167 dojo.byId('phrase').value
= '';
168 });
169 dojo.connect(dojo.byId('leaveB'),
" onclick
" , room,
" leave "
); //
绑定 room.leave方法到 leaveB按扭
170
}
171 };
172
173
// 页面装载时,调用room._init方法
174
dojo.addOnLoad(room,
" _init "
);
175 //
页面关闭时,调用 room.leave方法
176
dojo.addOnUnload(room,
" leave "
);
177
178
// vim:ts=4:noet:
补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作
要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例
可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入
实现方法如下:
1
public
class BayeuxStartupListener
implements ServletContextAttributeListener
2 {
3
public
void initialize(Bayeux bayeux)
4 {
5
synchronized (bayeux)
6 {
7
if (
! bayeux.hasChannel(
" /service/echo
" ))
8 {
9
// 取得 bayeux实例
10
}
11 }
12 }
13
14
public
void attributeAdded(ServletContextAttributeEvent scab)
15 {
16
if (scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))
17 {
18 Bayeux bayeux
= (Bayeux) scab.getValue();
19 initialize(bayeux);
20 }
21 }
22
23
public
void
attributeRemoved(ServletContextAttributeEvent scab)
24 {
25
26
}
27
28
public
void
attributeReplaced(ServletContextAttributeEvent scab)
29 {
30
31
}
32 }
取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息
1
public
void
initialize(Bayeux bayeux)
2 {
3
synchronized (bayeux)
4 {
5
if (
! bayeux.hasChannel(
" /service/echo
" ))
6 {
7
// 取得 bayeux实例
8
new ChatService(bayeux);
9 }
10 }
11 }
具体方法请看下面这段代码:
1
// 定义 ChatService类,继承 BayeuxService
2
public
static class
ChatService extends
BayeuxService {
3
4
ConcurrentMap
< String,Set
< String >>
_members =
new
ConcurrentHashMap <
String,Set <
String >> ();
5
6
public ChatService(Bayeux bayeux)
7 {
8
super (bayeux,
" chat
" ); //
必须,把 Bayeux传入到 BayeuxService对象中
9
subscribe(
" /chat/**
" , "
trackMembers "
); //
订阅队列,收到消息后,会回调trackMembers方法
10
/*
11
subscribe支持回调的方法如下:
12 # myMethod(Client fromClient, Object data)
13 # myMethod(Client fromClient, Object data, String id)
14 # myMethod(Client fromClient, String channel, Object data,String id)
15 # myMethod(Client fromClient, Message message)
16
17 参数:
18 Client fromClient 发送消息的客户端
19 Object data 消息内容
20 id The id of the message
21 channel 队列名称
22 Message message 消息对象。继承于Map
23
24
*/
25
}
26
27
// 发布消息到队列
28
public
void
sendMessage(String message) {
29 Map
< String,Object
> mydata
= new
HashMap <
String, Object > ();
30 mydata.put(
" chat
" , message);
31
32 Client sender
= getBayeux().newClient(
" server
" );
33
34 getBayeux().getChannel(
" /chat/demo
" ,
false ).publish(sender, mydata,
" 0 "
/* null
*/ );
35
36
}
37
38
// 发送消息给指定的client(非广播方式)
39
public
void
sendMessageToClient(Client joiner, String message) {
40 Map
< String,Object
> mydata
= new
HashMap <
String, Object > ();
41 mydata.put(
" chat
" , message);
42
43 send(joiner,
" /chat/demo
" , mydata,
" 0
" /* null
*/ );
44 }
45
46
// 订阅消息回调方法
47
public
void
trackMembers(Client joiner, String channel, Map
< String,Object
> data, String id)
48 {
49
// 解释消息内容,如果消息内容中 有 join这个字段且值为true
50
if
(Boolean.TRUE.equals(data.get(
" join "
)))
51 {
52
// 根据队列,取得当前登录的人员
53
Set <
String >
m = _members.get(channel);
54
if (m
== null )
55 {
56
// 如果为空,则创建一个新的Set实现
57
Set
< String >
new_list =
new CopyOnWriteArraySet
< String
> ();
58 m
= _members.putIfAbsent(channel,new_list);
59
if (m
== null )
60 m
= new_list;
61 }
62
63
final Set
< String >
members =
m;
64
final String username
= (String)data.get(
" user
" );
65
66 members.add(username);
67
// 为该client增加事件,Remove事件。当用户退出时,触发该方法。
68
joiner.addListener(
new RemoveListener(){
69
public
void removed(String clientId,
boolean timeout)
70 {
71 members.remove(username);
72 }
73 });
74
75
// 为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。
76
joiner.addListener(
new MessageListener() {
77
public
void deliver(Client fromClient, Client toClient, Message message) {
78 System.out.println(
" message from
"
+ fromClient.getId()
+ "
to "
79
+ toClient.getId()
+ "
message is "
+
message.getData());
80 }
81 });
82
83
Map <
String,Object >
mydata =
new HashMap
< String, Object
> ();
84 mydata.put(
" chat
" , "
members= "
+ members);
85
// 把已经登录的人员信息列表,发送回给消息发送者
86
send(joiner,channel,mydata,id);
87
88 }
89 }
90 }
91
服务器端Push目前一般有两种方式,HTTP streaming和Long polling。详细的介绍可以看这里 http://en.wikipedia.org/wiki/Push_technology
有一个Comet的框架叫做Cometd,使用的方式为Long polling。它是使用了jetty continuations特性,jetty continuations使得异步的request成为可能,这里我们来讨论下为何需要jetty continuations呢?
比如我们的浏览器的一个请求发送到服务器端了,并进行长轮询,保持了连接不结束,直到一次长轮询timeout或者有事件发生,并接收到服务端推来的消息,所以在一次长轮询的过程中,大部分时间都是在等待,如果使用老式同步的方式进行编程的话,那么有多少个连接就需要多少个线程在那里,而大都数都是在等待,所以这无疑是系统资源的巨大浪费。
jetty continuations很好的解决了这一问题,当有请求过来之后,将连接的相关信息封装到一个continuation的对象中,通过调用continuation的suspend方法,然后返回,把当前线程交还到线程池,所以这个时候线程可以返回到线程池等待并处理其他新的请求。
当有事件要发给之前的某个请求的时候,再调用对应的continuation的resume方法,将原来的哪个请求重新发送到servelt进行处理,并将消息发送给客户端,然后客户端会重新进行一次长轮询。
Jetty是一个纯java实现的非常轻量级的web容器,高度组件化,可以很方便的将各种组件进行组装,而且可以非常容易的将jetty嵌入到自己的应用中。
jetty运行时的核心类是Server类,这个类的配置一般在jetty.xml中配置,然后jetty自带的一个简单的ioc容器将server加载初始化。
下图主要描述了Jetty在NIO的模式下工作的情形,这里只说到将任务分配到ThreadPool,后面的ThreadPool的处理没有说,大家可以去看下源码。
![](http://geeklu.com/wp-content/uploads/2010/07/jetty-server.jpg)
在jetty中,web容器启动是从Server开始的,一个Server可以对应多个Connector,从名字就可以知道,Connector是来处理外部连接的,Connector的实现有多种,即可以是非阻塞的(如SelectChannelConnector),也可以是阻塞的(如BlockingChannelConnector,当然jetty中这个阻塞的已经使用nio优化过,性能应该比使用java io实现的好),
我们不能直接说谁的性能好,谁的性能不好,关键还是看应用场景,因为NIO实现的非阻塞的话,doSelect的过程是阻塞的。所以当并发量小,且请求可以快速得到响应的话,用阻塞的就可以很好的满足了,但是当并发量很大,且后端资源紧张,请求需要等待很长一段时间的(比如长轮询),那么NIO的性能肯定必传统的高很多很多倍。
这里稍微讲一下NIO的概念把,在NIO的Scoket通讯模型中,一个socket连接对应一个SocketChannel,SocketChannel可以将某个事件注册到某一个Selector上,然后对Selector进行select操作,当有请求来的时候,并可以通过Selector的selectedKeys()获得所有收到事件的channel,然后便可以对channel进行操作了。这个其实和linux中的select函数类似,只不过这里是面向对象的,在linux中,我们将需要监听的sockt连接加入到一个文件描述符的集合中FD_SET中,然后select函数对这个集合进行检测,根据得到的结果来判断某个fd对应的标志位是否为1来判断是否有数据。这样也就是一个线程可以同事处理多个连接。
换话题了,我们都知道请求最终都是在Servlet中被处理的,而Servlet得到的是request,response,这些对象什么时候出来的呢?不急,上面不是说到一个EndPoint(实现了Runnable接口)EndPoint对象在被初始化的时候就对其_connection成员进行了初始化,生成一个HttpConnection对象,newConnection的方法其实在SelectChannelConnector中被覆盖了。然后这个EndPoint对象不是被分配到ThreadPool了么,ThreadPool将其加入到队列中,当有空闲线程的时候,就对这个endPoint对象进行处理了,运行EndPoint的run方法,然后会调用自己的connection对象的handle方法,最终将connection对象交给Server的handler进行处理。Server本身继承自HandlerWrapper,自己的_handler是一个HandlerCollection的实例,HandlerCollection实例的配置在jetty.xml中有配置,在处理httpconnection对象的时候所配置的handler会依次被执行。
DefaultHandler中就涉及到上下文处理,然后交给各个项目的servlet进行处理。
环境配置方法:
服务器端:
类库清单:WEB-INF/lib
jetty-6.1.9.jar
jetty-util-6.1.9.jar
servlet-api-2.5-6.1.9.jar
(以上Jetty服务器自带)
cometd-api-0.9.20080221.jar
cometd-bayeux-6.1.9.jar
web.xml配置:
<!-- 配置ContinuationCometdServlet, 这个是必须的。配置后就可以支持comted
-->
< servlet
>
< servlet-name
> cometd
</ servlet-name
>
< servlet-class
> org.mortbay.cometd.continuation.ContinuationCometdServlet
</ servlet-class
>
<!-- 对队列的内容进行过滤
-->
< init-param
>
< param-name
> filters
</ param-name
>
< param-value
> /WEB-INF/filters.json
</ param-value
>
</ init-param
>
<!-- 超时设置The server side poll timeout in milliseconds (default 250000). This is how long the server will
hold a reconnect request before responding. -->
< init-param
>
< param-name
> timeout
</ param-name
>
< param-value
> 120000
</ param-value
>
</ init-param
>
<!-- The client side poll timeout in milliseconds (default 0). How long a client will wait between
reconnects -->
< init-param
>
< param-name
> interval
</ param-name
>
< param-value
> 0
</ param-value
>
</ init-param
>
<!-- the client side poll timeout if multiple connections are detected from the same browser
(default 1500). -->
< init-param
>
< param-name
> multiFrameInterval
</ param-name
>
< param-value
> 1500
</ param-value
>
</ init-param
>
<!-- 0=none, 1=info, 2=debug
-->
< init-param
>
< param-name
> logLevel
</ param-name
>
< param-value
> 0
</ param-value
>
</ init-param
>
<!-- If "true" then the server will accept JSON wrapped in a comment and will generate JSON wrapped
in a comment. This is a defence against Ajax Hijacking.
-->
< init-param
>
< param-name
> JSONCommented
</ param-name
>
< param-value
> true
</ param-value
>
</ init-param
>
< init-param
>
< param-name
> alwaysResumePoll
</ param-name
>
< param-value
> false
</ param-value
> <!--
use true for x-site cometd
-->
</ init-param
>
< load-on-startup
> 1
</ load-on-startup
>
</ servlet
>
< servlet-mapping
>
< servlet-name
> cometd
</ servlet-name
>
< url-pattern
> /cometd/*
</ url-pattern
>
</ servlet-mapping
>
filters.json内容如下:
格式如下:
{
"channels": "/**", --要过滤的队列(支持通配符)
"filter":"org.mortbay.cometd.filter.NoMarkupFilter", --使用的过滤器,实现接口dojox.cometd.DataFilter
"init" : {} --初始化的值,调用 DataFilter.init方法传入
}
示例内容如下:
[
{
" channels
" :
" /** "
,
" filter
" : "
org.mortbay.cometd.filter.NoMarkupFilter
" ,
" init
" : {}
} ,
{
" channels
" : "
/chat/* "
,
" filter
" : "
org.mortbay.cometd.filter.RegexFilter
" ,
" init
" : [
[ "[fF
] .ck "
, "
dang " ],
[ "
teh " ,
" the
" ]
]
},
{
" channels
" :
" /chat/**
" ,
" filter
" :
" org.mortbay.cometd.filter.RegexFilter
" ,
" init
" : [
[ "
[ Mm
] icrosoft
" , "
Micro\\$oft "
],
[ "
.*tomcat.* " , null ]
]
}
]
这时,服务器端的配置就已经完成的,基本的cometd功能就可以使用了。
客户端通过dojox.cometd.init("http://127.0.0.2:8080/cometd");就可以进行连接。
代码开发:
接下来,我们要准备客户端(使用dojo来实现)
一共三个文件
index.html
chat.js
chat.css(不是必须)
下面来看一下这两个文件的内容(加入注释)
index.html
<
html >
<
head >
< title
> Cometd chat
</ title >
< script
type ="text/javascript"
src ="../dojo/dojo/dojo.js"
></ script
> <!-- dojo类库
-->
< script
type ="text/javascript"
src ="../dojo/dojox/cometd.js.uncompressed.js"
></ script
> <!-- dojo-cometd类库
-->
< script
type ="text/javascript"
src ="chat.js"
></ script
> <!-- chat js文件,控制cometd的连接,消息的发送与接收
-->
< link
rel ="stylesheet"
type ="text/css"
href ="chat.css"
>
</
head >
<
body >
<
h1 >
Cometd Chat </
h1 >
<
div id
="chatroom" >
< div
id ="chat"
></ div >
< div
id ="input"
>
< div
id ="join"
> <!--
未登录时,显示的登录名和登录按钮
-->
Username:
<
input id ="username"
type ="text"
/>
< input id
="joinB" class
="button" type
="submit" name
="join" value
="Join" />
</ div
>
< div
id ="joined"
class ="hidden"
> <!-- 登录后,显示的消息框和发送,退出按钮(默认为隐藏)
-->
Chat:
<
input id ="phrase"
type ="text"
></ input
>
< input
id ="sendB"
class ="button"
type ="submit"
name ="join"
value ="Send"
/>
< input
id ="leaveB"
class ="button"
type ="submit"
name ="join"
value ="Leave"
/>
</ div
>
</ div
>
</ div
>
</
body >
chat.js文件
1
// 引入所需要的类
2
dojo.require( "
dojox.cometd "
);
3 dojo.require(
" dojox.cometd.timestamp
" );
4
5
// 定义一个room类
6
var room
= {
7
// 定义属性
8
_last: ""
, //
最后发送消息的人员(如果不是本人,则显示为空)
9
_username: null
, //
当前的用户名
10
_connected: true
, //
当前的连接状态 true已经连接, false表示未连接
11
groupName: "
whimsical "
, //
组名(未知)
12
13
//
登录操作
14
join: function
(name){
15
16
if
(name ==
null
|| name.length
== 0
){
17 alert('Please enter a username
! ');
18 }
else {
19
20
dojox.cometd.init(
new String(document.location).replace(
/ http:\
/ \ /
[ ^
\ / ]
*/ ,'').replace(
/ \
/ examples\
/ . *
$ /
,'') + "
/cometd "
);
21
// dojox.cometd.init("http://127.0.0.2:8080/cometd");
22
this
._connected =
true
;
23
24
this
._username =
name;
25 dojo.byId('join').className
= 'hidden';
26 dojo.byId('joined').className
= '';
27 dojo.byId('phrase').focus();
28
29
//
subscribe and join
30
dojox.cometd.startBatch();
31 dojox.cometd.subscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName});
32 dojox.cometd.publish(
" /chat/demo
" , {
33 user: room._username,
34 join:
true ,
35 chat : room._username
+ "
has joined "
36
}, { groupName:
this .groupName });
37 dojox.cometd.endBatch();
38
39
//
handle cometd failures while in the room
40
room._meta
= dojo.subscribe(
" /cometd/meta
" , this
, function
(event){
41 console.debug(event);
42
if (event.action
==
" handshake
" ){
43 room._chat({ data: {
44 join:
true ,
45 user:
" SERVER
" ,
46 chat:
" reinitialized
"
47
} });
48 dojox.cometd.subscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName });
49 }
else
if (event.action
== "
connect "
){
50
if (event.successful
&&
! this ._connected){
51 room._chat({ data: {
52 leave:
true ,
53 user:
" SERVER
" ,
54 chat:
" reconnected!
"
55
} });
56 }
57
if (
! event.successful
&& this
._connected){
58 room._chat({ data: {
59 leave:
true ,
60 user:
" SERVER
" ,
61 chat:
" disconnected!
"
62
} });
63 }
64
this ._connected
= event.successful;
65 }
66 }, {groupName:
this .groupName });
67 }
68 },
69
70
//
离开操作
71
leave: function
(){
72
if (
! room._username){
73
return ;
74 }
75
76
if
(room._meta){
77 dojo.unsubscribe(room._meta,
null ,
null , { groupName:
this .groupName });
78 }
79 room._meta
= null
;
80
81
dojox.cometd.startBatch();
82 dojox.cometd.unsubscribe(
" /chat/demo
" , room,
" _chat "
, { groupName: this
.groupName });
83 dojox.cometd.publish(
" /chat/demo
" , {
84 user: room._username,
85 leave:
true ,
86 chat : room._username
+ "
has left "
87
}, { groupName:
this .groupName });
88 dojox.cometd.endBatch();
89
90
//
switch the input form
91
dojo.byId('join').className
= '';
92 dojo.byId('joined').className
= 'hidden';
93 dojo.byId('username').focus();
94 room._username
=
null ;
95 dojox.cometd.disconnect();
96 },
97
98
//
发送消息
99
chat: function
(text){
100
if (
! text ||
!
text.length){
101
return
false ;
102 }
103 dojox.cometd.publish(
" /chat/demo
" , { user: room._username, chat: text}, { groupName:
this .groupName });
104 },
105
106
//
从服务器收到消息后,回调的方法
107
_chat: function
(message){
108
var chat
= dojo.byId('chat');
109
if (
! message.data){
110 console.debug(
" bad message format
" +
message);
111
return ;
112 }
113
var from
= message.data.user;
114
var special
= message.data.join
|| message.data.leave;
115
var text
= message.data.chat;
116
if (
! text){ return
; }
117
118
if
( !
special &&
from == room._last ){
119 from
= "
![](http://www.blogjava.net/Images/dot.gif)
" ;
120 }
else {
121 room._last
= from;
122 from
+= "
: " ;
123 }
124
125
if
(special){
126 chat.innerHTML
+=
" <span class=\
" alert\ "
><span class=\ "
from\ "
> " +
from +
"
</span><span class=\ "
text\ " >
" +
text + "
</span></span><br/>
" ;
127 room._last
= ""
;
128 }
else {
129 chat.innerHTML
+=
" <span class=\
" from\ "
> "
+ from +
" </span><span class=\
" text\
" > "
+ text
+ " </span><br/>
" ;
130 }
131 chat.scrollTop
= chat.scrollHeight
- chat.clientHeight;
132 },
133
134
// 初始操作
135
_init: function
(){
136 dojo.byId('join').className
= '';
137 dojo.byId('joined').className
= 'hidden';
138 dojo.byId('username').focus();
139
140
var
element =
dojo.byId('username');
141 element.setAttribute(
" autocomplete
" ,
" OFF "
);
142 dojo.connect(element,
" onkeyup
" , function
(e){ //
支持回车,登录
143
if
(e.keyCode ==
dojo.keys.ENTER){
144 room.join(dojo.byId('username').value);
145
return
false ;
146 }
147
return
true ;
148 });
149
150
dojo.connect(dojo.byId('joinB'),
" onclick "
, function
(e){ //
绑定 room.join方法到 Join按扭
151
room.join(dojo.byId('username').value);
152 e.preventDefault();
153 });
154
155
element =
dojo.byId('phrase');
// 取得消息框
156
element.setAttribute(
" autocomplete
" , "
OFF "
);
157 dojo.connect(element,
" onkeyup
" , function
(e){ //
支持回车发送消息
158
if
(e.keyCode ==
dojo.keys.ENTER){
159 room.chat(dojo.byId('phrase').value);
160 dojo.byId('phrase').value
= '';
161 e.preventDefault();
162 }
163 });
164
165
dojo.connect(dojo.byId('sendB'),
" onclick "
, function
(e){ //
绑定 room.chat方法到 sendB按扭
166
room.chat(dojo.byId('phrase').value);
167 dojo.byId('phrase').value
= '';
168 });
169 dojo.connect(dojo.byId('leaveB'),
" onclick
" , room,
" leave "
); //
绑定 room.leave方法到 leaveB按扭
170
}
171 };
172
173
// 页面装载时,调用room._init方法
174
dojo.addOnLoad(room,
" _init "
);
175 //
页面关闭时,调用 room.leave方法
176
dojo.addOnUnload(room,
" leave "
);
177
178
// vim:ts=4:noet:
补充:服务器端如何监控消息队列,以及进行订阅,发送消息操作
要进行 监控消息队列,以及进行订阅,发送消息操作的关键就是取得 Bayeux接口实现类 的实例
可以通过 ServletContextAttributeListener 这个监听器接口,通过attributeAdded方式加入
实现方法如下:
1
public
class BayeuxStartupListener
implements ServletContextAttributeListener
2 {
3
public
void initialize(Bayeux bayeux)
4 {
5
synchronized (bayeux)
6 {
7
if (
! bayeux.hasChannel(
" /service/echo
" ))
8 {
9
// 取得 bayeux实例
10
}
11 }
12 }
13
14
public
void attributeAdded(ServletContextAttributeEvent scab)
15 {
16
if (scab.getName().equals(Bayeux.DOJOX_COMETD_BAYEUX))
17 {
18 Bayeux bayeux
= (Bayeux) scab.getValue();
19 initialize(bayeux);
20 }
21 }
22
23
public
void
attributeRemoved(ServletContextAttributeEvent scab)
24 {
25
26
}
27
28
public
void
attributeReplaced(ServletContextAttributeEvent scab)
29 {
30
31
}
32 }
取到 Bayeux实例后,就可以借助BayeuxService类帮我们实现消息队列的监听,订阅消息以及发送消息
1
public
void
initialize(Bayeux bayeux)
2 {
3
synchronized (bayeux)
4 {
5
if (
! bayeux.hasChannel(
" /service/echo
" ))
6 {
7
// 取得 bayeux实例
8
new ChatService(bayeux);
9 }
10 }
11 }
具体方法请看下面这段代码:
1
// 定义 ChatService类,继承 BayeuxService
2
public
static class
ChatService extends
BayeuxService {
3
4
ConcurrentMap
< String,Set
< String >>
_members =
new
ConcurrentHashMap <
String,Set <
String >> ();
5
6
public ChatService(Bayeux bayeux)
7 {
8
super (bayeux,
" chat
" ); //
必须,把 Bayeux传入到 BayeuxService对象中
9
subscribe(
" /chat/**
" , "
trackMembers "
); //
订阅队列,收到消息后,会回调trackMembers方法
10
/*
11
subscribe支持回调的方法如下:
12 # myMethod(Client fromClient, Object data)
13 # myMethod(Client fromClient, Object data, String id)
14 # myMethod(Client fromClient, String channel, Object data,String id)
15 # myMethod(Client fromClient, Message message)
16
17 参数:
18 Client fromClient 发送消息的客户端
19 Object data 消息内容
20 id The id of the message
21 channel 队列名称
22 Message message 消息对象。继承于Map
23
24
*/
25
}
26
27
// 发布消息到队列
28
public
void
sendMessage(String message) {
29 Map
< String,Object
> mydata
= new
HashMap <
String, Object > ();
30 mydata.put(
" chat
" , message);
31
32 Client sender
= getBayeux().newClient(
" server
" );
33
34 getBayeux().getChannel(
" /chat/demo
" ,
false ).publish(sender, mydata,
" 0 "
/* null
*/ );
35
36
}
37
38
// 发送消息给指定的client(非广播方式)
39
public
void
sendMessageToClient(Client joiner, String message) {
40 Map
< String,Object
> mydata
= new
HashMap <
String, Object > ();
41 mydata.put(
" chat
" , message);
42
43 send(joiner,
" /chat/demo
" , mydata,
" 0
" /* null
*/ );
44 }
45
46
// 订阅消息回调方法
47
public
void
trackMembers(Client joiner, String channel, Map
< String,Object
> data, String id)
48 {
49
// 解释消息内容,如果消息内容中 有 join这个字段且值为true
50
if
(Boolean.TRUE.equals(data.get(
" join "
)))
51 {
52
// 根据队列,取得当前登录的人员
53
Set <
String >
m = _members.get(channel);
54
if (m
== null )
55 {
56
// 如果为空,则创建一个新的Set实现
57
Set
< String >
new_list =
new CopyOnWriteArraySet
< String
> ();
58 m
= _members.putIfAbsent(channel,new_list);
59
if (m
== null )
60 m
= new_list;
61 }
62
63
final Set
< String >
members =
m;
64
final String username
= (String)data.get(
" user
" );
65
66 members.add(username);
67
// 为该client增加事件,Remove事件。当用户退出时,触发该方法。
68
joiner.addListener(
new RemoveListener(){
69
public
void removed(String clientId,
boolean timeout)
70 {
71 members.remove(username);
72 }
73 });
74
75
// 为该client增加事件,消息的发送和接收事件。当用户退出时,触发该方法。
76
joiner.addListener(
new MessageListener() {
77
public
void deliver(Client fromClient, Client toClient, Message message) {
78 System.out.println(
" message from
"
+ fromClient.getId()
+ "
to "
79
+ toClient.getId()
+ "
message is "
+
message.getData());
80 }
81 });
82
83
Map <
String,Object >
mydata =
new HashMap
< String, Object
> ();
84 mydata.put(
" chat
" , "
members= "
+ members);
85
// 把已经登录的人员信息列表,发送回给消息发送者
86
send(joiner,channel,mydata,id);
87
88 }
89 }
90 }
91
相关文章推荐
- Comet技术在项目中的使用
- Comet技术在项目中的使用
- Comet技术在项目中的使用
- Comet主动推送技术在项目中的使用
- WebService从零到项目开发使用1—技术研究之基础篇 WSDL
- WebService从零到项目开发使用1—技术研究之基础篇 XML
- FlashCom学习例子: 视频录制(项目使用Flash8+Flash Media Server技术)
- WebService从零到项目开发使用5—技术研究之JAX-WS快速入门
- WebService从零到项目开发使用1—技术研究之基础篇 WSDL
- WebService从零到项目开发使用1—技术研究之基础篇 XML
- WebService从零到项目开发使用3—技术研究之Axis2 POJO开发Web服务
- 一起谈.NET技术,使用VS2010的Database项目模板统一管理数据库对象
- 【应用赏析】使用移动GIS技术改进你的企业和项目
- 【技术直通车】使用Map Project Center 配置完全离线的移动项目
- WebService从零到项目开发使用1—技术研究之基础篇 SOAP
- 黑马程序员_加速Java应用开发速度4:使用模板技术加速项目开发速度
- 读webwork时相关联到AdminApp这个使用了Hibernate等技术的项目
- 我为什么反对在实际项目中使用类似ext的js技术框架
- 使用Urlrewrite技术实现Struts2+Hibernate3+Spring的项目的伪静态
- CMS视频学习笔记1:cms项目和使用技术简介