您的位置:首页 > 其它

基于Socket通信的P2P聊天

2017-07-11 16:37 281 查看
一、Socket通信的简介

socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用以实现进程在网络中通信,socket是一种”打开—读/写—关闭”模式的实现,服务器和客户端各自维护一个”文件”,在建立连接打开后,可以向自己文件写入内容供对方读取或者读取对方内容,通讯结束时关闭文件;以使用TCP协议通讯的socket为例,其交互流程:Tcp协议:三次握手协议(服务端accept,客服端connect),四次挥手(客服端close,服务端close)服务端accept阻塞的,等待着(多个)客服端的链接;大致的链接和通信流程如下:



从Socket读取数据,从Socket打开文件输入流,从输入流读取数据,如果Socket有数据读取到数据,Socket没有数据,进行读取阻塞;

二、IM的流程

① 创建与服务端的消息通道Socket

② 验证账号与密码

③ 取得所有联系人BuddyList

④ 服务器根据消息的目标账号to 发送消息—转发

三、项目的服务器与客户端

现在梳理一下服务器和客户端的链接和通信的流程,QQConnectionManager维护的是整个服务器的链接库,QQConnection则是对应的一个客户端连接服务器的socket链接管理器,存储在QQConnectionManager中,而QQConnection里面对应包含了与单个服务器连接的socket和对应输入输出流,用于与客户端接收和发送消息,QQMessage则是包含了消息里面的一些属性,如消息类型,消息内容,消息的接收者和发送者等等;这里所说的服务器只是本地模拟的一个服务器,所以好友在一开始则是初始化的默认数据,我们在此只是实现聊天的功能;



我们来看下服务端代码包分类:



bean目录结构里面包含的是QQ消息和用户、好友、数据库的实体类;core则是整个项目的重要实现;Listener里面则是服务器监听客户端发送过来的消息,然后根据需求,消息类型判断是否需要将其转发到另外一个客户端;main里面则是服务端开启则开启一个阻塞的线程等待客户端的链接;

四、服务端的实现

QQImserver

public class QQImServer {

public static void main(String[] args) {

try {
// ① 创建一个线程 等其他客户端的连接
final ServerSocket server = new ServerSocket(5223);
System.out.println("---服务器启动---" + new Date().toString());
new Thread() {//
public void run() {
while (true) {
QQConnection conn = null;
try {
//接受来自不同客服端的链接,为不同的客服端分配Socket
Socket client = server.accept();
System.out.println("---有客户端接入---" + client);
// ② 如果客户端连接成功分配置一个线程
conn = new QQConnection(client);
conn.addOnRecevieMsgListener(new LoginMsgListener(conn));
conn.addOnRecevieMsgListener(new ChatP2PListener());
conn.addOnRecevieMsgListener(new ChatRoomListener());
conn.addOnRecevieMsgListener(new LoginOutListener());
// ③ 该线程内等待用户数据
conn.connect();
// ④ 分配一个线程给客户端
} catch (IOException e) {
e.printStackTrace();
conn.disconnect();
}
}
};
}.start();
} catch (Exception e) {//
e.printStackTrace();
}
}
}


QQMessageType

public class QQMessageType {

public static final String MSG_TYPE_REGISTER = "register";      // 注册
public static final String MSG_TYPE_LOGIN = "login";            // 登录
public static final String MSG_TYPE_LOGIN_OUT = "loginout";     // 登出
public static final String MSG_TYPE_BUDDY_LIST = "buddylist";   // 好友列表
public static final String MSG_TYPE_CHAT_P2P = "chatp2p";       // 聊天
public static final String MSG_TYPE_CHAT_ROOM = "chatroom";     // 群聊
public static final String MSG_TYPE_OFFLINE = "offline";        // 下线
public static final String MSG_TYPE_SUCCESS = "success";        // 成功
public static final String MSG_TYPE_FAILURE = "failure";        // 失败

}


QQMessage,这里的ProtocalObject 则是封装了对对象与XML之间进行转换得方法,用到了XStream.jar实现

public class QQMessage extends ProtocalObject {

public String type = QQMessageType.MSG_TYPE_CHAT_P2P;// 类型的数据 chat login
public long from = 0L;                               // 发送者 account
public S
fee9
tring fromNick = "";                         // 昵称
public int fromAvatar = 1;                           // 头像
public long to = 0L;                                 // 接收者 account
public String content = "";                          // 消息的内容
public String sendTime = MyTime.getTime();           // 发送时间

}


QQConnection,用于在链接成功后,通过socket获取输入输出流,向该链接的客户端发送和接收数据,将接收到的消息通过listener的方式传递给实现者去转发消息

public class QQConnection extends Thread {

private Socket scoket = null;
public DataOutputStream writer = null;
public DataInputStream reader = null;
public QQUser who = null;
public String ip;
public int port;

public QQConnection(Socket scoket) {
super();
try {
this.scoket = scoket;
writer = new DataOutputStream(this.scoket.getOutputStream());
reader = new DataInputStream(this.scoket.getInputStream());
} catch (Exception e) {
e.printStackTrace();
}
}

public QQConnection(String ip, int port) {
super();
this.ip = ip;
this.port = port;
init(ip, port);
}

private void init(String ip, int port) {
try {
this.scoket = new Socket(ip, port);
writer = new DataOutputStream(this.scoket.getOutputStream());
reader = new DataInputStream(this.scoket.getInputStream());
} catch (Exception e) {
e.printStackTrace();
}
}

// 连接
public void connect() {
if (this.scoket == null) {
init(ip, port);
}
flag = true;
start();
}

// 断开连接
public void disconnect() {
try {
flag = false;
writer.close();
reader.close();
// stop();
} catch (Exception e) {
e.printStackTrace();
}
}

// 1.申明监听器与响应方法
public static interface OnRecevieMsgListener {
public void onReceive(QQMessage msg);
}

// 2.支持多个监听器
private List<OnRecevieMsgListener> listeners = new ArrayList<OnRecevieMsgListener>();

// 3.添加监听器
public void addOnRecevieMsgListener(OnRecevieMsgListener listener) {
listeners.add(listener);
}

// 4.删除监听器
public void removeOnRecevieMsgListener(OnRecevieMsgListener listener) {
listeners.remove(listener);
}

private boolean flag = true;
@Override
public void run() {
super.run();
// 等待 数据
while (flag) {
try {
//从Socket里面读取来自远方Socket的消息,这个方法是阻塞的
String xml;
xml = reader.readUTF();
disconnect();
System.out.println(xml);
if (xml != null && !"".equals(xml)) {
QQMessage msg = new QQMessage();
msg = (QQMessage) msg.fromXml(xml);
for (OnRecevieMsgListener l : listeners) {
l.onReceive(msg);
}
}
} catch (EOFException e) {
disconnect();
System.out.println("=-=EOFException---");
if (who != null) {
QQConnectionManager.remove(who.account);
}
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
disconnect();
if (who != null) {
QQConnectionManager.remove(who.account);
}
}
}
}
}


QQConnectionManager,用于管理每个链接成功的客户端的socket,存放于hashMap中

public class QQConnectionManager {

public static HashMap<Long, QQConnection> conns = new HashMap<Long, QQConnection>();
private static QQBuddyList list = new QQBuddyList();

public static void put(long account, QQConnection conn) {
System.out.println("====账号"+account+"上线了");
conns.put(account, conn);
QQUser u = Db.getByAccount(account);
QQBuddy item = new QQBuddy();
item.account = u.account;
item.avatar = u.avatar;
item.nick = u.nick;
list.buddyList.add(0, item);
}

public static void remove(Long account) {
if (conns.containsKey(account)) {
System.out.println("====账号"+account+"下线了");
conns.remove(account);
QQBuddy delete = null;
for (QQBuddy item : list.buddyList) {
if (account==item.account) {
delete = item;
break;
}
}
if (delete != null) {
System.out.println("====从在线名单上移除 0000"+account);
list.buddyList.remove(delete);
}
}
}

public static String getBuddyList() {
return list.toXml();
}

public static QQConnection get(long account) {
if (conns.containsKey(account)) {
return conns.get(account);
}
return null;
}
}


LoginMsgListener,这里我们举例登陆成功后,服务器如何将消息转发回客户端,MessageSender 这里是转发消息给客户端,代码在最后会给出下载链接

public class LoginMsgListener extends MessageSender implements OnRecevieMsgListener {

private QQConnection conn = null;

public LoginMsgListener(QQConnection conn) {
super();
this.conn = conn;
}

@Override
public void onReceive(QQMessage fromCient) {
if (QQMessageType.MSG_TYPE_LOGIN.equals(fromCient.type)) {
try {
QQMessage toClient = new QQMessage();
if (QQMessageType.MSG_TYPE_LOGIN.equals(fromCient.type)) {
String[] params = fromCient.content.split("#");
String account = params[0];
String pwd = params[1];
QQUser user = Db.getByAccount(Long.parseLong(account));
if (user == null) {
// 不存在
toClient.type = QQMessageType.MSG_TYPE_FAILURE;
toClient.content = "不存在";
toClient(toClient, conn);
} else {
// 存在
if (user.password.equals(pwd)) {
// 登录 成功,把联系人列表给登录者
toClient.type = QQMessageType.MSG_TYPE_BUDDY_LIST;
// 返回在线名单
// 创建带身份的连接对象
conn.who = user;
if (!QQConnectionManager.conns.keySet().contains(
user.account)) {
QQConnectionManager.put(user.account, conn);
}
//取出好友列表
toClient.content = QQConnectionManager
.getBuddyList();
toEveryClient(toClient);
} else {
toClient.type = QQMessageType.MSG_TYPE_FAILURE;
toClient.content = "失败";
toClient(toClient, conn);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}


到这里服务端的代码主要逻辑完成了,我们来看看客户端的主要逻辑,bean类跟服务器相同,主要还是看下ConnectionManager的逻辑;这里让其集成Observable类则使用的观察者模式,因为开启线程循环取接收服务器发送过来的消息,我们要通知界面拿到消息,则接受消息的类ConnectionManager需要被观察,在UI界面作为观察者;这里也实现了发送和接收消息

public class ConnectionManager extends Observable implements Runnable{

private static final String HOST = "192.168.1.144";
private static final int PORT = 5223;
private static ConnectionManager sInstance = new ConnectionManager();
private Socket mSocket;
private InetSocketAddress mAddress;
private DataInputStream mReader;
private DataOutputStream mWriter;
private boolean flag;
private long mAccount;

public static ConnectionManager getInstance() {
return sInstance;
}

public void connect() throws UnknownHostException, IOException {
mSocket = new Socket();
mAddress = new InetSocketAddress(HOST, PORT);
mSocket.connect(mAddress);
mReader = new DataInputStream(mSocket.getInputStream());
mWriter = new DataOutputStream(mSocket.getOutputStream());
flag = true;
new Thread(this).start();
}

public long getAccount() {
return mAccount;
}

public void disConnect() throws IOException {
QQMessage message = new QQMessage();
message.from = mAccount;
message.type = QQMessageType.MSG_TYPE_LOGIN_OUT;
sendMesage(message);
mSocket.close();
mSocket = null;
}

@Override
public void run() {
while (flag) {
try {
String xml = mReader.readUTF();
QQMessage message = new QQMessage();
message = (QQMessage) message.fromXml(xml);
setChanged();
notifyObservers(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void sendMesage(QQMessage message) throws IOException {
mWriter.writeUTF(message.toXml());
}

public void login(String account, String pwd) throws IOException {
mAccount = Long.parseLong(account);
QQMessage message = new QQMessage();
message.type = QQMessageType.MSG_TYPE_LOGIN;
message.content = account + "#" + pwd;
sendMesage(message);
}
}


最后,给出相关代码的下载链接:

http://download.csdn.net/download/jacky_can/9895432
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息