您的位置:首页 > 其它

netty websocket 后台消息推送

2015-04-23 10:48 405 查看



WebSocket 服务端代码(基于 Netty 3.x,org.jboss.netty):

[java]
view plaincopy





/**
 * Minimal Netty 3.x WebSocket server that periodically pushes the text
 * frames "1", "2" and "3" to the client whose handler context is published
 * in {@link #ctx}.
 */
public class WebSocketServer {

    /**
     * Handler context used as the push target, or {@code null} when there is
     * none. It is assigned by {@code WebSocketServerHandler} and read by the
     * push loop in {@link #main(String[])}; those run on different threads, so
     * the field is {@code volatile} to make every update visible to the reader.
     */
    public static volatile ChannelHandlerContext ctx = null;

    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public WebSocketServer(int port) {
        this.port = port;
    }

    /**
     * Configures the server bootstrap, binds it to the configured port and
     * prints the startup banner. Returns right after binding.
     */
    public void run() {
        // Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory());

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(port));

        System.out.println("Web socket server started at port " + port + '.');
        System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
    }

    /**
     * Starts the server (port taken from {@code args[0]}, default 8080), waits
     * 30 seconds, then pushes "1", "2", "3" in an endless loop with 3 second
     * pauses between frames.
     *
     * @param args optional single argument: the port number
     */
    public static void main(String[] args) {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new WebSocketServer(port).run();

        try {
            Thread.sleep(30000);

            while (true) {
                push("1");
                Thread.sleep(3000);

                push("2");
                Thread.sleep(3000);

                // The trailing pause only happens when "3" was actually sent,
                // exactly as in the original loop.
                if (push("3")) {
                    Thread.sleep(3000);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes one text frame to the currently published channel, if any.
     *
     * @param text payload of the text frame
     * @return {@code true} if a frame was handed to the channel, {@code false}
     *         if no context is published or its channel is not writable
     */
    private static boolean push(String text) {
        // Read the shared field exactly once: the handler may set it to null
        // between a null check and a second read, which would throw an NPE and
        // terminate the push loop.
        ChannelHandlerContext current = ctx;
        if (current == null || !current.getChannel().isWritable()) {
            return false;
        }
        current.getChannel().write(new TextWebSocketFrame(text));
        return true;
    }
}

package com.test.websocket;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;

import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;

/**
 * Handles the WebSocket handshake and incoming WebSocket frames for one
 * connection, and publishes/clears this connection's handler context in
 * {@code WebSocketServer.ctx} so the server's push loop can write to it.
 */
public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandler.class);

    private static final String WEBSOCKET_PATH = "/websocket";

    /** Handshaker chosen for this connection; {@code null} until a handshake request was handled. */
    private WebSocketServerHandshaker handshaker;

    /**
     * Dispatches HTTP requests to the handshake logic and WebSocket frames to
     * the frame logic; any other message type is ignored.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object msg = e.getMessage();
        if (msg instanceof HttpRequest) {
            handleHttpRequest(ctx, (HttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Performs the WebSocket handshake for {@code req}. The context is only
     * published as push target when a handshaker exists for the requested
     * protocol version; an unsupported version gets the corresponding
     * response and is never published.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel());
            // No WebSocket session was established, so do not expose this
            // connection to the push loop.
            return;
        }

        handshaker.handshake(ctx.getChannel(), req).addListener(WebSocketServerHandshaker.HANDSHAKE_LISTENER);
        WebSocketServer.ctx = ctx;
    }

    /**
     * Handles a WebSocket frame. Only close frames are acted upon: the close
     * handshake is performed and this connection stops being the push target.
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.getChannel(), (CloseWebSocketFrame) frame);
            clearPublishedContext(ctx);
        }
    }

    /**
     * Logs the failure, closes the affected channel and withdraws this
     * connection as push target.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        logger.warn("Closing channel after exception", e.getCause());
        e.getChannel().close();
        clearPublishedContext(ctx);
    }

    /**
     * Clears {@code WebSocketServer.ctx} only if it still refers to
     * {@code ctx}, so a failure or close on one connection does not drop the
     * push target that belongs to a different connection.
     */
    private static void clearPublishedContext(ChannelHandlerContext ctx) {
        if (WebSocketServer.ctx == ctx) {
            WebSocketServer.ctx = null;
        }
    }

    /** Builds the WebSocket URL from the request's Host header and the fixed path. */
    private static String getWebSocketLocation(HttpRequest req) {
        return "ws://" + req.getHeader(HOST) + WEBSOCKET_PATH;
    }
}

package com.test.websocket;

import static org.jboss.netty.channel.Channels.*;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

/**
*/
public class WebSocketServerPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new WebSocketServerHandler());
return pipeline;
}
}

Web 端 JS 代码:

[javascript]
view plaincopy





/**
 * Opens a WebSocket to the push server and mirrors connection state and
 * every received message into the element with id "responseText".
 * Alerts the user when the browser offers no WebSocket implementation.
 */
function connect() {
    var socket;

    // Writes plain text into the output element. textContent is used instead
    // of innerHTML so that data received from the socket is never parsed as
    // markup.
    function show(text) {
        var ta = document.getElementById('responseText');
        ta.textContent = text;
    }

    // Fall back to the vendor-prefixed constructor when the standard one is missing.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }

    if (window.WebSocket) {
        socket = new WebSocket("ws://192.168.2.72:8080/websocket");

        socket.onmessage = function(event) {
            show(event.data);
        };

        socket.onopen = function(event) {
            show("Web Socket opened!");
        };

        socket.onclose = function(event) {
            show("Web Socket closed");
        };
    } else {
        alert("Your browser does not support Web Socket.");
    }
}

// Open the WebSocket connection once the DOM is ready
// ($(fn) is the shorthand form of $(document).ready(fn)).
$(function () {
    connect();
});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: