| /****************************************************************************** |
| * Copyright (c) 2016 TypeFox and others. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0, |
| * or the Eclipse Distribution License v. 1.0 which is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
| ******************************************************************************/ |
| package org.eclipse.lsp4j.jsonrpc.json; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.eclipse.lsp4j.jsonrpc.JsonRpcException; |
| import org.eclipse.lsp4j.jsonrpc.MessageConsumer; |
| import org.eclipse.lsp4j.jsonrpc.MessageIssueException; |
| import org.eclipse.lsp4j.jsonrpc.MessageIssueHandler; |
| import org.eclipse.lsp4j.jsonrpc.MessageProducer; |
| import org.eclipse.lsp4j.jsonrpc.messages.Message; |
| |
| /** |
| * A message producer that reads from an input stream and parses messages from JSON. |
| */ |
| public class StreamMessageProducer implements MessageProducer, Closeable, MessageConstants { |
| |
| private static final Logger LOG = Logger.getLogger(StreamMessageProducer.class.getName()); |
| |
| private final MessageJsonHandler jsonHandler; |
| private final MessageIssueHandler issueHandler; |
| |
| private InputStream input; |
| |
| private MessageConsumer callback; |
| private boolean keepRunning; |
| |
| public StreamMessageProducer(InputStream input, MessageJsonHandler jsonHandler) { |
| this(input, jsonHandler, null); |
| } |
| |
| public StreamMessageProducer(InputStream input, MessageJsonHandler jsonHandler, MessageIssueHandler issueHandler) { |
| this.input = input; |
| this.jsonHandler = jsonHandler; |
| this.issueHandler = issueHandler; |
| } |
| |
| public InputStream getInput() { |
| return input; |
| } |
| |
| public void setInput(InputStream input) { |
| this.input = input; |
| } |
| |
| protected static class Headers { |
| public int contentLength = -1; |
| public String charset = StandardCharsets.UTF_8.name(); |
| } |
| |
| @Override |
| public void listen(MessageConsumer callback) { |
| if (keepRunning) { |
| throw new IllegalStateException("This StreamMessageProducer is already running."); |
| } |
| this.keepRunning = true; |
| this.callback = callback; |
| try { |
| StringBuilder headerBuilder = null; |
| StringBuilder debugBuilder = null; |
| boolean newLine = false; |
| Headers headers = new Headers(); |
| while (keepRunning) { |
| int c = input.read(); |
| if (c == -1) { |
| // End of input stream has been reached |
| keepRunning = false; |
| } else { |
| if (debugBuilder == null) |
| debugBuilder = new StringBuilder(); |
| debugBuilder.append((char) c); |
| if (c == '\n') { |
| if (newLine) { |
| // Two consecutive newlines have been read, which signals the start of the message content |
| if (headers.contentLength < 0) { |
| fireError(new IllegalStateException("Missing header " + CONTENT_LENGTH_HEADER |
| + " in input \"" + debugBuilder + "\"")); |
| } else { |
| boolean result = handleMessage(input, headers); |
| if (!result) |
| keepRunning = false; |
| newLine = false; |
| } |
| headers = new Headers(); |
| debugBuilder = null; |
| } else if (headerBuilder != null) { |
| // A single newline ends a header line |
| parseHeader(headerBuilder.toString(), headers); |
| headerBuilder = null; |
| } |
| newLine = true; |
| } else if (c != '\r') { |
| // Add the input to the current header line |
| if (headerBuilder == null) |
| headerBuilder = new StringBuilder(); |
| headerBuilder.append((char) c); |
| newLine = false; |
| } |
| } |
| } // while (keepRunning) |
| } catch (IOException exception) { |
| if (JsonRpcException.indicatesStreamClosed(exception)) { |
| // Only log the error if we had intended to keep running |
| if (keepRunning) |
| fireStreamClosed(exception); |
| } else |
| throw new JsonRpcException(exception); |
| } finally { |
| this.callback = null; |
| this.keepRunning = false; |
| } |
| } |
| |
| /** |
| * Log an error. |
| */ |
| protected void fireError(Throwable error) { |
| String message = error.getMessage() != null ? error.getMessage() : "An error occurred while processing an incoming message."; |
| LOG.log(Level.SEVERE, message, error); |
| } |
| |
| /** |
| * Report that the stream was closed through an exception. |
| */ |
| protected void fireStreamClosed(Exception cause) { |
| String message = cause.getMessage() != null ? cause.getMessage() : "The input stream was closed."; |
| LOG.log(Level.INFO, message, cause); |
| } |
| |
| /** |
| * Parse a header attribute and set the corresponding data in the {@link Headers} fields. |
| */ |
| protected void parseHeader(String line, Headers headers) { |
| int sepIndex = line.indexOf(':'); |
| if (sepIndex >= 0) { |
| String key = line.substring(0, sepIndex).trim(); |
| switch (key) { |
| case CONTENT_LENGTH_HEADER: |
| try { |
| headers.contentLength = Integer.parseInt(line.substring(sepIndex + 1).trim()); |
| } catch (NumberFormatException e) { |
| fireError(e); |
| } |
| break; |
| case CONTENT_TYPE_HEADER: { |
| int charsetIndex = line.indexOf("charset="); |
| if (charsetIndex >= 0) |
| headers.charset = line.substring(charsetIndex + 8).trim(); |
| break; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Read the JSON content part of a message, parse it, and notify the callback. |
| * |
| * @return {@code true} if we should continue reading from the input stream, {@code false} if we should stop |
| */ |
| protected boolean handleMessage(InputStream input, Headers headers) throws IOException { |
| if (callback == null) |
| callback = message -> LOG.log(Level.INFO, "Received message: " + message); |
| |
| try { |
| int contentLength = headers.contentLength; |
| byte[] buffer = new byte[contentLength]; |
| int bytesRead = 0; |
| |
| while (bytesRead < contentLength) { |
| int readResult = input.read(buffer, bytesRead, contentLength - bytesRead); |
| if (readResult == -1) |
| return false; |
| bytesRead += readResult; |
| } |
| |
| String content = new String(buffer, headers.charset); |
| try { |
| Message message = jsonHandler.parseMessage(content); |
| callback.consume(message); |
| } catch (MessageIssueException exception) { |
| // An issue was found while parsing or validating the message |
| if (issueHandler != null) |
| issueHandler.handle(exception.getRpcMessage(), exception.getIssues()); |
| else |
| fireError(exception); |
| } |
| } catch (Exception exception) { |
| // UnsupportedEncodingException can be thrown by String constructor |
| // JsonParseException can be thrown by jsonHandler |
| // We also catch arbitrary exceptions that are thrown by message consumers in order to keep this thread alive |
| fireError(exception); |
| } |
| return true; |
| } |
| |
| @Override |
| public void close() { |
| keepRunning = false; |
| } |
| |
| } |