Java NIO时间服务
2016-09-27 15:40
274 查看
Java NIO时间服务
这篇文章内容是另一篇文章《Java 实现基于Redis的分布式锁》的分支.
时间服务包括客户端和服务端, 服务端监听请求 ,若是时间请求,则返回当前服务器的时间, 各个客户端(分布式锁) 都从给服务器获取时间,已达到全局时间一致。
共三个类 TimeServer、 TimeClient和TimeClientException,下面是源码:
TimeServer.java:
Java代码
package cc.lixiaohui.lock.time.nio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 提供简单时间服务的服务器
*
* @author lixiaohui
*/
public class TimeServer {
private ServerSocketChannel serverChannel;
private Selector selector;
private volatile boolean alive = true;
private static final String TIME_CMD = "time";
private static final String HALT_CMD = "halt";
private static final String ERROR = "error";
private static final Logger logger = LoggerFactory.getLogger(TimeServer.class);
public void start(int port) throws IOException {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false); // non-blocking mode
serverChannel.bind(new InetSocketAddress(port));
// interested only in accept event
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (alive) {
try {
if (selector.select() < 0) { // no events
continue;
}
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
if (!key.isValid())
continue;
if (key.isAcceptable()) { // new channel incoming
SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
// ignore if register failed
if (!registerChannel(selector, ch, SelectionKey.OP_READ)) {
continue;
}
logger.info("new channel registered {}", ch.getRemoteAddress().toString());
}
// client request
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable()) {
handleWrite(key);
}
} catch (IOException e) {
logger.error("{} exception: {}", key.channel(), e);
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
logger.error("{}", e);
}
}
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
logger.error("error occurred when closing selector: e", e);
}
}
}
private void handleWrite(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
try {
ByteBuffer buf = (ByteBuffer) key.attachment();
if (buf != null) {
writeBytesToChannel(ch, buf, key);
}
} catch (ClassCastException e) {
logger.error("{}", e);
}
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = ch.read(buffer);
if (read < 4) { // not a full command, write error back,
// meaning client will send command
// again.
writeBytesToChannel(ch, ERROR.getBytes(), key);
} else {
String cmd = extractCommand(buffer);
logger.info("recieve {} request from {}", cmd, ch.getRemoteAddress().toString());
if (TIME_CMD.equalsIgnoreCase(cmd)) {
// 回写时间
writeBytesToChannel(ch, String.valueOf(time()).getBytes(), key);
logger.info("write time to {}", ch.getRemoteAddress().toString());
} else if (HALT_CMD.equalsIgnoreCase(cmd)) {
// 停止服务
logger.info("stopping timeserver");
stop();
logger.info("timeserver stopped");
} else {
writeBytesToChannel(ch, ERROR.getBytes(), key);
logger.warn("unreconized command {}, will discard it.", cmd);
}
}
}
private String extractCommand(ByteBuffer buffer) {
buffer.flip();
byte[] array = buffer.array();
byte[] newArray = new byte[buffer.remaining()];
System.arraycopy(array, buffer.position(), newArray, 0, buffer.remaining());
return new String(newArray);
}
private void writeBytesToChannel(SocketChannel ch, byte[] bs, SelectionKey key) throws IOException {
ByteBuffer buf = ByteBuffer.wrap(bs);
int total = buf.remaining();
int write = ch.write(buf);
if (write < total) { // didn't wrote all, then write rest when next
// event triggered
key.attach(buf);
}
}
private void writeBytesToChannel(SocketChannel ch, ByteBuffer buf, SelectionKey key) throws IOException {
if (!buf.hasRemaining()) {
return;
}
int total = buf.remaining();
int write = ch.write(buf);
if (write < total) { // didn't wrote all, then write rest when next
// event triggered
key.attach(buf);
}
}
protected void stop() {
alive = false;
try {
serverChannel.close();
selector.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private boolean registerChannel(Selector sel, SocketChannel sc, int ops) {
try {
sc.configureBlocking(false);
sc.register(sel, ops);
} catch (Exception e) {
return false;
}
return true;
}
private long time() {
return System.currentTimeMillis();
}
}
TimeCient.java:
Java代码
package cc.lixiaohui.lock.time.nio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* 时间获取客户端
* @author lixiaohui
*
*/
public class TimeClient {
private static final String TIME_CMD = "time";
private final SocketAddress address;
private SocketChannel channel;
public TimeClient(SocketAddress address) throws IOException {
this.address = address;
channel = SocketChannel.open(address);
channel.configureBlocking(true); // blocking mode
}
/**
* @throws TimeClientException when connection with time server is closed.
* @return currentTimeMillis in server
*/
public long currentTimeMillis() {
try {
channel.write(ByteBuffer.wrap(TIME_CMD.getBytes()));
ByteBuffer buf = ByteBuffer.allocate(64);
channel.read(buf);
buf.flip(); // flip for use of read
byte[] bytes = new byte[buf.limit() - buf.position()];
System.arraycopy(buf.array(), buf.position(), bytes, 0, bytes.length);
return Long.parseLong(new String(bytes));
} catch(NumberFormatException e) {
System.err.println(e);
return System.currentTimeMillis();
} catch (IOException e) {
throw new TimeClientException(address);
}
}
/**
* close the client, along with its connection with server.
*/
public void close() {
try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
TimeClient client = new TimeClient(new InetSocketAddress("localhost", 9999));
System.out.println(client.currentTimeMillis());
//client.close();
System.in.read();
}
}
TimeClientException.java:
Java代码
package cc.lixiaohui.lock.time.nio.client;
import java.net.SocketAddress;
public class TimeClientException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = 1L;
public TimeClientException() {
super();
// TODO Auto-generated constructor stub
}
public TimeClientException(String message) {
super(message);
// TODO Auto-generated constructor stub
}
public TimeClientException(SocketAddress address) {
super(address.toString());
}
}
相关文章推荐
- javascript ajax获得服务响应头的时间
- Linux下 ntp 时间同步服务ntpd 出现 the NTP socket is in use, exiting 解决
- Cisco交换机时间服务(NTP)的设置
- CentOS启动sendmail服务时间长问题解决方法
- SpringCloud(第 010 篇)简单 Quartz-Cluster 微服务,支持集群分布式,并支持动态修改 Quartz 任务的 cronExpression 执行时间
- 时区修改与服务之间时间同步
- 为应用程序池 'DefaultAppPool' 提供服务的进程关闭时间超过了限制
- NTP 网络时间协议服务配置说明(Windows)
- 闲谈:价值、服务、时间、用户体验、美、过度开发
- 解决为应用程序池 提供服务的进程关闭时间超过了限制
- [转]整合Web和Windows服务——按预定时间间隔运行ASP.NET代码
- 使用Messenger类绑定服务显示时间
- datenode节点超时时间设置,Hadoop启动不正常,HDFS冗余数据块的自动删除,NameNode安全模式问题,ntp时间服务同步,机架感知配置
- 3G的定价原则是时间、流量、服务复合计费[新浪]
- Linux上搭建FTP服务的相关配置3:设置用户磁盘额及访问时间
- 原创:原来准备创业的项目:公交服务构思,让一个人可以在最短时间内安全,快速,舒适的到达终点站
- 修改 IIS 服务的会话时间(timeout)
- 第一个NTP时间服务
- 网游服务器端设计思考:时间服务(二)需求
- 手动配置 Windows 时间服务