您的位置:首页 > 编程语言 > Java开发

springmvc+maven+netty-socketio服务端构建实时通信

2016-11-15 23:41 621 查看

Socket.IO:支持WebSocket协议、用于实时通信和跨平台的框架

WebSocket是HTML5的一种新通信协议,它实现了浏览器与服务器之间的双向通讯。而Socket.IO是一个完全由JavaScript实现、基于Node.js、支持WebSocket的协议用于实时通信、跨平台的开源框架,它包括了客户端的JavaScript和服务器端的Node.js。Socket.IO除了支持WebSocket通讯协议外,还支持许多种轮询(Polling)机制以及其它实时通信方式,并封装成了通用的接口,并且在服务端实现了这些实时机制的相应代码。Socket.IO实现的Polling通信机制包括Adobe
Flash Socket、AJAX长轮询、AJAX multipart streaming、持久Iframe、JSONP轮询等。Socket.IO能够根据浏览器对通讯机制的支持情况自动地选择最佳的方式来实现网络实时应用。当前,Socket.IO最新版本是于2015年1月19日发布的1.3.0版本,该版本增强了稳定性和提高了性能,并修复了大量Bug。

Socket.IO设计的目标是构建能够在不同浏览器和移动设备上良好运行的实时应用,如实时分析系统、二进制流数据处理应用、在线聊天室、在线客服系统、评论系统、WebIM等。目前,Socket.IO已经支持主流PC浏览器(如IE、Safari、Chrome、Firefox、Opera等)和移动平台上的浏览器(iOS平台下的Safari、Android平台下的基于Webkit的浏览器等)。

Socket.IO已经具有众多强大功能的模块和扩展API,如(session.socket.io)(http session中间件,进行session相关操作)、socket.io-cookie(cookie解析中间件)、session-web-sockets(以安全的方式传递Session)、socket-logger(JSON格式的记录日志工具)、websocket.MQ(可靠的消息队列)、socket.io-mongo(使用MongoDB的适配器)、socket.io-redis(Redis的适配器)、socket.io-parser(服务端和客户端通讯的默认协议实现模块)等。

Socket.IO实现了实时、双向、基于事件的通讯机制,它解决了实时的通信问题,并统一了服务端与客户端的编程方式。启动了Socket以后,就像建立了一条客户端与服务端的管道,两边可以互通有无。它还能够和Express.js提供的传统请求方式很好的结合,即可以在同一个域名,同一个端口提供两种连接方式:

request/response, websocket(flashsocket,ajax…).

netty-socketio   客户端和服务端下载地址:https://github.com/mrniko/netty-socketio

1.maven引入依赖jar包

 <dependency>

          <groupId>com.corundumstudio.socketio</groupId>

          <artifactId>netty-socketio</artifactId>

          <version>1.7.7</version>

</dependency>

socketio服务端:SocketIo_Server.java

import java.util.Map;

import com.corundumstudio.socketio.AckRequest;

import com.corundumstudio.socketio.Configuration;

import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.SocketIOServer;

import com.corundumstudio.socketio.listener.ConnectListener;

import com.corundumstudio.socketio.listener.DataListener;

import com.corundumstudio.socketio.listener.DisconnectListener;

public class SocketIo_Server {

    public static void main(String[] args) throws InterruptedException {

        Configuration config = new Configuration();

        //服务器主机ip,这里配置本机

        config.setHostname("localhost");

        //端口,任意

        config.setPort(9092);

        config.setMaxFramePayloadLength(1024 * 1024);

        config.setMaxHttpContentLength(1024 * 1024);

        SocketIOServer server = new SocketIOServer(config);

        //监听广告推送事件,advert_info为事件名称,自定义

        server.addEventListener("advert_info", String.class, new DataListener<String>(){

            @Override

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {

                //客户端推送advert_info事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型

                

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取客户端连接的ip

                Map params = client.getHandshakeData().getUrlParams();//获取客户端url参数

                System.out.println(clientIp+":客户端:************"+data);

            }

        });

        //监听通知事件

        server.addEventListener("notice_info", String.class, new DataListener<String>() {

            @Override    

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {

                //同上

            }

        });

        

        /**

         * 监听其他事件

         */

        

        //添加客户端连接事件

        server.addConnectListener(new ConnectListener() {

            @Override

            public void onConnect(SocketIOClient client) {

                // TODO Auto-generated method stub

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip

                System.out.println(clientIp+"-------------------------"+"客户端已连接");

                Map params = client.getHandshakeData().getUrlParams();

                

                //给客户端发送消息

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,有什么能帮助你的?");

            }

        });

        //添加客户端断开连接事件

        server.addDisconnectListener(new DisconnectListener(){

            @Override

            public void onDisconnect(SocketIOClient client) {

                // TODO Auto-generated method stub

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip

                System.out.println(clientIp+"-------------------------"+"客户端已断开连接");

                

                //给客户端发送消息

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,期待下次和你见面");

            }

        });

          server.start();

          

        Thread.sleep(Integer.MAX_VALUE);

        server.stop();

    }

}
socketio客户端:SocketIo_Client.java

import io.socket.client.IO;

import io.socket.client.Socket;

import io.socket.emitter.Emitter;

public class SocketIo_Client {

    public static void main(String[] args) {

        try{

            IO.Options options = n
10a62
ew IO.Options();    

            options.forceNew = true;

            options.reconnection = true;

            final Socket socket = IO.socket("http://localhost:9092?deviceId=ZYLPC", options);

           

            socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {

                @Override

                public void call(Object... args) {

                    System.out.println("connect");

//                    socket.close();

                }

            }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {

                @Override

                public void call(Object... args) {

                    System.out.println("connect timeout");

                }

            }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {

                @Override    

                public void call(Object... args) {

                    System.out.println("connect error");

                }

            }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {

                @Override

                public void call(Object... args) {    

                    System.out.println("disconnect");

                }

            }).on("advert_info", new Emitter.Listener() {

                @Override

                public void call(Object... args) {

                    String data = (String)args[0];

                    System.out.println("服务端:************"+data.toString());

                    //给服务端发送信息

                    socket.emit("advert_info", "服务端你好,我是客户端,我有问题想咨询你!");

                }

            }).on("notice_info", new Emitter.Listener(){

                @Override

                public void call(Object... args){

                    String data = (String)args[0];

                }

            });

            socket.open();

        }catch(Exception e){

            

        }

    }

}

与spring集成:

服务层:SocketIoService.java


import java.util.Collection;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.springframework.stereotype.Service;

import com.corundumstudio.socketio.AckRequest;

import com.corundumstudio.socketio.Configuration;

import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.SocketIOServer;

import com.corundumstudio.socketio.listener.ConnectListener;

import com.corundumstudio.socketio.listener.DataListener;

import com.corundumstudio.socketio.listener.DisconnectListener;

@Service("socketIoService")

public class SocketIoService {

    static SocketIOServer server;

    static Map<String, SocketIOClient> clientsMap = new HashMap<String, SocketIOClient>();

    

    public void startServer() throws InterruptedException{

        Configuration config = new Configuration();

        //服务器主机ip    

        config.setHostname("localhost");

        //端口

        config.setPort(9092);

        config.setMaxFramePayloadLength(1024 * 1024);

        config.setMaxHttpContentLength(1024 * 1024);

        server = new SocketIOServer(config);

        //监听广告推送事件,advert_info为事件名称,自定义

        server.addEventListener("advert_info", String.class, new DataListener<String>(){

            @Override

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {

                //客户端推送advert_info事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型

                

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取客户端连接的ip

                Map params = client.getHandshakeData().getUrlParams();//获取客户端url参数

                

                System.out.println(clientIp+":客户端:************"+data);

            }

        });

        //监听通知事件

        server.addEventListener("notice_info", String.class, new DataListener<String>() {

            @Override    

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {

                //同上

            }

        });

        

        /**

         * 监听其他事件

         */

        

        //添加客户端连接事件

        server.addConnectListener(new ConnectListener() {

            @Override

            public void onConnect(SocketIOClient client) {

                // TODO Auto-generated method stub

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip

                System.out.println(clientIp+"-------------------------"+"客户端已连接");

                Map params = client.getHandshakeData().getUrlParams();

                

                //获取客户端连接的uuid参数

                Object object = params.get("uuid");

                String uuid = "";

                if(object != null){

                    uuid = ((List<String>)object).get(0);

                    //将uuid和连接客户端对象进行绑定

                    clientsMap.put(uuid,client);

                }

                //给客户端发送消息

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,有什么能帮助你的?");

            }

        });

        //添加客户端断开连接事件

        server.addDisconnectListener(new DisconnectListener(){

            @Override

            public void onDisconnect(SocketIOClient client) {

                // TODO Auto-generated method stub

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip

                System.out.println(clientIp+"-------------------------"+"客户端已断开连接");

                

                //给客户端发送消息

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,期待下次和你见面");

            }

        });

          server.start();

          

        Thread.sleep(Integer.MAX_VALUE);

        server.stop();

    }

    public void stopServer(){

        if(server != null){

            server.stop();

            server = null;

        }

    }

    /**

     *  给所有连接客户端推送消息

     * @param eventType 推送的事件类型

     * @param message  推送的内容

     */

    public void sendMessageToAllClient(String eventType,String message){

        Collection<SocketIOClient> clients = server.getAllClients();

        for(SocketIOClient client: clients){

            client.sendEvent(eventType,message);

        }

    }

    /**

     * 给具体的客户端推送消息

     * @param deviceId 设备类型

     * @param eventType推送事件类型

     * @param message 推送的消息内容

     */

    public void sendMessageToOneClient(String uuid,String eventType,String message){

        try {

            if(uuid != null && !"".equals(uuid)){

                SocketIOClient client = (SocketIOClient)clientsMap.get(uuid);

                if(client != null){

                    client.sendEvent(eventType,message);

                }

            }

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

   public static SocketIOServer getServer() {

        return server;

    }

}

控制层层:SocketIoController.java

import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

@Controller

public class SocketIoController {

    @Autowired

    private SocketIoService service;

    

    //启动socket 服务

    @RequestMapping("startServer")

    public void startServer(HttpServletRequest request,HttpServletResponse response) throws Exception{

        Map params = ReflectUtil.transToMAP(request.getParameterMap());

        try {

            if(service.getServer() == null){

                new Thread(new Runnable() {

                    @Override

                    public void run() {

                        // TODO Auto-generated method stub

                        try {

                            service.startServer();

                        } catch (InterruptedException e) {

                            // TODO Auto-generated catch block

                            e.printStackTrace();

                        }

                    }

                }).start();

            }

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

    

    //停止socket服务

    @RequestMapping("stopServer")

    public void stopServer(HttpServletRequest request,HttpServletResponse response) throws Exception{

        Map params = ReflectUtil.transToMAP(request.getParameterMap());

        try {

            if(service.getServer() == null){

                service.stopServer();

            }

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

    //给指定的客户端推送消息

    @RequestMapping("sendAdvertInfoMsg")

    public void sendAdvertInfoMsg(HttpServletRequest request,HttpServletResponse response) throws Exception{

        Map params = ReflectUtil.transToMAP(request.getParameterMap());

        String uuid = ParamsUtil.nullDeal(params, "uuid", "");

        try {

            if(!"".equals(uuid) && service.getServer() != null){

                service.sendMessageToOneClient(uuid, "advert_info", "推送的内容");

            }

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

    }

}

如果想在spring容器启动之后启动sockerio,可以这样做:

自定义一个类,用@component注入

@component (把普通pojo实例化到spring容器中,相当于配置文件中的<bean id="" class=""/>

实现spring  ApplicationListener接口,这样在spring加载成功之后就会调用onApplicationEvent方法启动socketio

import io.socket.client.IO;

import io.socket.client.Socket;

import io.socket.emitter.Emitter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationListener;

import org.springframework.context.event.ContextRefreshedEvent;

import org.springframework.stereotype.Component;

import com.zkkj.backend.common.socketio.BinaryEventLauncher;

import com.zkkj.backend.service.biz.advert.IAdvertService;

/**

 * spring加载完毕后执行

 * @author ZYL_PC

 *

 */

@Component("BeanDefineConfigue")

public class BeanDefineConfigue  implements ApplicationListener<ContextRefreshedEvent>{

    @Autowired

    private SocketIoService service;

    //当前服务器的ip

    private String serverIp = "";

    //当前服务器设备id

    private String deviceId = "";

    //执行时间,时间单位为毫秒,读者可自行设定,不得小于等于0

    private static Long cacheTime = Long.MAX_VALUE;

    //延迟时间,时间单位为毫秒,读者可自行设定,不得小于等于0

    private static Integer delay = 3000;

    @Override

    public void onApplicationEvent(ContextRefreshedEvent event) {

        // TODO Auto-generated method stub

        Timer timer = new Timer();

        timer.scheduleAtFixedRate(new TimerTask() {

          public void run() {

            //启动socket监听

              try{

                  if(service.getServer() == null){

                      new Thread(new Runnable() {

                          @Override

                          public void run() {

                              try {

                                  service.startServer();

                              } catch (InterruptedException e) {

                                  e.printStackTrace();

                              }

                          }

                      }).start();

                  }

              }catch(Exception e){

              }

          }

        }, delay,cacheTime);// 这里设定将延时每天固定执行

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: