blob: 008cd21e64d191e8ecd7fc297d9fa9586ccc6a44 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.gcloud.session;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.server.session.AbstractSession;
import org.eclipse.jetty.server.session.AbstractSessionManager;
import org.eclipse.jetty.server.session.MemSession;
import org.eclipse.jetty.util.ClassLoadingObjectInputStream;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import com.google.cloud.datastore.Blob;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.GqlQuery;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.Query.ResultType;
import com.google.cloud.datastore.QueryResults;
/**
 * GCloudSessionManager
 *
 * A SessionManager that persists sessions to Google Cloud Datastore,
 * keeping an in-memory map of the sessions known to this node and
 * periodically scavenging expired sessions from the datastore.
 */
public class GCloudSessionManager extends AbstractSessionManager
{
    private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");

    //the datastore "kind" under which session entities are stored
    public static final String KIND = "GCloudSession";

    //default cap on the number of expired sessions fetched per scavenge query
    public static final int DEFAULT_MAX_QUERY_RESULTS = 100;

    //default interval between scavenge cycles
    public static final long DEFAULT_SCAVENGE_SEC = 600;

    public static final int DEFAULT_BACKOFF_MS = 1000; //start at 1 sec
    public static final int DEFAULT_MAX_RETRIES = 5;

    /**
     * Sessions known to this node held in memory
     */
    private ConcurrentHashMap<String, GCloudSessionManager.Session> _sessions;

    /**
     * The length of time a session can be in memory without being checked against
     * the cluster. A value of 0 indicates that the session is never checked against
     * the cluster - the current node is considered to be the master for the session.
     */
    private long _staleIntervalSec = 0;

    protected Scheduler.Task _task; //scavenge task
    protected Scheduler _scheduler;
    protected Scavenger _scavenger;
    protected long _scavengeIntervalMs = 1000L * DEFAULT_SCAVENGE_SEC; //10mins
    protected boolean _ownScheduler; //true if we created the scheduler and must stop it

    //datastore client and key factory, created in doStart() unless injected via setDatastore()
    private Datastore _datastore;
    private KeyFactory _keyFactory;
    private SessionEntityConverter _converter;
    private int _maxResults = DEFAULT_MAX_QUERY_RESULTS;
    private int _backoffMs = DEFAULT_BACKOFF_MS;
    private int _maxRetries = DEFAULT_MAX_RETRIES;
    private boolean _dsSet; //true if the Datastore instance was injected via setDatastore()
/**
 * Scavenger
 *
 * Periodic task that times out expired sessions and then reschedules
 * itself for the next scavenge cycle.
 */
protected class Scavenger implements Runnable
{
    @Override
    public void run()
    {
        try
        {
            scavenge();
        }
        finally
        {
            //always reschedule (even if scavenging threw) while the scheduler is still running
            if (_scheduler != null && _scheduler.isRunning())
                _task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
        }
    }
}
/**
 * SessionEntityConverter
 *
 * Converts between in-memory Session objects and their GCloud Datastore
 * Entity representation. The session attribute map is java-serialized
 * into a Blob property.
 */
public class SessionEntityConverter
{
    //names of the entity properties used to persist a session
    public final String CLUSTERID = "clusterId";
    public final String CONTEXTPATH = "contextPath";
    public final String VHOST = "vhost";
    public final String ACCESSED = "accessed";
    public final String LASTACCESSED = "lastAccessed";
    public final String CREATETIME = "createTime";
    public final String COOKIESETTIME = "cookieSetTime";
    public final String LASTNODE = "lastNode";
    public final String EXPIRY = "expiry";
    public final String MAXINACTIVE = "maxInactive";
    public final String ATTRIBUTES = "attributes";

    /**
     * Convert a session into a datastore entity.
     *
     * @param session the session to convert (may be null)
     * @param key the unique datastore key for the session
     * @return the entity, or null if the session was null
     * @throws Exception if the attribute map cannot be serialized
     */
    public Entity entityFromSession (Session session, Key key) throws Exception
    {
        if (session == null)
            return null;

        //serialize the attribute map; try-with-resources ensures the stream
        //is closed even if serialization fails
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos))
        {
            oos.writeObject(session.getAttributeMap());
            oos.flush();
        }

        //turn a session into an entity
        return Entity.builder(key)
                     .set(CLUSTERID, session.getId())
                     .set(CONTEXTPATH, session.getContextPath())
                     .set(VHOST, session.getVHost())
                     .set(ACCESSED, session.getAccessed())
                     .set(LASTACCESSED, session.getLastAccessedTime())
                     .set(CREATETIME, session.getCreationTime())
                     .set(COOKIESETTIME, session.getCookieSetTime())
                     .set(LASTNODE, session.getLastNode())
                     .set(EXPIRY, session.getExpiry())
                     .set(MAXINACTIVE, session.getMaxInactiveInterval())
                     .set(ATTRIBUTES, Blob.copyFrom(baos.toByteArray()))
                     .build();
    }

    /**
     * Convert a datastore entity into a session.
     *
     * Deserialization of the attribute map runs via ContextHandler.handle()
     * when a context is available, so that application classes resolve with
     * the webapp's classloader.
     *
     * @param entity the entity to convert (may be null)
     * @return the restored session, or null if the entity was null
     * @throws Exception if deserialization fails
     */
    public Session sessionFromEntity (Entity entity) throws Exception
    {
        if (entity == null)
            return null;

        final AtomicReference<Session> reference = new AtomicReference<Session>();
        final AtomicReference<Exception> exception = new AtomicReference<Exception>();
        Runnable load = new Runnable()
        {
            public void run ()
            {
                try
                {
                    //turn an entity into a Session
                    String clusterId = entity.getString(CLUSTERID);
                    String contextPath = entity.getString(CONTEXTPATH);
                    String vhost = entity.getString(VHOST);
                    long accessed = entity.getLong(ACCESSED);
                    long lastAccessed = entity.getLong(LASTACCESSED);
                    long createTime = entity.getLong(CREATETIME);
                    long cookieSetTime = entity.getLong(COOKIESETTIME);
                    String lastNode = entity.getString(LASTNODE);
                    long expiry = entity.getLong(EXPIRY);
                    long maxInactive = entity.getLong(MAXINACTIVE);
                    Blob blob = entity.getBlob(ATTRIBUTES); //getBlob already returns Blob, no cast needed

                    Session session = new Session (clusterId, createTime, accessed, maxInactive);
                    session.setLastNode(lastNode);
                    session.setContextPath(contextPath);
                    session.setVHost(vhost);
                    session.setCookieSetTime(cookieSetTime);
                    session.setLastAccessedTime(lastAccessed);
                    session.setExpiry(expiry);

                    try (ClassLoadingObjectInputStream ois = new ClassLoadingObjectInputStream(blob.asInputStream()))
                    {
                        @SuppressWarnings("unchecked")
                        Map<String,Object> attributes = (Map<String,Object>)ois.readObject();
                        session.addAttributes(attributes);
                    }
                    reference.set(session);
                }
                catch (Exception e)
                {
                    exception.set(e);
                }
            }
        };

        if (_context == null)
            load.run();
        else
            _context.getContextHandler().handle(null, load);

        //rethrow any failure: callers are responsible for logging it
        if (exception.get() != null)
            throw exception.get();

        return reference.get();
    }
}
/**
 * Session
 *
 * Representation of a session in local memory.
 */
public class Session extends MemSession
{
    //guards the refresh-on-access / save-on-complete sequences against concurrent request threads
    private ReentrantLock _lock = new ReentrantLock();

    /**
     * The (canonical) context path for with which this session is associated
     */
    private String _contextPath;

    /**
     * The time in msec since the epoch at which this session should expire;
     * 0 means the session never expires
     */
    private long _expiryTime;

    /**
     * Time in msec since the epoch at which this session was last read from cluster
     */
    private long _lastSyncTime;

    /**
     * The workername of last node known to be managing the session
     */
    private String _lastNode;

    /**
     * If dirty, session needs to be (re)sent to cluster
     */
    protected boolean _dirty=false;

    /**
     * Any virtual hosts for the context with which this session is associated
     */
    private String _vhost;

    /**
     * Count of how many threads are active in this session
     */
    private AtomicInteger _activeThreads = new AtomicInteger(0);
/**
 * A new session.
 *
 * @param request the request that caused this session to be created
 */
protected Session (HttpServletRequest request)
{
    super(GCloudSessionManager.this,request);
    //maxInterval <= 0 means the session never expires
    long maxInterval = getMaxInactiveInterval();
    _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
    //this node is currently managing the session
    _lastNode = getSessionIdManager().getWorkerName();
    setVHost(GCloudSessionManager.getVirtualHost(_context));
    setContextPath(GCloudSessionManager.getContextPath(_context));
    _activeThreads.incrementAndGet(); //access will not be called on a freshly created session so increment here
}
/**
 * A restored session.
 *
 * @param sessionId the cluster id of the session
 * @param created time in msec since the epoch at which the session was created
 * @param accessed time in msec since the epoch of the last access
 * @param maxInterval max period of inactivity in sec; &lt;= 0 means never expires
 */
protected Session (String sessionId, long created, long accessed, long maxInterval)
{
    super(GCloudSessionManager.this, created, accessed, sessionId);
    _expiryTime = (maxInterval <= 0 ? 0 : (System.currentTimeMillis() + maxInterval*1000L));
}
/**
 * Called on entry to the session.
 *
 * If this is the first request thread to enter the session and stale
 * checking is enabled, the in-memory copy may be refreshed from the
 * cluster before the request proceeds.
 *
 * @see org.eclipse.jetty.server.session.AbstractSession#access(long)
 */
@Override
protected boolean access(long time)
{
    if (LOG.isDebugEnabled())
        LOG.debug("Access session({}) for context {} on worker {}", getId(), getContextPath(), getSessionIdManager().getWorkerName());
    try
    {
        long now = System.currentTimeMillis();
        //lock so that no other thread can call access or complete until the first one has refreshed the session object if necessary
        _lock.lock();

        //a request thread is entering
        if (_activeThreads.incrementAndGet() == 1)
        {
            //if the first thread, check that the session in memory is not stale, if we're checking for stale sessions
            if (getStaleIntervalSec() > 0 && (now - getLastSyncTime()) >= (getStaleIntervalSec() * 1000L))
            {
                //fix: corrected "Acess" typo in the log message
                if (LOG.isDebugEnabled())
                    LOG.debug("Access session({}) for context {} on worker {} stale session. Reloading.", getId(), getContextPath(), getSessionIdManager().getWorkerName());
                refresh();
            }
        }
    }
    catch (Exception e)
    {
        LOG.warn(e);
    }
    finally
    {
        _lock.unlock();
    }

    //update access bookkeeping and recompute the expiry time
    if (super.access(time))
    {
        int maxInterval=getMaxInactiveInterval();
        _expiryTime = (maxInterval <= 0 ? 0 : (time + maxInterval*1000L));
        return true;
    }
    return false;
}
/**
 * Exit from session
 *
 * If this is the last request thread to leave the session and the session
 * is still valid and dirty/stale/never-synced, it is written back to the
 * cluster; otherwise only the in-memory access time is retained.
 *
 * @see org.eclipse.jetty.server.session.AbstractSession#complete()
 */
@Override
protected void complete()
{
    super.complete();

    //lock so that no other thread that might be calling access can proceed until this complete is done
    _lock.lock();
    try
    {
        //if this is the last request thread to be in the session
        if (_activeThreads.decrementAndGet() == 0)
        {
            try
            {
                //an invalid session will already have been removed from the
                //local session map and deleted from the cluster. If its valid save
                //it to the cluster.
                //TODO consider doing only periodic saves if only the last access
                //time to the session changes
                if (isValid())
                {
                    //if session still valid && its dirty or stale or never been synced, write it to the cluster
                    //otherwise, we just keep the updated last access time in memory
                    if (_dirty || getLastSyncTime() == 0 || isStale(System.currentTimeMillis()))
                    {
                        willPassivate();
                        save(this);
                        didActivate();
                    }
                }
            }
            catch (Exception e)
            {
                LOG.warn("Problem saving session({})",getId(), e);
            }
            finally
            {
                //dirty flag is reset even if the save failed
                _dirty = false;
            }
        }
    }
    finally
    {
        _lock.unlock();
    }
}
/** Test if the session is stale
 * @param atTime the time at which staleness is evaluated
 * @return true if the session is stale at the time given
 */
protected boolean isStale (long atTime)
{
    long staleMs = getStaleIntervalSec() * 1000L;
    if (staleMs <= 0)
        return false; //stale checking disabled
    return (atTime - getLastSyncTime()) >= staleMs;
}

/** Test if the session is dirty
 * @return true if the dirty flag is set
 */
protected boolean isDirty ()
{
    return _dirty;
}
/**
 * Expire the session.
 *
 * @see org.eclipse.jetty.server.session.AbstractSession#timeout()
 */
@Override
protected void timeout()
{
    if (LOG.isDebugEnabled()) LOG.debug("Timing out session {}", getId());
    super.timeout();
}
/**
 * Reload the session from the cluster. If the node that
 * last managed the session from the cluster is ourself,
 * then the session does not need refreshing.
 * NOTE: this method MUST be called with sufficient locks
 * in place to prevent 2 or more concurrent threads from
 * simultaneously updating the session.
 *
 * @throws Exception if the session cannot be loaded from the cluster
 */
private void refresh ()
throws Exception
{
    //get fresh copy from the cluster
    Session fresh = load(makeKey(getClusterId(), _context));

    //if the session no longer exists, invalidate
    if (fresh == null)
    {
        invalidate();
        return;
    }

    //cluster copy assumed to be the same as we were the last
    //node to manage it
    if (fresh.getLastNode().equals(getLastNode()))
        return;

    //we are now the managing node
    setLastNode(getSessionIdManager().getWorkerName());

    //prepare for refresh
    willPassivate();

    //if fresh has no attributes, remove them
    if (fresh.getAttributes() == 0)
        this.clearAttributes();
    else
    {
        //reconcile attributes
        for (String key:fresh.getAttributeMap().keySet())
        {
            Object freshvalue = fresh.getAttribute(key);

            //session does not already contain this attribute, so bind it
            if (getAttribute(key) == null)
            {
                doPutOrRemove(key,freshvalue);
                bindValue(key,freshvalue);
            }
            else //session already contains this attribute, update its value
            {
                doPutOrRemove(key,freshvalue);
            }
        }

        // cleanup, remove values from session, that don't exist in data anymore:
        for (String key : getNames())
        {
            if (fresh.getAttribute(key) == null)
            {
                Object oldvalue = getAttribute(key);
                doPutOrRemove(key,null);
                unbindValue(key,oldvalue);
            }
        }
    }

    //finish refresh
    didActivate();
}
/**
 * @param expiry the time in msec since the epoch at which the session expires
 */
public void setExpiry (long expiry)
{
    _expiryTime = expiry;
}

/**
 * @return the time in msec since the epoch at which the session expires
 */
public long getExpiry ()
{
    return _expiryTime;
}

/**
 * @param time the time at which to test for expiry
 * @return true if the session has expired at the given time
 */
public boolean isExpiredAt (long time)
{
    //an expiry time of 0 (or less) means the session never expires
    return _expiryTime > 0 && _expiryTime <= time;
}
/**
 * Atomically (with respect to access/complete) change the cluster
 * and node ids of this session.
 *
 * @param newId the new cluster id
 * @param newNodeId the new node id
 */
public void swapId (String newId, String newNodeId)
{
    //TODO probably synchronize rather than use the access/complete lock?
    _lock.lock();
    try
    {
        setClusterId(newId);
        setNodeId(newNodeId);
    }
    finally
    {
        //fix: unlock in finally so the lock is released even if a setter throws
        _lock.unlock();
    }
}
/**
 * Set an attribute, marking the session dirty so it is written back to
 * the cluster on request exit. Removing an attribute that was already
 * absent is not a change and does not mark the session dirty.
 */
@Override
public void setAttribute (String name, Object value)
{
    Object old = changeAttribute(name, value);
    if (value == null && old == null)
        return; //if same as remove attribute but attribute was already removed, no change

    _dirty = true;
}
/**
 * @return the canonical context path with which this session is associated
 */
public String getContextPath()
{
    return _contextPath;
}

/**
 * @param contextPath the canonical context path
 */
public void setContextPath(String contextPath)
{
    this._contextPath = contextPath;
}

/**
 * @return the virtual host for the session's context
 */
public String getVHost()
{
    return _vhost;
}

/**
 * @param vhost the virtual host for the session's context
 */
public void setVHost(String vhost)
{
    this._vhost = vhost;
}

/**
 * @return workername of the last node known to be managing the session
 */
public String getLastNode()
{
    return _lastNode;
}

/**
 * @param lastNode workername of the node now managing the session
 */
public void setLastNode(String lastNode)
{
    _lastNode = lastNode;
}

/**
 * @return time in msec since the epoch at which the session was last read from the cluster
 */
public long getLastSyncTime()
{
    return _lastSyncTime;
}

/**
 * @param lastSyncTime time in msec since the epoch of the last cluster sync
 */
public void setLastSyncTime(long lastSyncTime)
{
    _lastSyncTime = lastSyncTime;
}
}
/**
 * Inject the Datastore instance to use. When set, doStart() will not
 * create a default instance and doStop() will not clear the reference.
 *
 * @param datastore the datastore to use
 */
public void setDatastore (Datastore datastore)
{
    _datastore = datastore;
    _dsSet = true;
}
/**
 * Start the session manager.
 *
 * Creates the datastore client (unless one was injected), the key factory,
 * the in-memory session map, and schedules the scavenger.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#doStart()
 */
@Override
public void doStart() throws Exception
{
    if (_sessionIdManager == null)
        throw new IllegalStateException("No session id manager defined");

    //create a default datastore client unless one was injected via setDatastore()
    if (!_dsSet)
        _datastore = DatastoreOptions.defaultInstance().service();

    _keyFactory = _datastore.newKeyFactory().kind(KIND);
    _converter = new SessionEntityConverter();
    _sessions = new ConcurrentHashMap<String, Session>();

    //try and use a common scheduler, fallback to own
    _scheduler = getSessionHandler().getServer().getBean(Scheduler.class);
    if (_scheduler == null)
    {
        _scheduler = new ScheduledExecutorScheduler();
        _ownScheduler = true;
        _scheduler.start();
    }
    else if (!_scheduler.isStarted())
        throw new IllegalStateException("Shared scheduler not started");

    //schedules the first scavenge cycle
    setScavengeIntervalSec(getScavengeIntervalSec());

    super.doStart();
}
/**
 * Stop the session manager.
 *
 * Cancels the scavenge task, stops the scheduler if we own it, and
 * releases the session map and (if we created it) the datastore client.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#doStop()
 */
@Override
public void doStop() throws Exception
{
    super.doStop();

    if (_task!=null)
        _task.cancel();
    _task=null;

    //only stop the scheduler if we created it in doStart()
    if (_ownScheduler && _scheduler !=null)
        _scheduler.stop();
    _scheduler = null;

    //fix: guard against doStart() having failed before the map was created
    if (_sessions != null)
        _sessions.clear();
    _sessions = null;

    //only drop the datastore reference if we created it ourselves
    if (!_dsSet)
        _datastore = null;
}
/**
 * Look for sessions in local memory that have expired.
 *
 * Any failure is logged and swallowed so the scavenger task keeps
 * rescheduling itself.
 */
public void scavenge ()
{
    try
    {
        //scavenge in the database every so often
        scavengeGCloudDataStore();
    }
    catch (Exception e)
    {
        LOG.warn("Problem scavenging", e);
    }
}
/**
 * Query the datastore for sessions that have expired and time them out,
 * processing at most _maxResults sessions per cycle.
 *
 * @throws Exception if the datastore query fails
 */
protected void scavengeGCloudDataStore()
throws Exception
{
    //query the datastore for sessions that have expired
    long now = System.currentTimeMillis();

    //give a bit of leeway so we don't immediately scavenge something that has only just expired a nanosecond ago
    now = now - (_scavengeIntervalMs/2);

    if (LOG.isDebugEnabled())
        LOG.debug("Scavenging for sessions expired before "+now);

    //NOTE(review): sessions with expiry==0 ("never expires") also satisfy "expiry < @1" and
    //will be fetched; the final isExpiredAt() check in scavengeSession() stops them being
    //timed out — consider adding "expiry > 0" to the query. TODO confirm.
    GqlQuery.Builder builder = Query.gqlQueryBuilder(ResultType.ENTITY, "select * from "+KIND+" where expiry < @1 limit "+_maxResults);
    builder.allowLiteral(true);
    builder.addBinding(now);

    Query<Entity> query = builder.build();
    QueryResults<Entity> results = _datastore.run(query);

    while (results.hasNext())
    {
        Entity sessionEntity = results.next();
        scavengeSession(sessionEntity);
    }
}
/**
 * Scavenge a session that has expired.
 *
 * The session is put into the local session map (if not already present)
 * so that the normal timeout machinery can be applied to it.
 *
 * @param e the session info from datastore
 * @throws Exception if the entity cannot be converted to a session
 */
protected void scavengeSession (Entity e)
throws Exception
{
    long now = System.currentTimeMillis();
    Session session = _converter.sessionFromEntity(e);
    if (session == null)
        return;

    if (LOG.isDebugEnabled())
        LOG.debug("Scavenging session: {}",session.getId());

    //if the session isn't in memory already, put it there so we can do a normal timeout call
    Session memSession = _sessions.putIfAbsent(session.getId(), session);
    if (memSession == null)
    {
        memSession = session;
    }

    //final check: the query may return sessions that are not actually expired
    if (memSession.isExpiredAt(now))
    {
        if (LOG.isDebugEnabled()) LOG.debug("Session {} is definitely expired", memSession.getId());
        memSession.timeout();
    }
}
/**
 * @return the interval in seconds between scavenge cycles
 */
public long getScavengeIntervalSec ()
{
    return _scavengeIntervalMs/1000;
}
/**
 * Set the interval between runs of the scavenger. It should not be run too
 * often.
 *
 * Up to 10% is added to the period (based on current-time parity) so that
 * nodes configured with the same interval do not all scavenge in lockstep.
 * A value &lt;= 0 disables scavenging. Any previously scheduled task is
 * cancelled and a new one scheduled if the period changed.
 *
 * @param sec the number of seconds between scavenge cycles
 */
public void setScavengeIntervalSec (long sec)
{
    long old_period=_scavengeIntervalMs;
    long period=sec*1000L;

    _scavengeIntervalMs=period;

    if (_scavengeIntervalMs > 0)
    {
        //add a bit of variability into the scavenge time so that not all
        //nodes with the same scavenge time sync up
        long tenPercent = _scavengeIntervalMs/10;
        if ((System.currentTimeMillis()%2) == 0)
            _scavengeIntervalMs += tenPercent;

        if (LOG.isDebugEnabled())
            LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
    }
    else
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Scavenging disabled");
    }

    synchronized (this)
    {
        //reschedule only if the period changed or nothing is scheduled yet
        if (_scheduler != null && (period!=old_period || _task==null))
        {
            //clean up any previously scheduled scavenger
            if (_task!=null)
                _task.cancel();

            //start a new one
            if (_scavengeIntervalMs > 0)
            {
                if (_scavenger == null)
                    _scavenger = new Scavenger();
                _task = _scheduler.schedule(_scavenger,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
            }
        }
    }
}
/**
 * @return seconds an in-memory session may go unchecked against the cluster; 0 disables checking
 */
public long getStaleIntervalSec()
{
    return _staleIntervalSec;
}

/**
 * @param staleIntervalSec seconds an in-memory session may go unchecked against
 * the cluster before being refreshed; 0 disables the check
 */
public void setStaleIntervalSec(long staleIntervalSec)
{
    _staleIntervalSec = staleIntervalSec;
}

/**
 * @return the max number of sessions returned per scavenge query
 */
public int getMaxResults()
{
    return _maxResults;
}
/**
 * Set the maximum number of sessions returned per scavenge query.
 * Non-positive values fall back to DEFAULT_MAX_QUERY_RESULTS.
 *
 * @param maxResults the max number of query results
 */
public void setMaxResults(int maxResults)
{
    //fix: the guard previously tested the field (_maxResults) instead of the
    //argument, so the supplied value was never validated and updates after the
    //field became non-positive were impossible
    if (maxResults <= 0)
        _maxResults = DEFAULT_MAX_QUERY_RESULTS;
    else
        _maxResults = maxResults;
}
/**
 * Add a new session for the context related to this session manager,
 * writing it through to the datastore immediately.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#addSession(org.eclipse.jetty.server.session.AbstractSession)
 */
@Override
protected void addSession(AbstractSession session)
{
    if (session==null)
        return;

    //fix: pass lastnode as its own format argument rather than concatenating it
    //onto the worker-name argument
    if (LOG.isDebugEnabled())
        LOG.debug("Adding session({}) to session manager for context {} on worker {} with lastnode={}",
                  session.getClusterId(), getContextPath(getContext()), getSessionIdManager().getWorkerName(),
                  ((Session)session).getLastNode());

    _sessions.put(session.getClusterId(), (Session)session);

    try
    {
        //write the new session through to the datastore
        session.willPassivate();
        save(((GCloudSessionManager.Session)session));
        session.didActivate();
    }
    catch (Exception e)
    {
        LOG.warn("Unable to store new session id="+session.getId() , e);
    }
}
/**
 * Ask the cluster for the session.
 *
 * Returns the in-memory copy if this node already has the session,
 * otherwise loads it from the datastore. A session found in the
 * datastore that has already expired has its id removed and null
 * is returned.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#getSession(java.lang.String)
 */
@Override
public AbstractSession getSession(String idInCluster)
{
    Session session = null;

    //try and find the session in this node's memory
    Session memSession = (Session)_sessions.get(idInCluster);

    if (LOG.isDebugEnabled())
        LOG.debug("getSession({}) {} in session map",idInCluster,(memSession==null?"not":""));

    long now = System.currentTimeMillis();
    try
    {
        //if the session is not in this node's memory, then load it from the datastore
        if (memSession == null)
        {
            if (LOG.isDebugEnabled())
                LOG.debug("getSession({}): loading session data from cluster", idInCluster);

            session = load(makeKey(idInCluster, _context));
            if (session != null)
            {
                //Check that it wasn't expired
                if (session.getExpiry() > 0 && session.getExpiry() <= now)
                {
                    if (LOG.isDebugEnabled()) LOG.debug("getSession ({}): Session expired", idInCluster);
                    //ensure that the session id for the expired session is deleted so that a new session with the
                    //same id cannot be created (because the idInUse() test would succeed)
                    ((GCloudSessionIdManager)getSessionIdManager()).removeSession(session);
                    return null;
                }

                //Update the last worker node to me
                session.setLastNode(getSessionIdManager().getWorkerName());
                //TODO consider saving session here if lastNode was not this node

                //Check that another thread hasn't loaded the same session
                Session existingSession = _sessions.putIfAbsent(idInCluster, session);
                if (existingSession != null)
                {
                    //use the one that the other thread inserted
                    session = existingSession;
                    LOG.debug("getSession({}): using session loaded by another request thread ", idInCluster);
                }
                else
                {
                    //indicate that the session was reinflated
                    session.didActivate();
                    LOG.debug("getSession({}): loaded session from cluster", idInCluster);
                }
                return session;
            }
            else
            {
                //The requested session does not exist anywhere in the cluster
                LOG.debug("getSession({}): No session in cluster matching",idInCluster);
                return null;
            }
        }
        else
        {
            //The session exists in this node's memory
            LOG.debug("getSession({}): returning session from local memory ", memSession.getClusterId());
            return memSession;
        }
    }
    catch (Exception e)
    {
        LOG.warn("Unable to load session="+idInCluster, e);
        return null;
    }
}
/**
 * The session manager is stopping.
 *
 * Dirty sessions are written to the cluster before exit; sessions that are
 * merely stale are not, as another node may already be managing them.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#shutdownSessions()
 */
@Override
protected void shutdownSessions() throws Exception
{
    //iterate a snapshot of the keys because we mutate the map while iterating
    Set<String> keys = new HashSet<String>(_sessions.keySet());
    for (String key:keys)
    {
        Session session = _sessions.remove(key); //take the session out of the session list

        //If the session is dirty, then write it to the cluster.
        //If the session is simply stale do NOT write it to the cluster, as some other node
        //may have started managing that session - this means that the last accessed/expiry time
        //will not be updated, meaning it may look like it can expire sooner than it should.
        try
        {
            if (session.isDirty())
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("Saving dirty session {} before exiting ", session.getId());
                save(session);
            }
        }
        catch (Exception e)
        {
            LOG.warn(e);
        }
    }
}
/**
 * Create a new session object for the given request.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#newSession(javax.servlet.http.HttpServletRequest)
 */
@Override
protected AbstractSession newSession(HttpServletRequest request)
{
    return new Session(request);
}
/**
 * Remove a session from local memory, and delete it from
 * the cluster cache.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#removeSession(java.lang.String)
 */
@Override
protected boolean removeSession(String idInCluster)
{
    Session removed = (Session)_sessions.remove(idInCluster);
    if (removed == null)
        return false;

    try
    {
        //delete the clustered copy too; a failure here still counts as removed locally
        delete(removed);
    }
    catch (Exception e)
    {
        LOG.warn("Problem deleting session id="+idInCluster, e);
    }
    return true;
}
/**
 * Change the id of a session, updating both the local session map
 * and the clustered store.
 *
 * @see org.eclipse.jetty.server.session.AbstractSessionManager#renewSessionId(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
 */
@Override
public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId)
{
    Session session = null;
    try
    {
        //take the session with that id out of our managed list
        session = (Session)_sessions.remove(oldClusterId);
        if (session != null)
        {
            //TODO consider transactionality and ramifications if the session is live on another node
            delete(session); //delete the old session from the cluster
            session.swapId(newClusterId, newNodeId); //update the session
            _sessions.put(newClusterId, session); //put it into managed list under new key
            save(session); //put the session under the new id into the cluster
        }
    }
    catch (Exception e)
    {
        LOG.warn(e);
    }

    super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
}
/**
 * Load a session from the clustered cache.
 *
 * @param key the unique datastore key for the session
 * @return the Session object restored from datastore, or null if absent
 * @throws Exception if the entity cannot be converted to a session
 */
protected Session load (Key key)
throws Exception
{
    if (_datastore == null)
        throw new IllegalStateException("No DataStore");

    if (LOG.isDebugEnabled()) LOG.debug("Loading session {} from DataStore ", key);

    Entity entity = _datastore.get(key);
    if (entity == null)
    {
        if (LOG.isDebugEnabled()) LOG.debug("No session {} in DataStore ",key);
        return null;
    }

    //record when we last read the session from the cluster
    Session session = _converter.sessionFromEntity(entity);
    session.setLastSyncTime(System.currentTimeMillis());
    return session;
}
/**
 * Save or update the session to the cluster cache.
 *
 * The write is retried with exponential back-off (starting at getBackoffMs(),
 * doubling each attempt, for at most getMaxRetries() attempts) when the
 * datastore reports a retryable failure.
 *
 * @param session the session to save to datastore
 * @throws DatastoreException for a non-retryable datastore failure
 * @throws IOException if all retry attempts are exhausted
 * @throws Exception if the session cannot be converted to an entity
 */
protected void save (GCloudSessionManager.Session session)
throws Exception
{
    if (_datastore == null)
        throw new IllegalStateException("No DataStore");

    if (LOG.isDebugEnabled()) LOG.debug("Writing session {} to DataStore", session.getId());

    Entity entity = _converter.entityFromSession(session, makeKey(session, _context));

    //attempt the update with exponential back-off
    int backoff = getBackoffMs();
    int attempts;
    for (attempts = 0; attempts < getMaxRetries(); attempts++)
    {
        try
        {
            _datastore.put(entity);
            session.setLastSyncTime(System.currentTimeMillis());
            return;
        }
        catch (DatastoreException e)
        {
            if (!e.retryable())
                throw e;

            if (LOG.isDebugEnabled()) LOG.debug("Datastore put retry {} waiting {}ms", attempts, backoff);
            try
            {
                //fix: sleep is a static method; do not invoke it via currentThread()
                Thread.sleep(backoff);
            }
            catch (InterruptedException x)
            {
                //fix: preserve the interrupt status for callers further up the stack
                Thread.currentThread().interrupt();
            }
            backoff *= 2;
        }
    }
    throw new IOException("Retries exhausted");
}
/**
 * @return the max number of datastore write attempts
 */
public int getMaxRetries()
{
    return _maxRetries;
}

/**
 * @return the initial back-off in msec between datastore write retries
 */
public int getBackoffMs()
{
    return _backoffMs;
}

/**
 * @param backoffMs the backoffMs to set
 */
public void setBackoffMs(int backoffMs)
{
    _backoffMs = backoffMs;
}

/**
 * @param maxRetries the maxRetries to set
 */
public void setMaxRetries(int maxRetries)
{
    _maxRetries = maxRetries;
}
/**
 * Remove the session from the cluster cache.
 *
 * @param session the session to delete from datastore
 * @throws IllegalStateException if no datastore is available
 */
protected void delete (GCloudSessionManager.Session session)
{
    if (_datastore == null)
        throw new IllegalStateException("No DataStore");
    if (LOG.isDebugEnabled()) LOG.debug("Removing session {} from DataStore", session.getId());
    _datastore.delete(makeKey(session, _context));
}
/**
 * Invalidate a session for this context with the given id
 *
 * @param idInCluster the id of the session to invalidate
 */
public void invalidateSession (String idInCluster)
{
    Session session = (Session)_sessions.get(idInCluster);
    if (session == null)
        return; //not known to this node: nothing to do

    session.invalidate();
}
/**
 * Make a unique key for this session.
 * As the same session id can be used across multiple contexts, to
 * make it unique, the key must be composed of:
 * <ol>
 * <li>the id</li>
 * <li>the context path</li>
 * <li>the virtual hosts</li>
 * </ol>
 *
 * @param session the session for which the key should be created
 * @param context the context to which the session belongs
 * @return a unique datastore key for the session
 */
protected Key makeKey (Session session, Context context)
{
    return makeKey(session.getId(), context);
}
/**
 * Make a unique key for this session.
 * As the same session id can be used across multiple contexts, to
 * make it unique, the key must be composed of:
 * <ol>
 * <li>the id</li>
 * <li>the context path</li>
 * <li>the virtual hosts</li>
 * </ol>
 *
 * @param id the id of the session for which the key should be created
 * @param context the context to which the session belongs
 * @return a unique datastore key for the session
 */
protected Key makeKey (String id, Context context)
{
    return _keyFactory.newKey(canonicalizeKey(id,context));
}
/**
 * Make a unique string from the session id and info from its Context
 * @param id the id of the Session
 * @param context the Context in which the Session exists
 * @return a unique string representing the id of the session in the context
 */
protected String canonicalizeKey(String id, Context context)
{
    //contextPath_vhost_id: path and host disambiguate the same id across contexts
    return getContextPath(context) + "_" + getVirtualHost(context) + "_" + id;
}
/**
 * Turn the context path into an acceptable string
 *
 * @param context a context
 * @return a stringified version of the context
 */
private static String getContextPath (ContextHandler.Context context)
{
    return canonicalize (context.getContextPath());
}
/**
 * Get the first virtual host for the context.
 *
 * Used to help identify the exact session/contextPath.
 *
 * @param context a context
 * @return a stringified form of the virtual hosts for the context, 0.0.0.0 if none are defined
 */
private static String getVirtualHost (ContextHandler.Context context)
{
    String fallback = "0.0.0.0";
    if (context == null)
        return fallback;

    String[] hosts = context.getContextHandler().getVirtualHosts();
    if (hosts == null || hosts.length == 0 || hosts[0] == null)
        return fallback;

    return hosts[0];
}
/**
 * Make an acceptable name from a context path.
 *
 * @param path a context path
 * @return a stringified form of the context path
 */
private static String canonicalize (String path)
{
    if (path == null)
        return "";

    //normalize every path-separator-like character to an underscore
    return path.replace('\\','_').replace('.','_').replace('/','_');
}
}