您的位置:首页 > 编程语言 > Java开发

Java NIO时间服务

2016-09-27 15:40 274 查看


Java NIO时间服务

 

这篇文章内容是另一篇文章《Java 实现基于Redis的分布式锁》的分支. 

 

时间服务包括客户端和服务端, 服务端监听请求 ,若是时间请求,则返回当前服务器的时间, 各个客户端(分布式锁) 都从给服务器获取时间,已达到全局时间一致。

 

共三个类 TimeServer、 TimeClient和TimeClientException,下面是源码:

 

TimeServer.java:

Java代码 

package cc.lixiaohui.lock.time.nio.server;  

  

import java.io.IOException;  

import java.net.InetSocketAddress;  

import java.nio.ByteBuffer;  

import java.nio.channels.SelectionKey;  

import java.nio.channels.Selector;  

import java.nio.channels.ServerSocketChannel;  

import java.nio.channels.SocketChannel;  

import java.util.Iterator;  

  

import org.slf4j.Logger;  

import org.slf4j.LoggerFactory;  

  

/** 

 * 提供简单时间服务的服务器 

 *  

 * @author lixiaohui 

 */  

public class TimeServer {  

  

    private ServerSocketChannel serverChannel;  

  

    private Selector selector;  

  

    private volatile boolean alive = true;  

  

    private static final String TIME_CMD = "time";  

    private static final String HALT_CMD = "halt";  

  

    private static final String ERROR = "error";  

  

    private static final Logger logger = LoggerFactory.getLogger(TimeServer.class);  

  

    public void start(int port) throws IOException {  

        selector = Selector.open();  

  

        serverChannel = ServerSocketChannel.open();  

        serverChannel.configureBlocking(false); // non-blocking mode  

        serverChannel.bind(new InetSocketAddress(port));  

  

        // interested only in accept event  

        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  

  

        while (alive) {  

            try {  

                if (selector.select() < 0) { // no events  

                    continue;  

                }  

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();  

                while (it.hasNext()) {  

                    SelectionKey key = it.next();  

                    it.remove();  

                    try {  

                        if (!key.isValid())   

                            continue;  

                          

                        if (key.isAcceptable()) { // new channel incoming  

                            SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();  

                            // ignore if register failed  

                            if (!registerChannel(selector, ch, SelectionKey.OP_READ)) {  

                                continue;  

                            }  

                            logger.info("new channel registered {}", ch.getRemoteAddress().toString());  

                        }  

                        // client request  

                        if (key.isReadable()) {  

                            handleRead(key);  

                        }  

                        if (key.isWritable()) {  

                            handleWrite(key);  

                        }  

                    } catch (IOException e) {  

                        logger.error("{} exception: {}", key.channel(), e);  

                        if (key != null) {  

                            key.cancel();  

                            if (key.channel() != null) {  

                                key.channel().close();  

                            }  

                        }  

                    }  

                }  

            } catch (Exception e) {  

                logger.error("{}", e);  

            }  

        }  

          

        if (selector != null) {  

            try {  

            selector.close();  

            } catch (Exception e) {  

                logger.error("error occurred when closing selector: e", e);  

            }  

        }  

    }  

  

    private void handleWrite(SelectionKey key) throws IOException {  

        SocketChannel ch = (SocketChannel) key.channel();  

        try {  

            ByteBuffer buf = (ByteBuffer) key.attachment();  

            if (buf != null) {  

                writeBytesToChannel(ch, buf, key);  

            }  

        } catch (ClassCastException e) {  

            logger.error("{}", e);  

        }  

    }  

  

    private void handleRead(SelectionKey key) throws IOException {  

        SocketChannel ch = (SocketChannel) key.channel();  

        ByteBuffer buffer = ByteBuffer.allocate(16);  

        int read = ch.read(buffer);  

        if (read < 4) { // not a full command, write error back,  

                        // meaning client will send command  

                        // again.  

            writeBytesToChannel(ch, ERROR.getBytes(), key);  

        } else {  

            String cmd = extractCommand(buffer);  

            logger.info("recieve {} request from {}", cmd, ch.getRemoteAddress().toString());  

            if (TIME_CMD.equalsIgnoreCase(cmd)) {  

                // 回写时间  

                writeBytesToChannel(ch, String.valueOf(time()).getBytes(), key);  

                logger.info("write time to {}", ch.getRemoteAddress().toString());  

            } else if (HALT_CMD.equalsIgnoreCase(cmd)) {  

                // 停止服务  

                logger.info("stopping timeserver");  

                stop();  

                logger.info("timeserver stopped");  

            } else {  

                writeBytesToChannel(ch, ERROR.getBytes(), key);  

                logger.warn("unreconized command {}, will discard it.", cmd);  

            }  

        }  

    }  

  

    private String extractCommand(ByteBuffer buffer) {  

        buffer.flip();  

        byte[] array = buffer.array();  

        byte[] newArray = new byte[buffer.remaining()];  

        System.arraycopy(array, buffer.position(), newArray, 0, buffer.remaining());  

        return new String(newArray);  

    }  

  

    private void writeBytesToChannel(SocketChannel ch, byte[] bs, SelectionKey key) throws IOException {  

        ByteBuffer buf = ByteBuffer.wrap(bs);  

        int total = buf.remaining();  

        int write = ch.write(buf);  

        if (write < total) { // didn't wrote all, then write rest when next  

                                // event triggered  

            key.attach(buf);  

        }  

    }  

  

    private void writeBytesToChannel(SocketChannel ch, ByteBuffer buf, SelectionKey key) throws IOException {  

        if (!buf.hasRemaining()) {  

            return;  

        }  

        int total = buf.remaining();  

        int write = ch.write(buf);  

        if (write < total) { // didn't wrote all, then write rest when next  

                                // event triggered  

            key.attach(buf);  

        }  

    }  

  

    protected void stop() {  

        alive = false;  

        try {  

            serverChannel.close();  

            selector.close();  

        } catch (IOException e) {  

            // TODO Auto-generated catch block  

            e.printStackTrace();  

        }  

    }  

  

    private boolean registerChannel(Selector sel, SocketChannel sc, int ops) {  

        try {  

            sc.configureBlocking(false);  

            sc.register(sel, ops);  

        } catch (Exception e) {  

            return false;  

        }  

        return true;  

    }  

  

    private long time() {  

        return System.currentTimeMillis();  

    }  

  

}  

 

TimeCient.java:

 

Java代码 

package cc.lixiaohui.lock.time.nio.client;  

  

import java.io.IOException;  

import java.net.InetSocketAddress;  

import java.net.SocketAddress;  

import java.nio.ByteBuffer;  

import java.nio.channels.SocketChannel;  

  

/** 

 * 时间获取客户端 

 * @author lixiaohui 

 * 

 */  

public class TimeClient {  

      

    private static final String TIME_CMD = "time";  

      

    private final SocketAddress address;  

      

    private SocketChannel channel;  

      

    public TimeClient(SocketAddress address) throws IOException {  

        this.address = address;  

        channel = SocketChannel.open(address);  

        channel.configureBlocking(true); // blocking mode  

    }  

      

    /** 

     * @throws TimeClientException when connection with time server is closed. 

     * @return currentTimeMillis in server 

     */  

    public long currentTimeMillis() {  

        try {  

            channel.write(ByteBuffer.wrap(TIME_CMD.getBytes()));  

              

            ByteBuffer buf = ByteBuffer.allocate(64);  

            channel.read(buf);  

              

            buf.flip(); // flip for use of read  

            byte[] bytes = new byte[buf.limit() - buf.position()];  

            System.arraycopy(buf.array(), buf.position(), bytes, 0, bytes.length);  

              

            return Long.parseLong(new String(bytes));  

        } catch(NumberFormatException e) {  

            System.err.println(e);  

            return System.currentTimeMillis();  

        } catch (IOException e) {  

            throw new TimeClientException(address);  

        }  

    }  

      

    /** 

     * close the client, along with its connection with server. 

     */  

    public void close() {  

        try {  

            if (channel != null) {  

                channel.close();  

            }  

        } catch (IOException e) {  

            e.printStackTrace();  

        }  

          

    }  

      

    public static void main(String[] args) throws IOException {  

        TimeClient client = new TimeClient(new InetSocketAddress("localhost", 9999));  

        System.out.println(client.currentTimeMillis());  

        //client.close();  

        System.in.read();  

    }  

  

      

}  

 

TimeClientException.java:

Java代码 

package cc.lixiaohui.lock.time.nio.client;  

  

import java.net.SocketAddress;  

  

public class TimeClientException extends RuntimeException {  

  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

  

    public TimeClientException() {  

        super();  

        // TODO Auto-generated constructor stub  

    }  

  

    public TimeClientException(String message) {  

        super(message);  

        // TODO Auto-generated constructor stub  

    }  

      

    public TimeClientException(SocketAddress address) {  

        super(address.toString());  

    }  

      

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: