blob: b849a9892ecf6c5a17ff69a8d24a52565493513d [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.rhttp.gateway;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.rhttp.client.RHTTPRequest;
import org.eclipse.jetty.rhttp.client.RHTTPResponse;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* The servlet that handles the communication with the gateway clients.
* @version $Revision$ $Date$
*/
public class ConnectorServlet extends HttpServlet
{
private final Logger logger = Log.getLogger(getClass().toString());
private final TargetIdRetriever targetIdRetriever = new StandardTargetIdRetriever();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, Future<?>> expirations = new ConcurrentHashMap<String, Future<?>>();
private final Gateway gateway;
private long clientTimeout=15000;
public ConnectorServlet(Gateway gateway)
{
this.gateway = gateway;
}
@Override
public void init() throws ServletException
{
String t = getInitParameter("clientTimeout");
if (t!=null && !"".equals(t))
clientTimeout=Long.parseLong(t);
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
String targetId = targetIdRetriever.retrieveTargetId(request);
String uri = request.getRequestURI();
String path = uri.substring(request.getServletPath().length());
String[] segments = path.split("/");
if (segments.length < 3)
throw new ServletException("Invalid request to " + getClass().getSimpleName() + ": " + uri);
String action = segments[2];
if ("handshake".equals(action))
serviceHandshake(targetId, request, response);
else if ("connect".equals(action))
serviceConnect(targetId, request, response);
else if ("deliver".equals(action))
serviceDeliver(targetId, request, response);
else if ("disconnect".equals(action))
serviceDisconnect(targetId, request, response);
else
throw new ServletException("Invalid request to " + getClass().getSimpleName() + ": " + uri);
}
private void serviceHandshake(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
{
ClientDelegate client = gateway.getClientDelegate(targetId);
if (client != null)
throw new IOException("Client with targetId " + targetId + " is already connected");
client = gateway.newClientDelegate(targetId);
ClientDelegate existing = gateway.addClientDelegate(targetId, client);
if (existing != null)
throw new IOException("Client with targetId " + targetId + " is already connected");
flush(client, httpRequest, httpResponse);
}
private void flush(ClientDelegate client, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
{
List<RHTTPRequest> requests = client.process(httpRequest);
if (requests != null)
{
// Schedule before sending the requests, to avoid that the remote client
// reconnects before we have scheduled the expiration timeout.
if (!client.isClosed())
schedule(client);
ServletOutputStream output = httpResponse.getOutputStream();
for (RHTTPRequest request : requests)
output.write(request.getFrameBytes());
// I could count the framed bytes of all requests and set a Content-Length header,
// but the implementation of ServletOutputStream takes care of everything:
// if the request was HTTP/1.1, then flushing result in a chunked response, but the
// client know how to handle it; if the request was HTTP/1.0, then no chunking.
// To avoid chunking in HTTP/1.1 I must set the Content-Length header.
output.flush();
logger.debug("Delivered to device {} requests {} ", client.getTargetId(), requests);
}
}
private void schedule(ClientDelegate client)
{
Future<?> task = scheduler.schedule(new ClientExpirationTask(client), clientTimeout, TimeUnit.MILLISECONDS);
Future<?> existing = expirations.put(client.getTargetId(), task);
assert existing == null;
}
private void unschedule(String targetId)
{
Future<?> task = expirations.remove(targetId);
if (task != null)
task.cancel(false);
}
private void serviceConnect(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
{
unschedule(targetId);
ClientDelegate client = gateway.getClientDelegate(targetId);
if (client == null)
{
// Expired client tries to connect without handshake
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED);
return;
}
flush(client, httpRequest, httpResponse);
if (client.isClosed())
gateway.removeClientDelegate(targetId);
}
private void expireConnect(ClientDelegate client, long time)
{
String targetId = client.getTargetId();
logger.info("Client with targetId {} missing, last seen {} ms ago, closing it", targetId, System.currentTimeMillis() - time);
client.close();
// If the client expired, means that it did not connect,
// so there no request to resume, and we cleanup here
// (while normally this cleanup is done in serviceConnect())
unschedule(targetId);
gateway.removeClientDelegate(targetId);
}
private void serviceDeliver(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException, IOException
{
if (gateway.getClientDelegate(targetId) == null)
{
// Expired client tries to deliver without handshake
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED);
return;
}
byte[] body = Utils.read(httpRequest.getInputStream());
RHTTPResponse response = RHTTPResponse.fromFrameBytes(body);
ExternalRequest externalRequest = gateway.removeExternalRequest(response.getId());
if (externalRequest != null)
{
externalRequest.respond(response);
logger.debug("Deliver request from device {}, gateway request {}, response {}", new Object[] {targetId, externalRequest, response});
}
else
{
// We can arrive here for a race with the continuation expiration, which expired just before
// the gateway client responded with a valid response; log this case ignore it.
logger.debug("Deliver request from device {}, missing gateway request, response {}", targetId, response);
}
}
private void serviceDisconnect(String targetId, HttpServletRequest request, HttpServletResponse response)
{
// Do not remove the ClientDelegate from the gateway here,
// since closing the ClientDelegate will resume the connect request
// and we remove the ClientDelegate from the gateway there
ClientDelegate client = gateway.getClientDelegate(targetId);
if (client != null)
client.close();
}
private class ClientExpirationTask implements Runnable
{
private final long time = System.currentTimeMillis();
private final ClientDelegate client;
public ClientExpirationTask(ClientDelegate client)
{
this.client = client;
}
public void run()
{
expireConnect(client, time);
}
}
}