blob: cf021d835cff758c534a43781b627f3fa5ca8830 [file] [log] [blame]
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Apache License v2.0 is available at
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.session.infinispan;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.server.session.AbstractSession;
import org.eclipse.jetty.server.session.AbstractSessionManager;
import org.eclipse.jetty.server.session.MemSession;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.infinispan.Cache;
import org.infinispan.commons.api.BasicCache;
import org.omg.CORBA._IDLTypeStub;
* InfinispanSessionManager
* The data for a session relevant to a particular context is stored in an Infinispan (clustered) cache:
* <pre>
* Key: is the id of the session + the context path + the vhost for the context
* Value: is the data of the session
* </pre>
* The key is necessarily complex because the same session id can be in-use by more than one
* context. In this case, the contents of the session will strictly be different for each
* context, although the id will be the same.
* Sessions are also kept in local memory when they are used by this session manager. This allows
* multiple different request threads in the same context to call Request.getSession() and
* obtain the same object.
* This session manager support scavenging, which is only done over the set of sessions in its
* local memory. This can result in some sessions being "stranded" in the cluster cache if no
* session manager is currently managing it (eg the node managing the session crashed and it
* was never requested on another node).
public class InfinispanSessionManager extends AbstractSessionManager
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
* Clustered cache of sessions
private BasicCache<String, Object> _cache;
* Sessions known to this node held in memory
private ConcurrentHashMap<String, InfinispanSessionManager.Session> _sessions;
* The length of time a session can be in memory without being checked against
* the cluster. A value of 0 indicates that the session is never checked against
* the cluster - the current node is considered to be the master for the session.
private long _staleIntervalSec = 0;
protected Scheduler.Task _task; //scavenge task
protected Scheduler _scheduler;
protected Scavenger _scavenger;
protected long _scavengeIntervalMs = 1000L * 60 * 10; //10mins
protected boolean _ownScheduler;
* Scavenger
protected class Scavenger implements Runnable
public void run()
if (_scheduler != null && _scheduler.isRunning())
_task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
* Every time a Session is put into the cache one of these objects
* is created to copy the data out of the in-memory session, and
* every time an object is read from the cache one of these objects
* a fresh Session object is created based on the data held by this
* object.
public class SerializableSessionData implements Serializable
private static final long serialVersionUID = -7779120106058533486L;
String clusterId;
String contextPath;
String vhost;
long accessed;
long lastAccessed;
long createTime;
long cookieSetTime;
String lastNode;
long expiry;
long maxInactive;
Map<String, Object> attributes;
public SerializableSessionData()
public SerializableSessionData(Session s)
clusterId = s.getClusterId();
contextPath = s.getContextPath();
vhost = s.getVHost();
accessed = s.getAccessed();
lastAccessed = s.getLastAccessedTime();
createTime = s.getCreationTime();
cookieSetTime = s.getCookieSetTime();
lastNode = s.getLastNode();
expiry = s.getExpiry();
maxInactive = s.getMaxInactiveInterval();
attributes = s.getAttributeMap(); // TODO pointer, not a copy
private void writeObject( out) throws IOException
out.writeUTF(clusterId); //session id
out.writeUTF(contextPath); //context path
out.writeUTF(vhost); //first vhost
out.writeLong(lastAccessed); //lastAccessTime
out.writeLong(createTime); //time created
out.writeLong(cookieSetTime);//time cookie was set
out.writeUTF(lastNode); //name of last node managing
private void readObject( in) throws IOException, ClassNotFoundException
clusterId = in.readUTF();
contextPath = in.readUTF();
vhost = in.readUTF();
accessed = in.readLong();//accessTime
lastAccessed = in.readLong(); //lastAccessTime
createTime = in.readLong(); //time created
cookieSetTime = in.readLong();//time cookie was set
lastNode = in.readUTF(); //last managing node
expiry = in.readLong();
maxInactive = in.readLong();
attributes = (HashMap<String,Object>)in.readObject();
* Session
* Representation of a session in local memory.
public class Session extends MemSession
private ReentrantLock _lock = new ReentrantLock();
* The (canonical) context path for with which this session is associated
private String _contextPath;
* The time in msec since the epoch at which this session should expire
private long _expiryTime;
* Time in msec since the epoch at which this session was last read from cluster
private long _lastSyncTime;
* The workername of last node known to be managing the session
private String _lastNode;
* If dirty, session needs to be (re)sent to cluster
protected boolean _dirty=false;
* Any virtual hosts for the context with which this session is associated
private String _vhost;
* Count of how many threads are active in this session
private AtomicInteger _activeThreads = new AtomicInteger(0);
* A new session.
* @param request the request
protected Session (HttpServletRequest request)
long maxInterval = getMaxInactiveInterval();
_expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
_lastNode = getSessionIdManager().getWorkerName();
_activeThreads.incrementAndGet(); //access will not be called on a freshly created session so increment here
protected Session (SerializableSessionData sd)
super(InfinispanSessionManager.this, sd.createTime, sd.accessed, sd.clusterId);
_expiryTime = (sd.maxInactive <= 0 ? 0 : (System.currentTimeMillis() + sd.maxInactive*1000L));
* A restored session.
* @param sessionId the session id
* @param created time created
* @param accessed time last accessed
* @param maxInterval max expiry interval
protected Session (String sessionId, long created, long accessed, long maxInterval)
super(InfinispanSessionManager.this, created, accessed, sessionId);
_expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
* Called on entry to the session.
* @see org.eclipse.jetty.server.session.AbstractSession#access(long)
protected boolean access(long time)
if (LOG.isDebugEnabled())
LOG.debug("Access session({}) for context {} on worker {}", getId(), getContextPath(), getSessionIdManager().getWorkerName());
long now = System.currentTimeMillis();
//lock so that no other thread can call access or complete until the first one has refreshed the session object if necessary
//a request thread is entering
if (_activeThreads.incrementAndGet() == 1)
//if the first thread, check that the session in memory is not stale, if we're checking for stale sessions
if (getStaleIntervalSec() > 0 && (now - getLastSyncTime()) >= (getStaleIntervalSec() * 1000L))
if (LOG.isDebugEnabled())
LOG.debug("Acess session({}) for context {} on worker {} stale session. Reloading.", getId(), getContextPath(), getSessionIdManager().getWorkerName());
catch (Exception e)
if (super.access(time))
int maxInterval=getMaxInactiveInterval();
_expiryTime = (maxInterval <= 0 ? 0 : (time + maxInterval*1000L));
return true;
return false;
* Exit from session
* @see org.eclipse.jetty.server.session.AbstractSession#complete()
protected void complete()
//lock so that no other thread that might be calling access can proceed until this complete is done
//if this is the last request thread to be in the session
if (_activeThreads.decrementAndGet() == 0)
//an invalid session will already have been removed from the
//local session map and deleted from the cluster. If its valid save
//it to the cluster.
//TODO consider doing only periodic saves if only the last access
//time to the session changes
if (isValid())
//if session still valid && its dirty or stale or never been synced, write it to the cluster
//otherwise, we just keep the updated last access time in memory
if (_dirty || getLastSyncTime() == 0 || isStale(System.currentTimeMillis()))
catch (Exception e)
LOG.warn("Problem saving session({})",getId(), e);
_dirty = false;
/** Test if the session is stale
* @param atTime time when stale
* @return true if stale
protected boolean isStale (long atTime)
return (getStaleIntervalSec() > 0) && (atTime - getLastSyncTime() >= (getStaleIntervalSec()*1000L));
/** Test if the session is dirty
* @return true if dirty
protected boolean isDirty ()
return _dirty;
* Expire the session.
* @see org.eclipse.jetty.server.session.AbstractSession#timeout()
protected void timeout()
* Reload the session from the cluster. If the node that
* last managed the session from the cluster is ourself,
* then the session does not need refreshing.
* NOTE: this method MUST be called with sufficient locks
* in place to prevent 2 or more concurrent threads from
* simultaneously updating the session.
private void refresh ()
//get fresh copy from the cluster
Session fresh = load(makeKey(getClusterId(), _context));
//if the session no longer exists, invalidate
if (fresh == null)
//cluster copy assumed to be the same as we were the last
//node to manage it
if (fresh.getLastNode().equals(getLastNode()))
//prepare for refresh
//if fresh has no attributes, remove them
if (fresh.getAttributes() == 0)
//reconcile attributes
for (String key:fresh.getAttributeMap().keySet())
Object freshvalue = fresh.getAttribute(key);
//session does not already contain this attribute, so bind it
if (getAttribute(key) == null)
else //session already contains this attribute, update its value
// cleanup, remove values from session, that don't exist in data anymore:
for (String key : getNames())
if (fresh.getAttribute(key) == null)
Object oldvalue = getAttribute(key);
//finish refresh
public void setExpiry (long expiry)
_expiryTime = expiry;
public long getExpiry ()
return _expiryTime;
public void swapId (String newId, String newNodeId)
//TODO probably synchronize rather than use the access/complete lock?
public void setAttribute (String name, Object value)
Object old = changeAttribute(name, value);
if (value == null && old == null)
return; //if same as remove attribute but attribute was already removed, no change
_dirty = true;
public String getContextPath()
return _contextPath;
public void setContextPath(String contextPath)
this._contextPath = contextPath;
public String getVHost()
return _vhost;
public void setVHost(String vhost)
this._vhost = vhost;
public String getLastNode()
return _lastNode;
public void setLastNode(String lastNode)
_lastNode = lastNode;
public long getLastSyncTime()
return _lastSyncTime;
public void setLastSyncTime(long lastSyncTime)
_lastSyncTime = lastSyncTime;
* Start the session manager.
* @see org.eclipse.jetty.server.session.AbstractSessionManager#doStart()
public void doStart() throws Exception
if (_sessionIdManager == null)
throw new IllegalStateException("No session id manager defined");
if (_cache == null)
throw new IllegalStateException("No session cache defined");
_sessions = new ConcurrentHashMap<String, Session>();
//try and use a common scheduler, fallback to own
_scheduler = getSessionHandler().getServer().getBean(Scheduler.class);
if (_scheduler == null)
_scheduler = new ScheduledExecutorScheduler();
_ownScheduler = true;
else if (!_scheduler.isStarted())
throw new IllegalStateException("Shared scheduler not started");
* Stop the session manager.
* @see org.eclipse.jetty.server.session.AbstractSessionManager#doStop()
public void doStop() throws Exception
if (_task!=null)
if (_ownScheduler && _scheduler !=null)
_scheduler = null;
_sessions = null;
* Look for sessions in local memory that have expired.
public void scavenge ()
Set<String> candidateIds = new HashSet<String>();
long now = System.currentTimeMillis();"SessionManager for context {} scavenging at {} ", getContextPath(getContext()), now);
for (Map.Entry<String, Session> entry:_sessions.entrySet())
long expiry = entry.getValue().getExpiry();
if (expiry > 0 && expiry < now)
for (String candidateId:candidateIds)
if (LOG.isDebugEnabled())
LOG.debug("Session {} expired ", candidateId);
Session candidateSession = _sessions.get(candidateId);
if (candidateSession != null)
//double check the state of the session in the cache, as the
//session may have migrated to another node. This leaves a window
//where the cached session may have been changed by another node
Session cachedSession = load(makeKey(candidateId, _context));
if (cachedSession == null)
if (LOG.isDebugEnabled()) LOG.debug("Locally expired session({}) does not exist in cluster ",candidateId);
//the session no longer exists, do a full invalidation
else if (getSessionIdManager().getWorkerName().equals(cachedSession.getLastNode()))
if (LOG.isDebugEnabled()) LOG.debug("Expiring session({}) local to session manager",candidateId);
//if I am the master of the session then it can be timed out
//some other node is the master of the session, simply remove it from my memory
if (LOG.isDebugEnabled()) LOG.debug("Session({}) not local to this session manager, removing from local memory", candidateId);
public long getScavengeInterval ()
return _scavengeIntervalMs/1000;
* Set the interval between runs of the scavenger. It should not be run too
* often.
* @param sec scavenge interval in seconds
public void setScavengeInterval (long sec)
if (sec<=0)
long old_period=_scavengeIntervalMs;
long period=sec*1000L;
//add a bit of variability into the scavenge time so that not all
//nodes with the same scavenge time sync up
long tenPercent = _scavengeIntervalMs/10;
if ((System.currentTimeMillis()%2) == 0)
_scavengeIntervalMs += tenPercent;
if (LOG.isDebugEnabled())
LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
synchronized (this)
if (_scheduler != null && (period!=old_period || _task==null))
if (_task!=null)
if (_scavenger == null)
_scavenger = new Scavenger();
_task = _scheduler.schedule(_scavenger,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
* Get the clustered cache instance.
* @return the cache
public BasicCache<String, Object> getCache()
return _cache;
* Set the clustered cache instance.
* @param cache the cache
public void setCache (BasicCache<String, Object> cache)
this._cache = cache;
public long getStaleIntervalSec()
return _staleIntervalSec;
public void setStaleIntervalSec(long staleIntervalSec)
_staleIntervalSec = staleIntervalSec;
* Add a new session for the context related to this session manager
* @see org.eclipse.jetty.server.session.AbstractSessionManager#addSession(org.eclipse.jetty.server.session.AbstractSession)
protected void addSession(AbstractSession session)
if (session==null)
if (LOG.isDebugEnabled()) LOG.debug("Adding session({}) to session manager for context {} on worker {}",session.getClusterId(), getContextPath(getContext()),getSessionIdManager().getWorkerName() + " with lastnode="+((Session)session).getLastNode());
_sessions.put(session.getClusterId(), (Session)session);
catch (Exception e)
LOG.warn("Unable to store new session id="+session.getId() , e);
* Ask the cluster for the session.
* @see org.eclipse.jetty.server.session.AbstractSessionManager#getSession(java.lang.String)
public AbstractSession getSession(String idInCluster)
Session session = null;
//try and find the session in this node's memory
Session memSession = (Session)_sessions.get(idInCluster);
if (LOG.isDebugEnabled())
LOG.debug("getSession({}) {} in session map",idInCluster,(memSession==null?"not":""));
long now = System.currentTimeMillis();
//if the session is not in this node's memory, then load it from the cluster cache
if (memSession == null)
if (LOG.isDebugEnabled())
LOG.debug("getSession({}): loading session data from cluster", idInCluster);
session = load(makeKey(idInCluster, _context));
if (session != null)
//We retrieved a session with the same key from the database
//Check that it wasn't expired
if (session.getExpiry() > 0 && session.getExpiry() <= now)
if (LOG.isDebugEnabled()) LOG.debug("getSession ({}): Session expired", idInCluster);
//ensure that the session id for the expired session is deleted so that a new session with the
//same id cannot be created (because the idInUse() test would succeed)
return null;
//Update the last worker node to me
//TODO consider saving session here if lastNode was not this node
//Check that another thread hasn't loaded the same session
Session existingSession = _sessions.putIfAbsent(idInCluster, session);
if (existingSession != null)
//use the one that the other thread inserted
session = existingSession;
LOG.debug("getSession({}): using session loaded by another request thread ", idInCluster);
//indicate that the session was reinflated
LOG.debug("getSession({}): loaded session from cluster", idInCluster);
return session;
//The requested session does not exist anywhere in the cluster
LOG.debug("getSession({}): No session in cluster matching",idInCluster);
return null;
//The session exists in this node's memory
LOG.debug("getSession({}): returning session from local memory ", memSession.getClusterId());
return memSession;
catch (Exception e)
LOG.warn("Unable to load session="+idInCluster, e);
return null;
* The session manager is stopping.
* @see org.eclipse.jetty.server.session.AbstractSessionManager#shutdownSessions()
protected void shutdownSessions() throws Exception
Set<String> keys = new HashSet<String>(_sessions.keySet());
for (String key:keys)
Session session = _sessions.remove(key); //take the session out of the session list
//If the session is dirty, then write it to the cluster.
//If the session is simply stale do NOT write it to the cluster, as some other node
//may have started managing that session - this means that the last accessed/expiry time
//will not be updated, meaning it may look like it can expire sooner than it should.
if (session.isDirty())
if (LOG.isDebugEnabled())
LOG.debug("Saving dirty session {} before exiting ", session.getId());
catch (Exception e)
protected AbstractSession newSession(HttpServletRequest request)
return new Session(request);
* Remove a session from local memory, and delete it from
* the cluster cache.
* @see org.eclipse.jetty.server.session.AbstractSessionManager#removeSession(java.lang.String)
protected boolean removeSession(String idInCluster)
Session session = (Session)_sessions.remove(idInCluster);
if (session != null)
catch (Exception e)
LOG.warn("Problem deleting session id="+idInCluster, e);
return session!=null;
public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId)
Session session = null;
//take the session with that id out of our managed list
session = (Session)_sessions.remove(oldClusterId);
if (session != null)
//TODO consider transactionality and ramifications if the session is live on another node
delete(session); //delete the old session from the cluster
session.swapId(newClusterId, newNodeId); //update the session
_sessions.put(newClusterId, session); //put it into managed list under new key
save(session); //put the session under the new id into the cluster
catch (Exception e)
super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
* Load a session from the clustered cache.
* @param key the session key
* @return the session
protected Session load (String key)
if (_cache == null)
throw new IllegalStateException("No cache");
if (LOG.isDebugEnabled()) LOG.debug("Loading session {} from cluster", key);
SerializableSessionData storableSession = (SerializableSessionData)_cache.get(key);
if (storableSession == null)
if (LOG.isDebugEnabled()) LOG.debug("No session {} in cluster ",key);
return null;
Session session = new Session (storableSession);
return session;
* Save or update the session to the cluster cache
* @param session the session
* @throws Exception if unable to save
protected void save (InfinispanSessionManager.Session session)
throws Exception
if (_cache == null)
throw new IllegalStateException("No cache");
if (LOG.isDebugEnabled()) LOG.debug("Writing session {} to cluster", session.getId());
SerializableSessionData storableSession = new SerializableSessionData(session);
//Put an idle timeout on the cache entry if the session is not immortal -
//if no requests arrive at any node before this timeout occurs, or no node
//scavenges the session before this timeout occurs, the session will be removed.
//NOTE: that no session listeners can be called for this.
InfinispanSessionIdManager sessionIdManager = (InfinispanSessionIdManager)getSessionIdManager();
if (storableSession.maxInactive > 0)
_cache.put(makeKey(session, _context), storableSession, -1, TimeUnit.SECONDS, storableSession.maxInactive*sessionIdManager.getIdleExpiryMultiple(), TimeUnit.SECONDS);
_cache.put(makeKey(session, _context), storableSession);
//tickle the session id manager to keep the sessionid entry for this session up-to-date
* Remove the session from the cluster cache.
* @param session the session
protected void delete (InfinispanSessionManager.Session session)
if (_cache == null)
throw new IllegalStateException("No cache");
if (LOG.isDebugEnabled()) LOG.debug("Removing session {} from cluster", session.getId());
_cache.remove(makeKey(session, _context));
* Invalidate a session for this context with the given id
* @param idInCluster session id in cluster
public void invalidateSession (String idInCluster)
Session session = (Session)_sessions.get(idInCluster);
if (session != null)
* Make a unique key for this session.
* As the same session id can be used across multiple contexts, to
* make it unique, the key must be composed of:
* <ol>
* <li>the id</li>
* <li>the context path</li>
* <li>the virtual hosts</li>
* </ol>
*TODO consider the difference between getClusterId and getId
* @param session
* @return
private String makeKey (Session session, Context context)
return makeKey(session.getId(), context);
* Make a unique key for this session.
* As the same session id can be used across multiple contexts, to
* make it unique, the key must be composed of:
* <ol>
* <li>the id</li>
* <li>the context path</li>
* <li>the virtual hosts</li>
* </ol>
*TODO consider the difference between getClusterId and getId
* @param session
* @return
private String makeKey (String id, Context context)
String key = getContextPath(context);
key = key + "_" + getVirtualHost(context);
key = key+"_"+id;
return key;
* Turn the context path into an acceptable string
* @param context
* @return
private static String getContextPath (ContextHandler.Context context)
return canonicalize (context.getContextPath());
* Get the first virtual host for the context.
* Used to help identify the exact session/contextPath.
* @return if no virtual host is defined
private static String getVirtualHost (ContextHandler.Context context)
String vhost = "";
if (context==null)
return vhost;
String [] vhosts = context.getContextHandler().getVirtualHosts();
if (vhosts==null || vhosts.length==0 || vhosts[0]==null)
return vhost;
return vhosts[0];
* Make an acceptable name from a context path.
* @param path
* @return
private static String canonicalize (String path)
if (path==null)
return "";
return path.replace('/', '_').replace('.','_').replace('\\','_');