您的位置:首页 > 其它

netty 点对点聊天程序

2017-08-17 18:17 337 查看
使用最新Netty实现一个简单的聊天程序

package com.anxpp.im.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.io.IOException;
import java.util.Scanner;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;
import com.anxpp.im.server.handler.ServerHandler;

/**
 * Chat server entry point.
 *
 * <p>Runs the Netty acceptor on a background thread ({@link #run()}) while the
 * calling thread reads lines from standard input and pushes them to a client
 * through the shared {@link ServerHandler}.
 */
public class Server implements Runnable, IMConfig {

    /** Single handler instance installed into every accepted channel's pipeline. */
    ServerHandler serverHandler = new ServerHandler();

    public static void main(String[] args) throws IOException {
        new Server().start();
    }

    /**
     * Starts the network thread, then blocks the caller in the console loop.
     *
     * @throws IOException if sending a console message fails
     */
    public void start() throws IOException {
        new Thread(this).start();
        runServerCMD();
    }

    /**
     * Runs the server console: every line typed is pushed as a text message.
     * The loop ends when the handler reports that the quit command was sent,
     * or when standard input reaches end-of-file.
     *
     * @throws IOException if sending a message fails
     */
    private void runServerCMD() throws IOException {
        // Messages pushed from the server console are addressed to this user ID.
        int toID = 1;
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        // hasNextLine() guards against NoSuchElementException when stdin is closed.
        while (scanner.hasNextLine()) {
            // A fresh message per line: the previous one may still be waiting to be
            // encoded on the event loop, so it must not be mutated from this thread.
            IMMessage message = new IMMessage(
                    APP_IM, CLIENT_VERSION, SERVER_ID, TYPE_MSG_TEXT, toID, scanner.nextLine());
            if (!serverHandler.sendMsg(message)) {
                break;
            }
        }
    }

    /**
     * Binds the server socket on {@code SERVER_PORT} and blocks until the
     * server channel is closed; both event loop groups are always shut down.
     */
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Inbound: strip the 2-byte length prefix, then decode MessagePack.
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
                            ch.pipeline().addLast("msgpack decoder", new MsgPackDecode());
                            // Outbound: encode MessagePack, then prepend the 2-byte length.
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder", new MsgPackEncode());
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind(SERVER_PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
2.
package com.anxpp.im.server.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.OnlineUser;
import com.anxpp.im.server.config.BaseConfig;
/**
 * Server-side inbound handler. One instance is shared by all client channels.
 *
 * <p>It records which channel each user ID was last seen on (via
 * {@code OnlineUser}) and relays every received {@code IMMessage} to the
 * channel registered for its receiver ID. It also lets the server console
 * push a message to the most recently connected client.
 */
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter implements BaseConfig{
    /**
     * Context of the most recently connected client, used by {@link #sendMsg}.
     * Volatile because it is assigned in channel callbacks and read by the
     * thread that calls {@link #sendMsg}.
     */
    private volatile ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服务端Handler创建...");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("channelInactive");
        // Forget the console push target once its channel is gone.
        if (this.ctx == ctx) {
            this.ctx = null;
        }
        super.channelInactive(ctx);
    }

    /**
     * Called once the TCP connection is established; remembers the new
     * channel as the target for console pushes.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        System.err.println("有客户端连接:"+ctx.channel().remoteAddress().toString());
    }

    /**
     * Pushes a message to the most recently connected client.
     *
     * @param msg message to send
     * @return {@code false} when the message text is {@code "q"} (the caller's
     *         signal to stop its input loop), {@code true} otherwise
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public boolean sendMsg(IMMessage msg) throws IOException {
        System.err.println("服务器推送消息:"+msg);
        // Read the field once so the null check and the write use the same context.
        ChannelHandlerContext target = ctx;
        if (target == null) {
            // No client has connected yet (or the last one left): report it
            // instead of failing with a NullPointerException.
            System.err.println("当前没有客户端连接,消息未发送");
        } else {
            target.writeAndFlush(msg);
        }
        return !"q".equals(msg.getMsg());
    }

    /**
     * Relays a received message to its receiver, or tells the sender that the
     * receiver is offline.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        System.err.println("服务器接收到消息:"+msg);
        IMMessage message = (IMMessage)msg;
        // Always record the sender's channel. Registering it only when the receiver
        // is offline would leave a sender unroutable if its peer was already online.
        OnlineUser.put(message.getUid(), ctx);
        ChannelHandlerContext c = OnlineUser.get(message.getReceiveId());
        if (c != null && !c.channel().isActive()) {
            // The receiver's channel has closed since it registered: evict the stale entry.
            OnlineUser.remove(message.getReceiveId());
            c = null;
        }
        if (c == null) {
            message.setMsg("对方不在线!");
            // ctx is the sender's own channel, which was registered just above.
            ctx.writeAndFlush(message);
        } else {
            c.writeAndFlush(message);
        }
    }

    /**
     * Logs the failure and closes the affected channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("与客户端断开连接:"+cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}
3.
package com.anxpp.im.common;

/**
 * Constants shared by the chat client and server.
 */
public interface IMConfig {
    /** Client configuration. */
    int CLIENT_VERSION = 1; // version number

    /** Server configuration. */
    String SERVER_HOST = "127.0.0.1"; // server IP
    int SERVER_PORT = 9090; // server port

    /** Message-related values. */
    int SERVER_ID = 0; // ID that denotes a server message
    byte APP_IM = 1; // the instant-messaging application ID is 1
    byte TYPE_CONNECT = 0; // first message after connecting: confirms the connection and sends auth info
    byte TYPE_MSG_TEXT = 1; // text message
    String MSG_EMPTY = ""; // empty message
}
4.
package com.anxpp.im.common;

import org.msgpack.annotation.Message;

@Message
public class IMMessage {
    // NOTE(review): the field declaration order is kept exactly as it was;
    // presumably the MessagePack @Message mapping depends on it - confirm before reordering.

    /** Application ID. */
    private byte appId;
    /** Protocol version number. */
    private int version;
    /** ID of the sending user. */
    private int uid;
    /** Message type: 0 = login, 1 = text message. */
    private byte msgType;
    /** ID of the receiver. */
    private int receiveId;
    /** Message content. */
    private String msg;

    /** Creates an empty message with all fields at their default values. */
    public IMMessage() {}

    /**
     * Creates a fully populated message.
     *
     * @param appId     application channel
     * @param version   application version
     * @param uid       user ID of the sender
     * @param msgType   message type
     * @param receiveId ID of the message receiver
     * @param msg       message content
     */
    public IMMessage(byte appId, int version, int uid, byte msgType, int receiveId, String msg) {
        this.appId = appId;
        this.version = version;
        this.uid = uid;
        this.msgType = msgType;
        this.receiveId = receiveId;
        this.msg = msg;
    }

    /** @return the application ID */
    public byte getAppId() {
        return this.appId;
    }

    /** @param appId the application ID */
    public void setAppId(byte appId) {
        this.appId = appId;
    }

    /** @return the version number */
    public int getVersion() {
        return this.version;
    }

    /** @param version the version number */
    public void setVersion(int version) {
        this.version = version;
    }

    /** @return the sender's user ID */
    public int getUid() {
        return this.uid;
    }

    /** @param uid the sender's user ID */
    public void setUid(int uid) {
        this.uid = uid;
    }

    /** @return the message type */
    public byte getMsgType() {
        return this.msgType;
    }

    /** @param msgType the message type */
    public void setMsgType(byte msgType) {
        this.msgType = msgType;
    }

    /** @return the receiver's ID */
    public int getReceiveId() {
        return this.receiveId;
    }

    /** @param receiveId the receiver's ID */
    public void setReceiveId(int receiveId) {
        this.receiveId = receiveId;
    }

    /** @return the message content */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg the message content */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /**
     * Renders every field as comma-separated {@code name:value} pairs.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("appId:").append(appId);
        text.append(",version:").append(version);
        text.append(",uid:").append(uid);
        text.append(",msgType:").append(msgType);
        text.append(",receiveId:").append(receiveId);
        text.append(",msg:").append(msg);
        return text.toString();
    }
}
5.
package com.anxpp.im.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
import org.msgpack.MessagePack;

/**
 * Decoder that turns the bytes of one frame into an {@link IMMessage}
 * using MessagePack.
 */
public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf> {

    /** Reused for every frame instead of constructing a new MessagePack per message. */
    private final MessagePack messagePack = new MessagePack();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        // Absolute read: copies the readable bytes without moving the reader index.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(messagePack.read(array, IMMessage.class));
    }
}
6.
package com.anxpp.im.common;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.msgpack.MessagePack;
/**
 * Encoder that serializes an {@link IMMessage} with MessagePack and writes
 * the resulting bytes to the outbound buffer.
 */
public class MsgPackEncode extends MessageToByteEncoder<IMMessage>{
    /** Reused for every message instead of constructing a new MessagePack per call. */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Writes the MessagePack form of {@code msg} into {@code out}.
     *
     * @throws IOException if MessagePack serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out) throws IOException {
        out.writeBytes(messagePack.write(msg));
    }
}
7.
package com.anxpp.im.common;

import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Table of online users: maps a user ID to the channel context it was last seen on.
 *
 * @author Administrator
 */
public class OnlineUser {

    /**
     * User table. A concurrent map because this static table is shared by every
     * channel, and channels may be served by different event-loop threads.
     */
    private static final Map<Integer, ChannelHandlerContext> onlineUser =
            new ConcurrentHashMap<Integer, ChannelHandlerContext>();

    /**
     * Registers (or replaces) the context for a user.
     *
     * @param uid  user ID
     * @param uchc the user's channel context; {@code null} clears the entry
     */
    public static void put(Integer uid, ChannelHandlerContext uchc) {
        if (uchc == null) {
            // ConcurrentHashMap rejects null values; clearing keeps get() returning null.
            remove(uid);
            return;
        }
        onlineUser.put(uid, uchc);
    }

    /**
     * Removes the entry for a user, if any.
     *
     * @param uid user ID
     */
    public static void remove(Integer uid) {
        if (uid != null) {
            onlineUser.remove(uid);
        }
    }

    /**
     * Looks up the context registered for a user.
     *
     * @param uid user ID
     * @return the registered context, or {@code null} if the user is not registered
     */
    public static ChannelHandlerContext get(Integer uid) {
        return uid == null ? null : onlineUser.get(uid);
    }
}
8.
package com.anxpp.im.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.io.IOException;
import java.util.Scanner;
import com.anxpp.im.client.handler.ClientHandler;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;

/**
 * Chat client entry point.
 *
 * <p>Connects to the server on a background thread ({@link #run()}) while the
 * calling thread reads lines from standard input and sends them to {@link #toID}.
 */
public class Client implements Runnable, IMConfig {

    /** This client's user ID. */
    public static int UID = 8889;
    /** User ID that console messages are addressed to. */
    public static int toID = 8888;

    private ClientHandler clientHandler = new ClientHandler();

    public static void main(String[] args) throws IOException {
        new Client().start();
    }

    /**
     * Starts the network thread, then blocks the caller in the console loop.
     *
     * @throws IOException if sending a console message fails
     */
    public void start() throws IOException {
        new Thread(this).start();
        runServerCMD();
    }

    /**
     * Sends a message through the client handler.
     *
     * @param msg message to send
     * @throws IOException if sending fails
     */
    public void sendMsg(IMMessage msg) throws IOException {
        clientHandler.sendMsg(msg);
    }

    /**
     * Runs the client console: every line typed is sent as a text message.
     * The loop ends when the handler reports that the quit command was sent,
     * or when standard input reaches end-of-file.
     *
     * @throws IOException if sending a message fails
     */
    private void runServerCMD() throws IOException {
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        // hasNextLine() guards against NoSuchElementException when stdin is closed.
        while (scanner.hasNextLine()) {
            // A fresh message per line: the previous one may still be waiting to be
            // encoded on the event loop, so it must not be mutated from this thread.
            IMMessage message = new IMMessage(
                    APP_IM, CLIENT_VERSION, UID, TYPE_MSG_TEXT, toID, scanner.nextLine());
            if (!clientHandler.sendMsg(message)) {
                break;
            }
        }
    }

    /**
     * Connects to {@code SERVER_HOST:SERVER_PORT} and blocks until the channel
     * is closed; the event loop group is always shut down.
     */
    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Inbound: strip the 2-byte length prefix, then decode MessagePack.
                    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
                    ch.pipeline().addLast("msgpack decoder", new MsgPackDecode());
                    // Outbound: encode MessagePack, then prepend the 2-byte length.
                    ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                    ch.pipeline().addLast("msgpack encoder", new MsgPackEncode());
                    ch.pipeline().addLast(clientHandler);
                }
            });
            ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
9.
package com.anxpp.im.client.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import com.anxpp.im.client.Client;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;

/**
 * Client-side inbound handler: announces the client to the server once the
 * connection is up, prints received messages, and sends outgoing ones.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter implements IMConfig {

    /**
     * Context of the active connection; {@code null} until {@link #channelActive}
     * runs. Volatile because it is assigned in a channel callback and read by the
     * thread that calls {@link #sendMsg}.
     */
    private volatile ChannelHandlerContext ctx;

    /**
     * Called once the TCP connection is established; stores the context and
     * sends the initial connect message carrying this client's user ID.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("成功连接服务器");
        this.ctx = ctx;
        IMMessage message = new IMMessage(APP_IM, CLIENT_VERSION, Client.UID, TYPE_CONNECT, SERVER_ID, MSG_EMPTY);
        sendMsg(message);
    }

    /**
     * Sends a message to the server.
     *
     * @param msg message to send
     * @return {@code false} when the message text is {@code "q"} (the caller's
     *         signal to stop its input loop), {@code true} otherwise
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public boolean sendMsg(IMMessage msg) throws IOException {
        System.out.println("client:" + msg);
        // Read the field once so the null check and the write use the same context.
        ChannelHandlerContext current = ctx;
        if (current == null) {
            // The user typed before the connection was established: report it
            // instead of failing with a NullPointerException.
            System.err.println("尚未连接服务器,消息未发送");
        } else {
            current.channel().writeAndFlush(msg);
        }
        return !"q".equals(msg.getMsg());
    }

    /**
     * Called when a message arrives; prints the sender ID and the text.
     *
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        IMMessage m = (IMMessage) msg;
        System.out.println(m.getUid() + ":" + m.getMsg());
    }

    /**
     * Called when an exception occurs; logs it and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("与服务器断开连接:" + cause.getMessage());
        ctx.close();
    }
}
10. 代码结构
                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: