用zookeeper实现分布式session
2015-01-15 20:32
411 查看
废话不说,直接贴代码
[java] view
plaincopy
package com.tianque.session;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionContext;
import com.tianque.session.zookeeper.ZkSessionChangeListener;
import com.tianque.session.zookeeper.ZkSessionHelper;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSession implements HttpSession {
private SessionMetaData meta;
public AbstractSession(){
meta=new SessionMetaData();
meta.setCreateTime(new Date().getTime());
&n
20000
bsp; //meta.setSid(SidGenerator.generateSid());
meta.setLastAccessedTime(meta.getCreateTime());
meta.setIsnew(true);
meta.setMaxInactiveInterval((int)SessionChangeListener.getTimeout());
//ZkSessionHelper.addSession(meta);
}
public void setLastAccessedTime(long lastAccessedTime) {
meta.setLastAccessedTime(lastAccessedTime);
}
public long getCreationTime() {
return meta.getCreateTime();
}
public String getId() {
return meta.getSid();
}
public void setId(String sid){
meta.setSid(sid);
}
public long getLastAccessedTime() {
return meta.getLastAccessedTime();
}
public ServletContext getServletContext() {
return null;
}
public void setMaxInactiveInterval(int interval) {
meta.setMaxInactiveInterval(interval);
}
public int getMaxInactiveInterval() {
return meta.getMaxInactiveInterval();
}
public HttpSessionContext getSessionContext() {
return null;
}
public void invalidate() {
}
public boolean isNew() {
return meta.isIsnew();
}
public SessionMetaData getMeta() {
return meta;
}
public void setMeta(SessionMetaData meta) {
this.meta = meta;
}
}
[java] view
plaincopy
package com.tianque.session;
import java.io.IOException;
import java.util.Date;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSessionFilter implements Filter{
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
doFilterInternal((HttpServletRequest)request,(HttpServletResponse)response);
chain.doFilter(request, response);
}
protected abstract void doFilterInternal(HttpServletRequest request,HttpServletResponse response);
@Override
public void destroy() {
}
}
[java] view
plaincopy
package com.tianque.session;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSessionManager {
private Map sessions=new HashMap();
public AbstractSessionManager(){}
public HttpSession getSession(String sid){
return (HttpSession)sessions.get(sid);
}
public void addSession(HttpSession session,String sid){
this.sessions.put(sid, session);
}
public abstract void loadSession();
public Map getAllSession(){
return sessions;
}
public String getSessionIdByCookie(HttpServletRequest request){
Cookie[] cookies = request.getCookies();
if (cookies == null) {
return null;
}
for (int i = 0; i < cookies.length; i++) {
Cookie cookie = cookies[i];
if ("sid".equals(cookie.getName())) {
String sid = cookie.getValue();
return sid;
}
}
return null;
}
}
[java] view
plaincopy
package com.tianque.session;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class SessionChangeListener implements ServletContextListener {
private static long timeout;
public static long getTimeout(){
return timeout;
}
public void contextInitialized(ServletContextEvent sce) {
String timeoutStr=sce.getServletContext().getInitParameter("sessionTimeout");
if(timeoutStr!=null&&!timeoutStr.equals("")){
timeout=Long.parseLong(timeoutStr);
}else{
timeout=3600000;
}
subscribeSession();
}
public void contextDestroyed(ServletContextEvent sce) {
release();
}
protected abstract void subscribeSession();
protected abstract void release();
}
[java] view
plaincopy
package com.tianque.session;
import java.io.Serializable;
import javax.servlet.ServletContext;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class SessionMetaData implements Serializable{
private long createTime;
private String sid;
private long lastAccessedTime;
private int maxInactiveInterval;
private boolean isnew;
public long getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
pu
4000
blic String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public long getLastAccessedTime() {
return lastAccessedTime;
}
public void setLastAccessedTime(long lastAccessedTime) {
this.lastAccessedTime = lastAccessedTime;
}
public int getMaxInactiveInterval() {
return maxInactiveInterval;
}
public void setMaxInactiveInterval(int maxInactiveInterval) {
this.maxInactiveInterval = maxInactiveInterval;
}
public boolean isIsnew() {
return isnew;
}
public void setIsnew(boolean isnew) {
this.isnew = isnew;
}
}
[java] view
plaincopy
package com.tianque.session;
import java.util.UUID;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class SidGenerator {
public static String generateSid(){
return UUID.randomUUID().toString();
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.ZkClient;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkConnectionSingleton {
private static String zkServer = "127.0.0.1:2181";
private static ZkClient zkClient=new ZkClient(zkServer);
public static ZkClient getInstance(){
return zkClient;
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import com.tianque.session.AbstractSession;
import com.tianque.session.AbstractSessionManager;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSession extends AbstractSession {
private Map attributes=new HashMap();
public Object getAttribute(String name) {
return this.attributes.get(name);
}
public Object getValue(String name) {
return null;
}
public Enumeration getAttributeNames() {
return null;
}
public String[] getValueNames() {
return null;
}
public void setAttribute(String name, Object value) {
this.attributes.put(name, value);
ZkSessionHelper.setAttribute(this.getId(), name, value);
}
public void putValue(String name, Object value) {
}
public void removeAttribute(String name) {
this.attributes.remove(name);
ZkSessionHelper.removeAttribute(this.getId(), name);
}
public void removeValue(String name) {
}
public void localSetAttribute(String name, Object value){
this.attributes.put(name, value);
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionAttributeDataListener implements IZkDataListener {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
String name=arg0.substring(arg0.lastIndexOf("/")+1);
Object value=arg1;
String prefix=arg0.substring(0,arg0.lastIndexOf("/"));
String sid=prefix.substring(prefix.lastIndexOf("/")+1);
((ZkSession)ZkSessionManager.getInstance().getSession(sid)).localSetAttribute(name, value);
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionChangeListener;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionChangeListener extends SessionChangeListener {
private static final Map<String, Set<IZkDataListener>> sissionDataListeners=new HashMap<String, Set<IZkDataListener>>();
private static final Map<String,Map<String, Set<IZkDataListener>>> attributeDataListeners= new HashMap<String,Map<String, Set<IZkDataListener>>>();
private void subscribeSessionAttributeChange(List<String> zkSessions){
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
for(int i=0,len=zkSessions.size();i<len;i++){
String sid=(String)zkSessions.get(i);
List<String> zkSessionAttributes=zkClient.getChildren(ZkSessionHelper.root+"/"+sid);
Map<String, Set<IZkDataListener>> attributesDataListener=attributeDataListeners.get(sid);
for(int j=0,size=zkSessionAttributes.size();j<size;j++){
String name=zkSessionAttributes.get(j);
IZkDataListener newDataListener=new ZkSessionAttributeDataListener();
if(attributesDataListener==null){
attributesDataListener=new HashMap<String, Set<IZkDataListener>>();
Set<IZkDataListener> attributeDataListener=new HashSet<IZkDataListener>();
attributeDataListener.add(newDataListener);
attributesDataListener.put(name, attributeDataListener);
attributeDataListeners.put(sid, attributesDataListener);
}else{
Set<IZkDataListener> attributeDataListener=attributesDataListener.get(name);
if(attributeDataListener!=null){
Iterator<IZkDataListener> it=attributeDataListener.iterator();
while(it.hasNext()){
zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name, it.next());
}
}else{
attributeDataListener=new HashSet<IZkDataListener>();
attributeDataListener.add(newDataListener);
}
}
zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name,newDataListener );
}
}
}
private void subscribeSessionDataChange(List<String> zkSessions){
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
for(int i=0,len=zkSessions.size();i<len;i++){
String sid=(String)zkSessions.get(i);
Set<IZkDataListener> listenerSet=sissionDataListeners.get(sid);
if(listenerSet!=null){
Iterator<IZkDataListener> it=listenerSet.iterator();
while(it.hasNext()){
zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid, it.next());
}
}
IZkDataListener newDataListener=new ZkSessionDataListener();
if(listenerSet==null){
listenerSet=new HashSet<IZkDataListener>();
listenerSet.add(newDataListener);
sissionDataListeners.put(sid, listenerSet);
}else{
listenerSet.add(newDataListener);
}
zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid,newDataListener );
}
subscribeSessionAttributeChange(zkSessions);
}
protected void subscribeSession() {
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
if(!zkClient.exists(ZkSessionHelper.root)){
zkClient.createPersistent(ZkSessionHelper.root);
}
ZkSessionManager.getInstance().loadSession();
new ZkSessionCleaner().start();
List<String> zkSessions=zkClient.getChildren(ZkSessionHelper.root);
subscribeSessionDataChange(zkSessions);
zkClient.subscribeChildChanges(ZkSessionHelper.root, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List currentChilds) throws Exception {
subscribeSessionDataChange(currentChilds);
//subscribeSessionAttributeChange(currentChilds);
Map sessions=ZkSessionManager.getInstance().getAllSession();
for(Object sid: sessions.keySet()){
boolean has=false;
for(int j=0, len=currentChilds.size();j<len;j++){
if(((String)sid).equals(currentChilds.get(j))){
has=true;
break;
}
}
if(!has){
sessions.remove(sid);
}
}
for(int j=0, len=currentChilds.size();j<len;j++){
boolean has=false;
String zkSid=(String)currentChilds.get(j);
for(Object sid: sessions.keySet()){
if(((String)sid).equals(zkSid)){
has=true;
break;
}
}
if(!has){
SessionMetaData meta=zkClient.readData(ZkSessionHelper.root+"/"+zkSid);
ZkSession session=new ZkSession();
session.setMeta(meta);
ZkSessionManager.getInstance().addSession(session, session.getId());
List<String> keys=zkClient.getChildren(ZkSessionHelper.root+"/"+zkSid);
for(int i=0,size=keys.size();i<size;i++){
Object val=zkClient.readData(keys.get(i));
session.localSetAttribute(keys.get(i), val);
}
}
}
}
});
}
protected void release() {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Date;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionCleaner extends Thread {
@Override
public void run() {
ZkClient client=ZkConnectionSingleton.getInstance();
while(true){
List<String> sessionList=client.getChildren(ZkSessionHelper.root);
for(int i=0,len=sessionList.size();i<len;i++){
String sid=sessionList.get(i);
SessionMetaData meta=client.readData(ZkSessionHelper.root+"/"+sid);
ZkSession session=new ZkSession();
if((new Date().getTime()- meta.getLastAccessedTime())>meta.getMaxInactiveInterval()){
client.deleteRecursive(ZkSessionHelper.root+"/"+sid);
}
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
}
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionDataListener implements IZkDataListener {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
ZkClient client=ZkConnectionSingleton.getInstance();
String sid=arg0.substring(arg0.lastIndexOf("/")+1);
ZkSession session=(ZkSession)ZkSessionManager.getInstance().getSession(sid);
SessionMetaData meta=client.readData(arg0);
session.setMeta(meta);
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Date;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.AbstractSession;
import com.tianque.session.AbstractSessionFilter;
import com.tianque.session.AbstractSessionManager;
import com.tianque.session.SessionChangeListener;
import com.tianque.session.SidGenerator;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionFilter extends AbstractSessionFilter {
private void newSession(HttpServletRequest request, HttpServletResponse response) {
HttpSession session=new ZkSession();
String sid=SidGenerator.generateSid();
((AbstractSession)session).setId(sid);
ZkSessionManager.getInstance().addSession(session, sid);
ZkSessionHelper.addSession(((AbstractSession)session).getMeta());
Cookie cookie=new Cookie("sid",sid);
cookie.setMaxAge((int)SessionChangeListener.getTimeout());
response.addCookie(cookie);
request.setAttribute("sid", sid);
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response) {
AbstractSessionManager sessionManager=ZkSessionManager.getInstance();
String sid=sessionManager.getSessionIdByCookie((HttpServletRequest)request);
if(sid==null || sid.equals("")){
newSession((HttpServletRequest)request,(HttpServletResponse)response);
}else{
AbstractSession session=(AbstractSession)sessionManager.getSession(sid);
if(session!=null){
session.setLastAccessedTime(new Date().getTime());
ZkClient client=ZkConnectionSingleton.getInstance();
client.writeData(ZkSessionHelper.root+"/"+sid, session.getMeta());
}else{
newSession((HttpServletRequest)request,(HttpServletResponse)response);
}
}
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionHelper {
public static final String root="/tianque-session-root-test";
public static void setAttribute(String sid,String name,Object value){
ZkClient client=ZkConnectionSingleton.getInstance();
if(!client.exists(root+"/"+sid+"/"+name)){
client.createPersistent(root+"/"+sid+"/"+name);
}
client.writeData(root+"/"+sid+"/"+name, value);
}
public static void removeAttribute(String sid,String name){
ZkClient client=ZkConnectionSingleton.getInstance();
if(client.exists(root+"/"+sid+"/"+name)){
client.delete(root+"/"+sid+"/"+name);
}
}
public static void addSession(SessionMetaData meta){
ZkClient client=ZkConnectionSingleton.getInstance();
client.createPersistent(root+"/"+meta.getSid());
client.writeData(root+"/"+meta.getSid(), meta);
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.AbstractSessionManager;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionManager extends AbstractSessionManager {
private static AbstractSessionManager instance=new ZkSessionManager();
@Override
public void loadSession() {
ZkClient client=ZkConnectionSingleton.getInstance();
List<String> sessionList=client.getChildren(ZkSessionHelper.root);
for(int i=0,len=sessionList.size();i<len;i++){
String sid=sessionList.get(i);
SessionMetaData meta=client.readData(ZkSessionHelper.root+"/"+sid);
ZkSession session=new ZkSession();
session.setId(sid);
session.setMeta(meta);
List<String> attributeList=client.getChildren(ZkSessionHelper.root+"/"+sid);
for(int j=0,size=attributeList.size();j<size;j++){
String name=attributeList.get(j);
Object value=client.readData(ZkSessionHelper.root+"/"+sid+"/"+name);
session.localSetAttribute(name, value);
}
AbstractSessionManager sessionManager=ZkSessionManager.getInstance();
sessionManager.addSession(session, sid);
}
}
public static AbstractSessionManager getInstance(){
return instance;
}
}
[java] view
plaincopy
package com.tianque.session;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionContext;
import com.tianque.session.zookeeper.ZkSessionChangeListener;
import com.tianque.session.zookeeper.ZkSessionHelper;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSession implements HttpSession {
private SessionMetaData meta;
public AbstractSession(){
meta=new SessionMetaData();
meta.setCreateTime(new Date().getTime());
&n
20000
bsp; //meta.setSid(SidGenerator.generateSid());
meta.setLastAccessedTime(meta.getCreateTime());
meta.setIsnew(true);
meta.setMaxInactiveInterval((int)SessionChangeListener.getTimeout());
//ZkSessionHelper.addSession(meta);
}
public void setLastAccessedTime(long lastAccessedTime) {
meta.setLastAccessedTime(lastAccessedTime);
}
public long getCreationTime() {
return meta.getCreateTime();
}
public String getId() {
return meta.getSid();
}
public void setId(String sid){
meta.setSid(sid);
}
public long getLastAccessedTime() {
return meta.getLastAccessedTime();
}
public ServletContext getServletContext() {
return null;
}
public void setMaxInactiveInterval(int interval) {
meta.setMaxInactiveInterval(interval);
}
public int getMaxInactiveInterval() {
return meta.getMaxInactiveInterval();
}
public HttpSessionContext getSessionContext() {
return null;
}
public void invalidate() {
}
public boolean isNew() {
return meta.isIsnew();
}
public SessionMetaData getMeta() {
return meta;
}
public void setMeta(SessionMetaData meta) {
this.meta = meta;
}
}
[java] view
plaincopy
package com.tianque.session;
import java.io.IOException;
import java.util.Date;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSessionFilter implements Filter{
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
doFilterInternal((HttpServletRequest)request,(HttpServletResponse)response);
chain.doFilter(request, response);
}
protected abstract void doFilterInternal(HttpServletRequest request,HttpServletResponse response);
@Override
public void destroy() {
}
}
[java] view
plaincopy
package com.tianque.session;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class AbstractSessionManager {
private Map sessions=new HashMap();
public AbstractSessionManager(){}
public HttpSession getSession(String sid){
return (HttpSession)sessions.get(sid);
}
public void addSession(HttpSession session,String sid){
this.sessions.put(sid, session);
}
public abstract void loadSession();
public Map getAllSession(){
return sessions;
}
public String getSessionIdByCookie(HttpServletRequest request){
Cookie[] cookies = request.getCookies();
if (cookies == null) {
return null;
}
for (int i = 0; i < cookies.length; i++) {
Cookie cookie = cookies[i];
if ("sid".equals(cookie.getName())) {
String sid = cookie.getValue();
return sid;
}
}
return null;
}
}
[java] view
plaincopy
package com.tianque.session;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public abstract class SessionChangeListener implements ServletContextListener {
private static long timeout;
public static long getTimeout(){
return timeout;
}
public void contextInitialized(ServletContextEvent sce) {
String timeoutStr=sce.getServletContext().getInitParameter("sessionTimeout");
if(timeoutStr!=null&&!timeoutStr.equals("")){
timeout=Long.parseLong(timeoutStr);
}else{
timeout=3600000;
}
subscribeSession();
}
public void contextDestroyed(ServletContextEvent sce) {
release();
}
protected abstract void subscribeSession();
protected abstract void release();
}
[java] view
plaincopy
package com.tianque.session;
import java.io.Serializable;
import javax.servlet.ServletContext;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class SessionMetaData implements Serializable{
private long createTime;
private String sid;
private long lastAccessedTime;
private int maxInactiveInterval;
private boolean isnew;
public long getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
this.createTime = createTime;
}
pu
4000
blic String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public long getLastAccessedTime() {
return lastAccessedTime;
}
public void setLastAccessedTime(long lastAccessedTime) {
this.lastAccessedTime = lastAccessedTime;
}
public int getMaxInactiveInterval() {
return maxInactiveInterval;
}
public void setMaxInactiveInterval(int maxInactiveInterval) {
this.maxInactiveInterval = maxInactiveInterval;
}
public boolean isIsnew() {
return isnew;
}
public void setIsnew(boolean isnew) {
this.isnew = isnew;
}
}
[java] view
plaincopy
package com.tianque.session;
import java.util.UUID;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class SidGenerator {
public static String generateSid(){
return UUID.randomUUID().toString();
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.ZkClient;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkConnectionSingleton {
private static String zkServer = "127.0.0.1:2181";
private static ZkClient zkClient=new ZkClient(zkServer);
public static ZkClient getInstance(){
return zkClient;
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import com.tianque.session.AbstractSession;
import com.tianque.session.AbstractSessionManager;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSession extends AbstractSession {
private Map attributes=new HashMap();
public Object getAttribute(String name) {
return this.attributes.get(name);
}
public Object getValue(String name) {
return null;
}
public Enumeration getAttributeNames() {
return null;
}
public String[] getValueNames() {
return null;
}
public void setAttribute(String name, Object value) {
this.attributes.put(name, value);
ZkSessionHelper.setAttribute(this.getId(), name, value);
}
public void putValue(String name, Object value) {
}
public void removeAttribute(String name) {
this.attributes.remove(name);
ZkSessionHelper.removeAttribute(this.getId(), name);
}
public void removeValue(String name) {
}
public void localSetAttribute(String name, Object value){
this.attributes.put(name, value);
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionAttributeDataListener implements IZkDataListener {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
String name=arg0.substring(arg0.lastIndexOf("/")+1);
Object value=arg1;
String prefix=arg0.substring(0,arg0.lastIndexOf("/"));
String sid=prefix.substring(prefix.lastIndexOf("/")+1);
((ZkSession)ZkSessionManager.getInstance().getSession(sid)).localSetAttribute(name, value);
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionChangeListener;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionChangeListener extends SessionChangeListener {
private static final Map<String, Set<IZkDataListener>> sissionDataListeners=new HashMap<String, Set<IZkDataListener>>();
private static final Map<String,Map<String, Set<IZkDataListener>>> attributeDataListeners= new HashMap<String,Map<String, Set<IZkDataListener>>>();
private void subscribeSessionAttributeChange(List<String> zkSessions){
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
for(int i=0,len=zkSessions.size();i<len;i++){
String sid=(String)zkSessions.get(i);
List<String> zkSessionAttributes=zkClient.getChildren(ZkSessionHelper.root+"/"+sid);
Map<String, Set<IZkDataListener>> attributesDataListener=attributeDataListeners.get(sid);
for(int j=0,size=zkSessionAttributes.size();j<size;j++){
String name=zkSessionAttributes.get(j);
IZkDataListener newDataListener=new ZkSessionAttributeDataListener();
if(attributesDataListener==null){
attributesDataListener=new HashMap<String, Set<IZkDataListener>>();
Set<IZkDataListener> attributeDataListener=new HashSet<IZkDataListener>();
attributeDataListener.add(newDataListener);
attributesDataListener.put(name, attributeDataListener);
attributeDataListeners.put(sid, attributesDataListener);
}else{
Set<IZkDataListener> attributeDataListener=attributesDataListener.get(name);
if(attributeDataListener!=null){
Iterator<IZkDataListener> it=attributeDataListener.iterator();
while(it.hasNext()){
zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name, it.next());
}
}else{
attributeDataListener=new HashSet<IZkDataListener>();
attributeDataListener.add(newDataListener);
}
}
zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name,newDataListener );
}
}
}
private void subscribeSessionDataChange(List<String> zkSessions){
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
for(int i=0,len=zkSessions.size();i<len;i++){
String sid=(String)zkSessions.get(i);
Set<IZkDataListener> listenerSet=sissionDataListeners.get(sid);
if(listenerSet!=null){
Iterator<IZkDataListener> it=listenerSet.iterator();
while(it.hasNext()){
zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid, it.next());
}
}
IZkDataListener newDataListener=new ZkSessionDataListener();
if(listenerSet==null){
listenerSet=new HashSet<IZkDataListener>();
listenerSet.add(newDataListener);
sissionDataListeners.put(sid, listenerSet);
}else{
listenerSet.add(newDataListener);
}
zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid,newDataListener );
}
subscribeSessionAttributeChange(zkSessions);
}
protected void subscribeSession() {
final ZkClient zkClient=ZkConnectionSingleton.getInstance();
if(!zkClient.exists(ZkSessionHelper.root)){
zkClient.createPersistent(ZkSessionHelper.root);
}
ZkSessionManager.getInstance().loadSession();
new ZkSessionCleaner().start();
List<String> zkSessions=zkClient.getChildren(ZkSessionHelper.root);
subscribeSessionDataChange(zkSessions);
zkClient.subscribeChildChanges(ZkSessionHelper.root, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List currentChilds) throws Exception {
subscribeSessionDataChange(currentChilds);
//subscribeSessionAttributeChange(currentChilds);
Map sessions=ZkSessionManager.getInstance().getAllSession();
for(Object sid: sessions.keySet()){
boolean has=false;
for(int j=0, len=currentChilds.size();j<len;j++){
if(((String)sid).equals(currentChilds.get(j))){
has=true;
break;
}
}
if(!has){
sessions.remove(sid);
}
}
for(int j=0, len=currentChilds.size();j<len;j++){
boolean has=false;
String zkSid=(String)currentChilds.get(j);
for(Object sid: sessions.keySet()){
if(((String)sid).equals(zkSid)){
has=true;
break;
}
}
if(!has){
SessionMetaData meta=zkClient.readData(ZkSessionHelper.root+"/"+zkSid);
ZkSession session=new ZkSession();
session.setMeta(meta);
ZkSessionManager.getInstance().addSession(session, session.getId());
List<String> keys=zkClient.getChildren(ZkSessionHelper.root+"/"+zkSid);
for(int i=0,size=keys.size();i<size;i++){
Object val=zkClient.readData(keys.get(i));
session.localSetAttribute(keys.get(i), val);
}
}
}
}
});
}
protected void release() {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Date;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionCleaner extends Thread {
@Override
public void run() {
ZkClient client=ZkConnectionSingleton.getInstance();
while(true){
List<String> sessionList=client.getChildren(ZkSessionHelper.root);
for(int i=0,len=sessionList.size();i<len;i++){
String sid=sessionList.get(i);
SessionMetaData meta=client.readData(ZkSessionHelper.root+"/"+sid);
ZkSession session=new ZkSession();
if((new Date().getTime()- meta.getLastAccessedTime())>meta.getMaxInactiveInterval()){
client.deleteRecursive(ZkSessionHelper.root+"/"+sid);
}
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
}
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionDataListener implements IZkDataListener {
@Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
ZkClient client=ZkConnectionSingleton.getInstance();
String sid=arg0.substring(arg0.lastIndexOf("/")+1);
ZkSession session=(ZkSession)ZkSessionManager.getInstance().getSession(sid);
SessionMetaData meta=client.readData(arg0);
session.setMeta(meta);
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.Date;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.AbstractSession;
import com.tianque.session.AbstractSessionFilter;
import com.tianque.session.AbstractSessionManager;
import com.tianque.session.SessionChangeListener;
import com.tianque.session.SidGenerator;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionFilter extends AbstractSessionFilter {
private void newSession(HttpServletRequest request, HttpServletResponse response) {
HttpSession session=new ZkSession();
String sid=SidGenerator.generateSid();
((AbstractSession)session).setId(sid);
ZkSessionManager.getInstance().addSession(session, sid);
ZkSessionHelper.addSession(((AbstractSession)session).getMeta());
Cookie cookie=new Cookie("sid",sid);
cookie.setMaxAge((int)SessionChangeListener.getTimeout());
response.addCookie(cookie);
request.setAttribute("sid", sid);
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response) {
AbstractSessionManager sessionManager=ZkSessionManager.getInstance();
String sid=sessionManager.getSessionIdByCookie((HttpServletRequest)request);
if(sid==null || sid.equals("")){
newSession((HttpServletRequest)request,(HttpServletResponse)response);
}else{
AbstractSession session=(AbstractSession)sessionManager.getSession(sid);
if(session!=null){
session.setLastAccessedTime(new Date().getTime());
ZkClient client=ZkConnectionSingleton.getInstance();
client.writeData(ZkSessionHelper.root+"/"+sid, session.getMeta());
}else{
newSession((HttpServletRequest)request,(HttpServletResponse)response);
}
}
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-1
*/
public class ZkSessionHelper {
public static final String root="/tianque-session-root-test";
public static void setAttribute(String sid,String name,Object value){
ZkClient client=ZkConnectionSingleton.getInstance();
if(!client.exists(root+"/"+sid+"/"+name)){
client.createPersistent(root+"/"+sid+"/"+name);
}
client.writeData(root+"/"+sid+"/"+name, value);
}
public static void removeAttribute(String sid,String name){
ZkClient client=ZkConnectionSingleton.getInstance();
if(client.exists(root+"/"+sid+"/"+name)){
client.delete(root+"/"+sid+"/"+name);
}
}
public static void addSession(SessionMetaData meta){
ZkClient client=ZkConnectionSingleton.getInstance();
client.createPersistent(root+"/"+meta.getSid());
client.writeData(root+"/"+meta.getSid(), meta);
}
}
[java] view
plaincopy
package com.tianque.session.zookeeper;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import com.tianque.session.AbstractSessionManager;
import com.tianque.session.SessionMetaData;
/**
* @author hxpwangyi@163.com
* @date 2013-3-2
*/
public class ZkSessionManager extends AbstractSessionManager {
private static AbstractSessionManager instance=new ZkSessionManager();
@Override
public void loadSession() {
ZkClient client=ZkConnectionSingleton.getInstance();
List<String> sessionList=client.getChildren(ZkSessionHelper.root);
for(int i=0,len=sessionList.size();i<len;i++){
String sid=sessionList.get(i);
SessionMetaData meta=client.readData(ZkSessionHelper.root+"/"+sid);
ZkSession session=new ZkSession();
session.setId(sid);
session.setMeta(meta);
List<String> attributeList=client.getChildren(ZkSessionHelper.root+"/"+sid);
for(int j=0,size=attributeList.size();j<size;j++){
String name=attributeList.get(j);
Object value=client.readData(ZkSessionHelper.root+"/"+sid+"/"+name);
session.localSetAttribute(name, value);
}
AbstractSessionManager sessionManager=ZkSessionManager.getInstance();
sessionManager.addSession(session, sid);
}
}
public static AbstractSessionManager getInstance(){
return instance;
}
}
相关文章推荐
- 分布式 php实现session共享
- Zookeeper分布式锁(多进程竞争)实现的代码示例分享
- php web分布式后,memcache存session实现多域名跨域登录
- 基于zookeeper实现的分布式锁
- 利用curator实现的zookeeper分布式锁服务
- 巧用zookeeper实现分布式并行计算 - J2EE企业应用 顾问/咨询 Java传教士 -H.E.'s Blog
- Apache shiro集群实现 (六)分布式集群系统下的高可用session解决方案---Session共享
- zookeeper适用场景:分布式锁实现
- [Node.js] Node + Redis 实现分布式Session方案
- Apache shiro集群实现 (五)分布式集群系统下的高可用session解决方案
- Zookeeper实现分布式共享锁
- 巧用zookeeper实现分布式并行计算
- 使用zookeeper实现分布式共享锁
- 基于zookeeper简单实现分布式锁
- ZooKeeper实现分布式队列Queue
- 关于如何用Zookeeper实现分布式锁机制
- 利用ZooKeeper服务实现分布式系统的Leader选举
- php web分布式后,memcache存session实现多域名跨域登录
- HttpServletRequestWrapper模拟实现分布式Session
- Apache shiro集群实现 (六)分布式集群系统下的高可用session解决方案---Session共享