blob: 835a903e3d0801514f601a26b54ca9345012812e [file] [log] [blame]
/******************************************************************************
* Copyright (c) 2016 TypeFox and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
******************************************************************************/
package org.eclipse.lsp4j.jsonrpc.json;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.lsp4j.jsonrpc.JsonRpcException;
import org.eclipse.lsp4j.jsonrpc.MessageConsumer;
import org.eclipse.lsp4j.jsonrpc.MessageIssueException;
import org.eclipse.lsp4j.jsonrpc.MessageIssueHandler;
import org.eclipse.lsp4j.jsonrpc.MessageProducer;
import org.eclipse.lsp4j.jsonrpc.messages.Message;
/**
* A message producer that reads from an input stream and parses messages from JSON.
*/
public class StreamMessageProducer implements MessageProducer, Closeable, MessageConstants {
private static final Logger LOG = Logger.getLogger(StreamMessageProducer.class.getName());
private final MessageJsonHandler jsonHandler;
private final MessageIssueHandler issueHandler;
private InputStream input;
private MessageConsumer callback;
private boolean keepRunning;
public StreamMessageProducer(InputStream input, MessageJsonHandler jsonHandler) {
this(input, jsonHandler, null);
}
public StreamMessageProducer(InputStream input, MessageJsonHandler jsonHandler, MessageIssueHandler issueHandler) {
this.input = input;
this.jsonHandler = jsonHandler;
this.issueHandler = issueHandler;
}
public InputStream getInput() {
return input;
}
public void setInput(InputStream input) {
this.input = input;
}
protected static class Headers {
public int contentLength = -1;
public String charset = StandardCharsets.UTF_8.name();
}
@Override
public void listen(MessageConsumer callback) {
if (keepRunning) {
throw new IllegalStateException("This StreamMessageProducer is already running.");
}
this.keepRunning = true;
this.callback = callback;
try {
StringBuilder headerBuilder = null;
StringBuilder debugBuilder = null;
boolean newLine = false;
Headers headers = new Headers();
while (keepRunning) {
int c = input.read();
if (c == -1) {
// End of input stream has been reached
keepRunning = false;
} else {
if (debugBuilder == null)
debugBuilder = new StringBuilder();
debugBuilder.append((char) c);
if (c == '\n') {
if (newLine) {
// Two consecutive newlines have been read, which signals the start of the message content
if (headers.contentLength < 0) {
fireError(new IllegalStateException("Missing header " + CONTENT_LENGTH_HEADER
+ " in input \"" + debugBuilder + "\""));
} else {
boolean result = handleMessage(input, headers);
if (!result)
keepRunning = false;
newLine = false;
}
headers = new Headers();
debugBuilder = null;
} else if (headerBuilder != null) {
// A single newline ends a header line
parseHeader(headerBuilder.toString(), headers);
headerBuilder = null;
}
newLine = true;
} else if (c != '\r') {
// Add the input to the current header line
if (headerBuilder == null)
headerBuilder = new StringBuilder();
headerBuilder.append((char) c);
newLine = false;
}
}
} // while (keepRunning)
} catch (IOException exception) {
if (JsonRpcException.indicatesStreamClosed(exception)) {
// Only log the error if we had intended to keep running
if (keepRunning)
fireStreamClosed(exception);
} else
throw new JsonRpcException(exception);
} finally {
this.callback = null;
this.keepRunning = false;
}
}
/**
* Log an error.
*/
protected void fireError(Throwable error) {
String message = error.getMessage() != null ? error.getMessage() : "An error occurred while processing an incoming message.";
LOG.log(Level.SEVERE, message, error);
}
/**
* Report that the stream was closed through an exception.
*/
protected void fireStreamClosed(Exception cause) {
String message = cause.getMessage() != null ? cause.getMessage() : "The input stream was closed.";
LOG.log(Level.INFO, message, cause);
}
/**
* Parse a header attribute and set the corresponding data in the {@link Headers} fields.
*/
protected void parseHeader(String line, Headers headers) {
int sepIndex = line.indexOf(':');
if (sepIndex >= 0) {
String key = line.substring(0, sepIndex).trim();
switch (key) {
case CONTENT_LENGTH_HEADER:
try {
headers.contentLength = Integer.parseInt(line.substring(sepIndex + 1).trim());
} catch (NumberFormatException e) {
fireError(e);
}
break;
case CONTENT_TYPE_HEADER: {
int charsetIndex = line.indexOf("charset=");
if (charsetIndex >= 0)
headers.charset = line.substring(charsetIndex + 8).trim();
break;
}
}
}
}
/**
* Read the JSON content part of a message, parse it, and notify the callback.
*
* @return {@code true} if we should continue reading from the input stream, {@code false} if we should stop
*/
protected boolean handleMessage(InputStream input, Headers headers) throws IOException {
if (callback == null)
callback = message -> LOG.log(Level.INFO, "Received message: " + message);
try {
int contentLength = headers.contentLength;
byte[] buffer = new byte[contentLength];
int bytesRead = 0;
while (bytesRead < contentLength) {
int readResult = input.read(buffer, bytesRead, contentLength - bytesRead);
if (readResult == -1)
return false;
bytesRead += readResult;
}
String content = new String(buffer, headers.charset);
try {
Message message = jsonHandler.parseMessage(content);
callback.consume(message);
} catch (MessageIssueException exception) {
// An issue was found while parsing or validating the message
if (issueHandler != null)
issueHandler.handle(exception.getRpcMessage(), exception.getIssues());
else
fireError(exception);
}
} catch (Exception exception) {
// UnsupportedEncodingException can be thrown by String constructor
// JsonParseException can be thrown by jsonHandler
// We also catch arbitrary exceptions that are thrown by message consumers in order to keep this thread alive
fireError(exception);
}
return true;
}
@Override
public void close() {
keepRunning = false;
}
}