您的位置:首页 > 其它

用zookeeper实现分布式session

2015-01-15 20:32 411 查看
废话不多说,直接贴代码(基于 ZooKeeper 和 zkclient 实现的分布式 HttpSession):

[java] view
plaincopy

package com.tianque.session;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import javax.servlet.ServletContext;

import javax.servlet.http.HttpSession;

import javax.servlet.http.HttpSessionContext;

import com.tianque.session.zookeeper.ZkSessionChangeListener;

import com.tianque.session.zookeeper.ZkSessionHelper;

/**

* @author hxpwangyi@163.com

* @date 2013-3-1

*/

public abstract class AbstractSession implements HttpSession {

private SessionMetaData meta;

public AbstractSession(){

meta=new SessionMetaData();

meta.setCreateTime(new Date().getTime());

&n
20000
bsp; //meta.setSid(SidGenerator.generateSid());

meta.setLastAccessedTime(meta.getCreateTime());

meta.setIsnew(true);

meta.setMaxInactiveInterval((int)SessionChangeListener.getTimeout());

//ZkSessionHelper.addSession(meta);

}

public void setLastAccessedTime(long lastAccessedTime) {

meta.setLastAccessedTime(lastAccessedTime);

}

public long getCreationTime() {

return meta.getCreateTime();

}

public String getId() {

return meta.getSid();

}

public void setId(String sid){

meta.setSid(sid);

}

public long getLastAccessedTime() {

return meta.getLastAccessedTime();

}

public ServletContext getServletContext() {

return null;

}

public void setMaxInactiveInterval(int interval) {

meta.setMaxInactiveInterval(interval);

}

public int getMaxInactiveInterval() {

return meta.getMaxInactiveInterval();

}

public HttpSessionContext getSessionContext() {

return null;

}

public void invalidate() {

}

public boolean isNew() {

return meta.isIsnew();

}

public SessionMetaData getMeta() {

return meta;

}

public void setMeta(SessionMetaData meta) {

this.meta = meta;

}

}

[java] view
plaincopy

package com.tianque.session;

import java.io.IOException;

import java.util.Date;

import javax.servlet.Filter;

import javax.servlet.FilterChain;

import javax.servlet.FilterConfig;

import javax.servlet.ServletException;

import javax.servlet.ServletRequest;

import javax.servlet.ServletResponse;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

/**
 * Servlet filter skeleton: every request is first handed to
 * {@link #doFilterInternal(HttpServletRequest, HttpServletResponse)} and then
 * passed down the filter chain.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public abstract class AbstractSessionFilter implements Filter {

    /** No initialisation is required. */
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        // intentionally empty
    }

    /**
     * Narrows the request/response pair to their HTTP types, runs the
     * subclass hook, then continues the chain with the original objects.
     *
     * @throws ClassCastException if the request or response is not an HTTP one
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        final HttpServletRequest httpRequest = (HttpServletRequest) request;
        final HttpServletResponse httpResponse = (HttpServletResponse) response;
        doFilterInternal(httpRequest, httpResponse);
        chain.doFilter(request, response);
    }

    /**
     * Hook invoked once per request before the rest of the chain runs.
     *
     * @param request  the current HTTP request
     * @param response the current HTTP response
     */
    protected abstract void doFilterInternal(HttpServletRequest request, HttpServletResponse response);

    /** Nothing to release. */
    @Override
    public void destroy() {
        // intentionally empty
    }
}

[java] view
plaincopy

package com.tianque.session;

import java.util.HashMap;

import java.util.Map;

import javax.servlet.http.Cookie;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpSession;

/**
 * In-memory registry of sessions keyed by session id, plus a helper that
 * extracts the session id from the request's {@code sid} cookie.
 *
 * <p>The registry is backed by a concurrent map, so it may be read and
 * modified from several threads, and entries may be removed while its key set
 * is being iterated.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public abstract class AbstractSessionManager {

    /** Name of the cookie that carries the session id. */
    private static final String SID_COOKIE_NAME = "sid";

    /**
     * Session id -> session. Concurrent so that lookups, inserts and removals
     * from different threads do not corrupt the table.
     */
    private final Map<String, HttpSession> sessions =
            new java.util.concurrent.ConcurrentHashMap<String, HttpSession>();

    public AbstractSessionManager() {
    }

    /**
     * Looks up a session by id.
     *
     * @param sid the session id; may be {@code null}
     * @return the registered session, or {@code null} if {@code sid} is
     *         {@code null} or unknown
     */
    public HttpSession getSession(String sid) {
        // The concurrent map rejects null keys; a null id simply has no session.
        if (sid == null) {
            return null;
        }
        return sessions.get(sid);
    }

    /**
     * Registers (or replaces) the session stored under {@code sid}.
     * A {@code null} id or session cannot be looked up later and is ignored.
     *
     * @param session the session to register
     * @param sid     the id to register it under
     */
    public void addSession(HttpSession session, String sid) {
        if (sid == null || session == null) {
            return;
        }
        sessions.put(sid, session);
    }

    /** Populates the registry from the backing store. */
    public abstract void loadSession();

    /**
     * Returns the live registry (not a copy); changes made through the
     * returned map are visible to this manager.
     *
     * @return the session id -> session map
     */
    @SuppressWarnings("rawtypes")
    public Map getAllSession() {
        return sessions;
    }

    /**
     * Finds the value of the first cookie named {@code sid}.
     *
     * @param request the current request
     * @return the cookie value, or {@code null} if the request carries no
     *         cookies or no {@code sid} cookie
     */
    public String getSessionIdByCookie(HttpServletRequest request) {
        Cookie[] cookies = request.getCookies();
        if (cookies == null) {
            return null;
        }
        for (Cookie cookie : cookies) {
            if (SID_COOKIE_NAME.equals(cookie.getName())) {
                return cookie.getValue();
            }
        }
        return null;
    }
}

[java] view
plaincopy

package com.tianque.session;

import javax.servlet.ServletContextEvent;

import javax.servlet.ServletContextListener;

/**
 * Servlet context listener that reads the session timeout from the
 * {@code sessionTimeout} context init parameter and then lets the subclass
 * subscribe to session changes; on shutdown the subclass is asked to release
 * its resources.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public abstract class SessionChangeListener implements ServletContextListener {

    /**
     * Timeout shared by the whole web application; stays 0 until
     * {@link #contextInitialized(ServletContextEvent)} has run.
     * NOTE(review): presumably milliseconds, given the 3600000 fallback - confirm.
     */
    private static long timeout;

    /** @return the timeout configured at context start-up */
    public static long getTimeout() {
        return timeout;
    }

    /**
     * Resolves the timeout (falling back to 3600000 when the init parameter is
     * absent or empty) and then calls {@link #subscribeSession()}.
     *
     * @throws NumberFormatException if the parameter is present but not a valid long
     */
    public void contextInitialized(ServletContextEvent sce) {
        final String configured = sce.getServletContext().getInitParameter("sessionTimeout");
        final boolean unset = configured == null || configured.length() == 0;
        timeout = unset ? 3600000L : Long.parseLong(configured);
        subscribeSession();
    }

    /** Delegates to {@link #release()}. */
    public void contextDestroyed(ServletContextEvent sce) {
        release();
    }

    /** Called once at context start-up, after the timeout has been resolved. */
    protected abstract void subscribeSession();

    /** Called once at context shutdown. */
    protected abstract void release();
}

[java] view
plaincopy

package com.tianque.session;

import java.io.Serializable;

import javax.servlet.ServletContext;

/**
 * Serializable bean holding the bookkeeping of one session: its id, creation
 * time, last-access time, inactivity timeout and "new" flag.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public class SessionMetaData implements Serializable {

    /** Creation time of the session. */
    private long createTime;

    /** Session id. */
    private String sid;

    /** Time of the most recent access. */
    private long lastAccessedTime;

    /** Inactivity timeout. */
    private int maxInactiveInterval;

    /** Whether the session is still considered new. */
    private boolean isnew;

    /** @return the creation time */
    public long getCreateTime() {
        return createTime;
    }

    /** @param createTime the creation time to store */
    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    /** @return the session id, or {@code null} if none has been set */
    public String getSid() {
        return sid;
    }

    /** @param sid the session id to store */
    public void setSid(String sid) {
        this.sid = sid;
    }

    /** @return the time of the most recent access */
    public long getLastAccessedTime() {
        return lastAccessedTime;
    }

    /** @param lastAccessedTime the time of the most recent access */
    public void setLastAccessedTime(long lastAccessedTime) {
        this.lastAccessedTime = lastAccessedTime;
    }

    /** @return the inactivity timeout */
    public int getMaxInactiveInterval() {
        return maxInactiveInterval;
    }

    /** @param maxInactiveInterval the inactivity timeout to store */
    public void setMaxInactiveInterval(int maxInactiveInterval) {
        this.maxInactiveInterval = maxInactiveInterval;
    }

    /** @return {@code true} while the session is flagged as new */
    public boolean isIsnew() {
        return isnew;
    }

    /** @param isnew the new value of the "new" flag */
    public void setIsnew(boolean isnew) {
        this.isnew = isnew;
    }
}

[java] view
plaincopy

package com.tianque.session;

import java.util.UUID;

/**

* @author hxpwangyi@163.com

* @date 2013-3-1

*/

public class SidGenerator {

public static String generateSid(){

return UUID.randomUUID().toString();

}

}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import org.I0Itec.zkclient.ZkClient;

/**
 * Holder of the single {@link ZkClient} shared by the session classes.
 * The client is created when this class is initialised.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public class ZkConnectionSingleton {

    /** Address handed to the {@link ZkClient} constructor. */
    private static final String SERVER_ADDRESS = "127.0.0.1:2181";

    /** The one client instance shared by all callers. */
    private static final ZkClient SHARED_CLIENT;

    static {
        SHARED_CLIENT = new ZkClient(SERVER_ADDRESS);
    }

    /** @return the shared client; every call returns the same instance */
    public static ZkClient getInstance() {
        return SHARED_CLIENT;
    }
}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import java.util.Enumeration;

import java.util.HashMap;

import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import com.tianque.session.AbstractSession;

import com.tianque.session.AbstractSessionManager;

/**
 * Session whose attributes are cached in a local map and mirrored to the
 * shared store through {@link ZkSessionHelper}.
 *
 * <p>NOTE(review): the local attribute map is a plain {@link HashMap} and is
 * not synchronized - confirm whether it can be touched from several threads.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-1
 */
public class ZkSession extends AbstractSession {

    /** Local cache of attribute name -> value. */
    private final Map<String, Object> attributes = new HashMap<String, Object>();

    /**
     * @param name the attribute name
     * @return the locally cached value, or {@code null} if there is none
     */
    public Object getAttribute(String name) {
        return this.attributes.get(name);
    }

    /** Legacy alias of {@link #getAttribute(String)}. */
    public Object getValue(String name) {
        return getAttribute(name);
    }

    /**
     * @return an enumeration over a snapshot of the locally cached attribute
     *         names; never {@code null}
     */
    public Enumeration getAttributeNames() {
        // Enumerate a copy so that later changes to the map cannot break the caller's loop.
        return java.util.Collections.enumeration(
                new java.util.ArrayList<String>(this.attributes.keySet()));
    }

    /**
     * Legacy alias of {@link #getAttributeNames()}.
     *
     * @return the locally cached attribute names; empty when there are none
     */
    public String[] getValueNames() {
        return this.attributes.keySet().toArray(new String[0]);
    }

    /**
     * Caches the attribute locally and writes it to the shared store.
     * A {@code null} value removes the attribute instead.
     *
     * @param name  the attribute name
     * @param value the attribute value, or {@code null} to remove it
     */
    public void setAttribute(String name, Object value) {
        if (value == null) {
            removeAttribute(name);
            return;
        }
        this.attributes.put(name, value);
        ZkSessionHelper.setAttribute(this.getId(), name, value);
    }

    /** Legacy alias of {@link #setAttribute(String, Object)}. */
    public void putValue(String name, Object value) {
        setAttribute(name, value);
    }

    /**
     * Drops the attribute from the local cache and from the shared store.
     *
     * @param name the attribute name
     */
    public void removeAttribute(String name) {
        this.attributes.remove(name);
        ZkSessionHelper.removeAttribute(this.getId(), name);
    }

    /** Legacy alias of {@link #removeAttribute(String)}. */
    public void removeValue(String name) {
        removeAttribute(name);
    }

    /**
     * Updates only the local cache, without writing to the shared store.
     *
     * @param name  the attribute name
     * @param value the attribute value
     */
    public void localSetAttribute(String name, Object value) {
        this.attributes.put(name, value);
    }
}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

/**
 * Data listener for a single attribute node: when the node's data changes,
 * the new value is copied into the local cache of the owning session.
 *
 * <p>The node path is expected to end in {@code .../<sid>/<attribute name>}.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-2
 */
public class ZkSessionAttributeDataListener implements IZkDataListener {

    /**
     * Derives the session id and attribute name from the last two segments of
     * the path and stores the value in that session's local cache. The event
     * is ignored when no such session is registered locally.
     *
     * @param arg0 path of the attribute node
     * @param arg1 the new data of the node
     */
    @Override
    public void handleDataChange(String arg0, Object arg1) throws Exception {
        int nameSeparator = arg0.lastIndexOf('/');
        String name = arg0.substring(nameSeparator + 1);
        String sessionPath = arg0.substring(0, nameSeparator);
        String sid = sessionPath.substring(sessionPath.lastIndexOf('/') + 1);

        ZkSession session = (ZkSession) ZkSessionManager.getInstance().getSession(sid);
        if (session == null) {
            // The session is not (or no longer) known on this node; nothing to update.
            return;
        }
        session.localSetAttribute(name, arg1);
    }

    /**
     * Does nothing.
     * NOTE(review): a deleted attribute therefore stays in the local cache of
     * the session - confirm whether that is intended.
     */
    @Override
    public void handleDataDeleted(String arg0) throws Exception {
    }
}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Set;

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.SessionChangeListener;

import com.tianque.session.SessionMetaData;

/**

* @author hxpwangyi@163.com

* @date 2013-3-1

*/

public class ZkSessionChangeListener extends SessionChangeListener {

private static final Map<String, Set<IZkDataListener>> sissionDataListeners=new HashMap<String, Set<IZkDataListener>>();

private static final Map<String,Map<String, Set<IZkDataListener>>> attributeDataListeners= new HashMap<String,Map<String, Set<IZkDataListener>>>();

private void subscribeSessionAttributeChange(List<String> zkSessions){

final ZkClient zkClient=ZkConnectionSingleton.getInstance();

for(int i=0,len=zkSessions.size();i<len;i++){

String sid=(String)zkSessions.get(i);

List<String> zkSessionAttributes=zkClient.getChildren(ZkSessionHelper.root+"/"+sid);

Map<String, Set<IZkDataListener>> attributesDataListener=attributeDataListeners.get(sid);

for(int j=0,size=zkSessionAttributes.size();j<size;j++){

String name=zkSessionAttributes.get(j);

IZkDataListener newDataListener=new ZkSessionAttributeDataListener();

if(attributesDataListener==null){

attributesDataListener=new HashMap<String, Set<IZkDataListener>>();

Set<IZkDataListener> attributeDataListener=new HashSet<IZkDataListener>();

attributeDataListener.add(newDataListener);

attributesDataListener.put(name, attributeDataListener);

attributeDataListeners.put(sid, attributesDataListener);

}else{

Set<IZkDataListener> attributeDataListener=attributesDataListener.get(name);

if(attributeDataListener!=null){

Iterator<IZkDataListener> it=attributeDataListener.iterator();

while(it.hasNext()){

zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name, it.next());

}

}else{

attributeDataListener=new HashSet<IZkDataListener>();

attributeDataListener.add(newDataListener);

}

}

zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid+"/"+name,newDataListener );

}

}

}

private void subscribeSessionDataChange(List<String> zkSessions){

final ZkClient zkClient=ZkConnectionSingleton.getInstance();

for(int i=0,len=zkSessions.size();i<len;i++){

String sid=(String)zkSessions.get(i);

Set<IZkDataListener> listenerSet=sissionDataListeners.get(sid);

if(listenerSet!=null){

Iterator<IZkDataListener> it=listenerSet.iterator();

while(it.hasNext()){

zkClient.unsubscribeDataChanges(ZkSessionHelper.root+"/"+sid, it.next());

}

}

IZkDataListener newDataListener=new ZkSessionDataListener();

if(listenerSet==null){

listenerSet=new HashSet<IZkDataListener>();

listenerSet.add(newDataListener);

sissionDataListeners.put(sid, listenerSet);

}else{

listenerSet.add(newDataListener);

}

zkClient.subscribeDataChanges(ZkSessionHelper.root+"/"+sid,newDataListener );

}

subscribeSessionAttributeChange(zkSessions);

}

protected void subscribeSession() {

final ZkClient zkClient=ZkConnectionSingleton.getInstance();

if(!zkClient.exists(ZkSessionHelper.root)){

zkClient.createPersistent(ZkSessionHelper.root);

}

ZkSessionManager.getInstance().loadSession();

new ZkSessionCleaner().start();

List<String> zkSessions=zkClient.getChildren(ZkSessionHelper.root);

subscribeSessionDataChange(zkSessions);

zkClient.subscribeChildChanges(ZkSessionHelper.root, new IZkChildListener() {

@Override

public void handleChildChange(String parentPath, List currentChilds) throws Exception {

subscribeSessionDataChange(currentChilds);

//subscribeSessionAttributeChange(currentChilds);

Map sessions=ZkSessionManager.getInstance().getAllSession();

for(Object sid: sessions.keySet()){

boolean has=false;

for(int j=0, len=currentChilds.size();j<len;j++){

if(((String)sid).equals(currentChilds.get(j))){

has=true;

break;

}

}

if(!has){

sessions.remove(sid);

}

}

for(int j=0, len=currentChilds.size();j<len;j++){

boolean has=false;

String zkSid=(String)currentChilds.get(j);

for(Object sid: sessions.keySet()){

if(((String)sid).equals(zkSid)){

has=true;

break;

}

}

if(!has){

SessionMetaData meta=zkClient.readData(ZkSessionHelper.root+"/"+zkSid);

ZkSession session=new ZkSession();

session.setMeta(meta);

ZkSessionManager.getInstance().addSession(session, session.getId());

List<String> keys=zkClient.getChildren(ZkSessionHelper.root+"/"+zkSid);

for(int i=0,size=keys.size();i<size;i++){

Object val=zkClient.readData(keys.get(i));

session.localSetAttribute(keys.get(i), val);

}

}

}

}

});

}

protected void release() {

}

}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import java.util.Date;

import java.util.List;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.SessionMetaData;

/**
 * Background thread that periodically deletes expired session nodes: a
 * session is expired when the time since its last access exceeds its
 * inactivity timeout.
 *
 * <p>The thread is a daemon and stops when it is interrupted.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-2
 */
public class ZkSessionCleaner extends Thread {

    /** Pause between two sweeps, in milliseconds. */
    private static final long SWEEP_INTERVAL_MILLIS = 30000L;

    /** Creates the cleaner as a named daemon thread so it never blocks JVM shutdown. */
    public ZkSessionCleaner() {
        super("zk-session-cleaner");
        setDaemon(true);
    }

    /**
     * Sweeps the session root every 30 seconds until the thread is
     * interrupted. A failing sweep is logged and retried on the next round
     * instead of terminating the thread.
     */
    @Override
    public void run() {
        ZkClient client = ZkConnectionSingleton.getInstance();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                sweep(client);
            } catch (RuntimeException e) {
                java.util.logging.Logger.getLogger(ZkSessionCleaner.class.getName())
                        .log(java.util.logging.Level.WARNING, "Session sweep failed; retrying later", e);
            }
            try {
                Thread.sleep(SWEEP_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Deletes every session node whose metadata says it has been idle for too long. */
    private void sweep(ZkClient client) {
        List<String> sessionList = client.getChildren(ZkSessionHelper.root);
        for (String sid : sessionList) {
            String sessionPath = ZkSessionHelper.root + "/" + sid;
            // Null when the node vanished meanwhile or carries no metadata yet.
            SessionMetaData meta = client.readData(sessionPath, true);
            if (meta == null) {
                continue;
            }
            long idle = new Date().getTime() - meta.getLastAccessedTime();
            if (idle > meta.getMaxInactiveInterval()) {
                client.deleteRecursive(sessionPath);
            }
        }
    }
}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.SessionMetaData;

/**
 * Data listener for a session node: when the node's data changes, the
 * metadata is re-read and installed into the matching local session.
 *
 * <p>The node path is expected to end in {@code .../<sid>}.
 *
 * @author hxpwangyi@163.com
 * @date 2013-3-2
 */
public class ZkSessionDataListener implements IZkDataListener {

    /**
     * Re-reads the metadata stored at the changed node and hands it to the
     * local session with the same id. The event is ignored when the session
     * is unknown locally or when no metadata can be read, so a session never
     * ends up with {@code null} metadata.
     *
     * @param arg0 path of the session node
     * @param arg1 the new data of the node (unused; the node is re-read)
     */
    @Override
    public void handleDataChange(String arg0, Object arg1) throws Exception {
        String sid = arg0.substring(arg0.lastIndexOf('/') + 1);
        ZkSession session = (ZkSession) ZkSessionManager.getInstance().getSession(sid);
        if (session == null) {
            return;
        }

        ZkClient client = ZkConnectionSingleton.getInstance();
        // Null when the node vanished meanwhile or carries no data.
        SessionMetaData meta = client.readData(arg0, true);
        if (meta == null) {
            return;
        }
        session.setMeta(meta);
    }

    /** Does nothing. */
    @Override
    public void handleDataDeleted(String arg0) throws Exception {
    }
}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import java.util.Date;

import javax.servlet.http.Cookie;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import javax.servlet.http.HttpSession;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.AbstractSession;

import com.tianque.session.AbstractSessionFilter;

import com.tianque.session.AbstractSessionManager;

import com.tianque.session.SessionChangeListener;

import com.tianque.session.SidGenerator;

/**

* @author hxpwangyi@163.com

* @date 2013-3-1

*/

public class ZkSessionFilter extends AbstractSessionFilter {

private void newSession(HttpServletRequest request, HttpServletResponse response) {

HttpSession session=new ZkSession();

String sid=SidGenerator.generateSid();

((AbstractSession)session).setId(sid);

ZkSessionManager.getInstance().addSession(session, sid);

ZkSessionHelper.addSession(((AbstractSession)session).getMeta());

Cookie cookie=new Cookie("sid",sid);

cookie.setMaxAge((int)SessionChangeListener.getTimeout());

response.addCookie(cookie);

request.setAttribute("sid", sid);

}

@Override

protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response) {

AbstractSessionManager sessionManager=ZkSessionManager.getInstance();

String sid=sessionManager.getSessionIdByCookie((HttpServletRequest)request);

if(sid==null || sid.equals("")){

newSession((HttpServletRequest)request,(HttpServletResponse)response);

}else{

AbstractSession session=(AbstractSession)sessionManager.getSession(sid);

if(session!=null){

session.setLastAccessedTime(new Date().getTime());

ZkClient client=ZkConnectionSingleton.getInstance();

client.writeData(ZkSessionHelper.root+"/"+sid, session.getMeta());

}else{

newSession((HttpServletRequest)request,(HttpServletResponse)response);

}

}

}

}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.SessionMetaData;

/**

* @author hxpwangyi@163.com

* @date 2013-3-1

*/

public class ZkSessionHelper {

public static final String root="/tianque-session-root-test";

public static void setAttribute(String sid,String name,Object value){

ZkClient client=ZkConnectionSingleton.getInstance();

if(!client.exists(root+"/"+sid+"/"+name)){

client.createPersistent(root+"/"+sid+"/"+name);

}

client.writeData(root+"/"+sid+"/"+name, value);

}

public static void removeAttribute(String sid,String name){

ZkClient client=ZkConnectionSingleton.getInstance();

if(client.exists(root+"/"+sid+"/"+name)){

client.delete(root+"/"+sid+"/"+name);

}

}

public static void addSession(SessionMetaData meta){

ZkClient client=ZkConnectionSingleton.getInstance();

client.createPersistent(root+"/"+meta.getSid());

client.writeData(root+"/"+meta.getSid(), meta);

}

}

[java] view
plaincopy

package com.tianque.session.zookeeper;

import java.util.List;

import org.I0Itec.zkclient.ZkClient;

import com.tianque.session.AbstractSessionManager;

import com.tianque.session.SessionMetaData;

/**

* @author hxpwangyi@163.com

* @date 2013-3-2

*/

public class ZkSessionManager extends AbstractSessionManager {

private static AbstractSessionManager instance=new ZkSessionManager();

@Override

public void loadSession() {

ZkClient client=ZkConnectionSingleton.getInstance();

List<String> sessionList=client.getChildren(ZkSessionHelper.root);

for(int i=0,len=sessionList.size();i<len;i++){

String sid=sessionList.get(i);

SessionMetaData meta=client.readData(ZkSessionHelper.root+"/"+sid);

ZkSession session=new ZkSession();

session.setId(sid);

session.setMeta(meta);

List<String> attributeList=client.getChildren(ZkSessionHelper.root+"/"+sid);

for(int j=0,size=attributeList.size();j<size;j++){

String name=attributeList.get(j);

Object value=client.readData(ZkSessionHelper.root+"/"+sid+"/"+name);

session.localSetAttribute(name, value);

}

AbstractSessionManager sessionManager=ZkSessionManager.getInstance();

sessionManager.addSession(session, sid);

}

}

public static AbstractSessionManager getInstance(){

return instance;

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息