Netty 点对点聊天程序
2017-08-17 18:17
295 查看
使用最新Netty实现一个简单的聊天程序
package com.anxpp.im.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.io.IOException;
import java.util.Scanner;

import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;
import com.anxpp.im.server.handler.ServerHandler;

/**
 * Chat server entry point.
 *
 * <p>Runs the Netty accept loop on a background thread ({@link #run()}) and a
 * console loop on the calling thread that pushes typed lines to a client
 * through the shared {@link ServerHandler}.
 */
public class Server implements Runnable, IMConfig {

    /** Single sharable handler instance installed into every accepted channel. */
    ServerHandler serverHandler = new ServerHandler();

    public static void main(String[] args) throws IOException {
        new Server().start();
    }

    /**
     * Starts the network thread, then blocks on the server console.
     *
     * @throws IOException propagated from the console loop
     */
    public void start() throws IOException {
        new Thread(this).start();
        runServerCMD();
    }

    /**
     * Runs the server console: every line read from standard input is pushed
     * as a text message until the handler reports that the loop should stop
     * (the user typed {@code q}) or standard input reaches end-of-file.
     *
     * @throws IOException propagated from {@link ServerHandler#sendMsg(IMMessage)}
     */
    private void runServerCMD() throws IOException {
        // Server-initiated push: the receiver id is fixed for this demo.
        int toID = 1;
        IMMessage message =
                new IMMessage(APP_IM, CLIENT_VERSION, SERVER_ID, TYPE_MSG_TEXT, toID, MSG_EMPTY);
        @SuppressWarnings("resource") // wraps System.in, which must stay open
        Scanner scanner = new Scanner(System.in);
        // hasNextLine() guards against NoSuchElementException when stdin is closed.
        while (scanner.hasNextLine()) {
            message.setMsg(scanner.nextLine());
            if (!serverHandler.sendMsg(message)) {
                break;
            }
        }
    }

    /**
     * Binds to {@code SERVER_PORT} and serves until the server channel closes.
     * Both event loop groups are always shut down on exit.
     */
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Inbound: strip the 2-byte length prefix, then decode MessagePack.
                            ch.pipeline().addLast("frameDecoder",
                                    new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
                            ch.pipeline().addLast("msgpack decoder", new MsgPackDecode());
                            // Outbound: encode MessagePack, then prepend a 2-byte length.
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder", new MsgPackEncode());
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind(SERVER_PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
2.
package com.anxpp.im.server.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.IOException;

import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.OnlineUser;
import com.anxpp.im.server.config.BaseConfig;

/**
 * Server-side inbound handler, shared by every accepted channel.
 *
 * <p>Routes each incoming {@link IMMessage} to the context registered for its
 * receiver id in {@link OnlineUser}, and lets the server console push messages
 * to the most recently connected client.
 */
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter implements BaseConfig {

    /**
     * Context of the most recently connected client; {@code null} until a
     * client connects. Written on an event-loop thread and read from the
     * console thread, hence {@code volatile}.
     */
    private volatile ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端Handler创建...");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("channelInactive");
        // Forget the console push target if it is the channel that just went away.
        if (this.ctx == ctx) {
            this.ctx = null;
        }
        super.channelInactive(ctx);
    }

    /**
     * Called once the TCP link has been established.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        System.err.println("有客户端连接:" + ctx.channel().remoteAddress().toString());
    }

    /**
     * Pushes a message from the server console to the most recently connected client.
     *
     * @param msg the message to send
     * @return {@code false} when the message text is {@code "q"} (the console
     *         loop should stop), {@code true} otherwise
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public boolean sendMsg(IMMessage msg) throws IOException {
        System.err.println("服务器推送消息:" + msg);
        ChannelHandlerContext target = ctx;
        if (target == null) {
            // No client has connected yet (or the last one left): nothing to write to.
            System.err.println("暂无客户端连接,消息未发送");
        } else {
            target.writeAndFlush(msg);
        }
        return !"q".equals(msg.getMsg());
    }

    /**
     * Registers the sender as online and forwards the message to its receiver,
     * or answers the sender with an "offline" notice when the receiver has no
     * live channel.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        System.err.println("服务器接收到消息:" + msg);
        IMMessage message = (IMMessage) msg;

        // Always (re)register the sender so it stays reachable regardless of
        // whether its peer happens to be online.
        OnlineUser.put(message.getUid(), ctx);

        // The connect handshake is addressed to the server itself; there is no
        // peer to forward it to, so it must not trigger an "offline" reply.
        if (message.getMsgType() == IMConfig.TYPE_CONNECT) {
            return;
        }

        ChannelHandlerContext receiver = OnlineUser.get(message.getReceiveId());
        if (receiver != null && !receiver.channel().isActive()) {
            // Stale entry left behind by a disconnected client.
            OnlineUser.remove(message.getReceiveId());
            receiver = null;
        }

        if (receiver == null) {
            message.setMsg("对方不在线!");
            ctx.writeAndFlush(message);
        } else {
            receiver.writeAndFlush(message);
        }
    }

    /**
     * Logs the failure and closes the affected channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("与客户端断开连接:" + cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}
3
package com.anxpp.im.common;

/**
 * Constants shared by the chat client and server.
 */
public interface IMConfig {
    /* Client configuration */
    int CLIENT_VERSION = 1;             // protocol version

    /* Server configuration */
    String SERVER_HOST = "127.0.0.1";   // server IP
    int SERVER_PORT = 9090;             // server port

    /* Message related */
    int SERVER_ID = 0;                  // uid that denotes the server itself
    byte APP_IM = 1;                    // application id of the instant-messaging app
    byte TYPE_CONNECT = 0;              // first message after connecting: handshake / auth info
    byte TYPE_MSG_TEXT = 1;             // text message
    String MSG_EMPTY = "";              // empty message body
}
4.
package com.anxpp.im.common;

import org.msgpack.annotation.Message;

/**
 * Wire-level chat message, serialized with MessagePack.
 */
@Message
public class IMMessage {
    // application id
    private byte appId;
    // protocol version
    private int version;
    // sender user id
    private int uid;
    // message type: 0 = login, 1 = text message
    private byte msgType;
    // receiver user id
    private int receiveId;
    // message body
    private String msg;

    /** No-arg constructor, kept alongside the full constructor. */
    public IMMessage() {}

    /**
     * Creates a fully populated message.
     *
     * @param appId     application channel
     * @param version   application version
     * @param uid       sender user id
     * @param msgType   message type
     * @param receiveId receiver user id
     * @param msg       message body
     */
    public IMMessage(byte appId, int version, int uid, byte msgType, int receiveId, String msg) {
        this.appId = appId;
        this.version = version;
        this.uid = uid;
        this.msgType = msgType;
        this.receiveId = receiveId;
        this.msg = msg;
    }

    public byte getAppId() {
        return appId;
    }

    public void setAppId(byte appId) {
        this.appId = appId;
    }

    public int getVersion() {
        return version;
    }

    public void setVersion(int version) {
        this.version = version;
    }

    public int getUid() {
        return uid;
    }

    public void setUid(int uid) {
        this.uid = uid;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public byte getMsgType() {
        return msgType;
    }

    public void setMsgType(byte msgType) {
        this.msgType = msgType;
    }

    public int getReceiveId() {
        return receiveId;
    }

    public void setReceiveId(int receiveId) {
        this.receiveId = receiveId;
    }

    @Override
    public String toString() {
        return "appId:" + this.appId
                + ",version:" + this.version
                + ",uid:" + this.uid
                + ",msgType:" + this.msgType
                + ",receiveId:" + this.receiveId
                + ",msg:" + this.msg;
    }
}
5.
package com.anxpp.im.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

import org.msgpack.MessagePack;

/**
 * Decoder: turns one already-framed {@link ByteBuf} into an {@link IMMessage}.
 */
public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        // Absolute read: copies the readable bytes without moving readerIndex.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(new MessagePack().read(array, IMMessage.class));
    }
}
6.
package com.anxpp.im.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.IOException; import org.msgpack.MessagePack; /** * 编码工具 */ public class MsgPackEncode extends MessageToByteEncoder<IMMessage>{ @Override protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out) throws IOException { out.writeBytes(new MessagePack().write(msg)); } }7.package com.anxpp.im.common;import io.netty.channel.ChannelHandlerContext;import java.util.HashMap;/*** 在线用户表* @author Administrator**/public class OnlineUser {//用户表private static HashMap<Integer, ChannelHandlerContext> onlineUser = new HashMap<Integer, ChannelHandlerContext>();public static void put(Integer uid, ChannelHandlerContext uchc){onlineUser.put(uid, uchc);}public static void remove(Integer uid){onlineUser.remove(uid);}public static ChannelHandlerContext get(Integer uid){return onlineUser.get(uid);}}8,。package com.anxpp.im.client;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import java.io.IOException;import java.util.Scanner;import com.anxpp.im.client.handler.ClientHandler;import com.anxpp.im.common.IMConfig;import com.anxpp.im.common.IMMessage;import com.anxpp.im.common.MsgPackDecode;import com.anxpp.im.common.MsgPackEncode;public class Client implements Runnable,IMConfig {public static int UID = 8889;public static int toID = 8888;private ClientHandler clientHandler = new ClientHandler();public static void main(String[] args) throws IOException{new Client().start();}public void start() throws IOException{new 
Thread(this).start();runServerCMD();}public void sendMsg(IMMessage msg) throws IOException {clientHandler.sendMsg(msg);}/**启动客户端控制台*/private void runServerCMD() throws IOException {IMMessage message = new IMMessage(APP_IM,CLIENT_VERSION,UID,TYPE_MSG_TEXT,toID,MSG_EMPTY);@SuppressWarnings("resource")Scanner scanner = new Scanner(System.in);do{message.setMsg(scanner.nextLine());}while (clientHandler.sendMsg(message));}@Overridepublic void run() {EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.SO_KEEPALIVE, true);b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());ch.pipeline().addLast(clientHandler);}});ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT).sync();f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}9.package com.anxpp.im.client.handler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.io.IOException;import com.anxpp.im.client.Client;import com.anxpp.im.common.IMConfig;import com.anxpp.im.common.IMMessage;public class ClientHandler extends ChannelInboundHandlerAdapter implements IMConfig{private ChannelHandlerContext ctx;/*** tcp链路简历成功后调用*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("成功连接服务器");this.ctx = ctx;IMMessage message = new IMMessage(APP_IM,CLIENT_VERSION,Client.UID,TYPE_CONNECT,SERVER_ID,MSG_EMPTY);sendMsg(message);}/*** 发送消息* @param msg* @return* @throws IOException*/public boolean 
sendMsg(IMMessage msg) throws IOException {System.out.println("client:" + msg);ctx.channel().writeAndFlush(msg);return msg.getMsg().equals("q") ? false : true;}/*** 收到消息后调用* @throws IOException*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {IMMessage m = (IMMessage)msg;System.out.println(m.getUid() + ":" + m.getMsg());}/*** 发生异常时调用*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("与服务器断开连接:"+cause.getMessage());ctx.close();}}10 代码结构