| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.websocket.common.message; |
| |
| import java.io.IOException; |
| import java.io.Writer; |
| import java.nio.ByteBuffer; |
| |
| import org.eclipse.jetty.io.ByteBufferPool; |
| import org.eclipse.jetty.util.BufferUtil; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| import org.eclipse.jetty.websocket.api.BatchMode; |
| import org.eclipse.jetty.websocket.api.WriteCallback; |
| import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; |
| import org.eclipse.jetty.websocket.common.BlockingWriteCallback; |
| import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; |
| import org.eclipse.jetty.websocket.common.WebSocketSession; |
| import org.eclipse.jetty.websocket.common.frames.TextFrame; |
| |
| /** |
| * Support for writing a single WebSocket TEXT message via a {@link Writer} |
| * <p> |
| * Note: Per WebSocket spec, all WebSocket TEXT messages must be encoded in UTF-8 |
| */ |
| public class MessageWriter extends Writer |
| { |
| private static final Logger LOG = Log.getLogger(MessageWriter.class); |
| |
| private final OutgoingFrames outgoing; |
| private final ByteBufferPool bufferPool; |
| private final BlockingWriteCallback blocker; |
| private long frameCount; |
| private TextFrame frame; |
| private ByteBuffer buffer; |
| private Utf8CharBuffer utf; |
| private WriteCallback callback; |
| private boolean closed; |
| |
| public MessageWriter(WebSocketSession session) |
| { |
| this(session.getOutgoingHandler(), session.getPolicy().getMaxTextMessageBufferSize(), session.getBufferPool()); |
| } |
| |
| public MessageWriter(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool) |
| { |
| this.outgoing = outgoing; |
| this.bufferPool = bufferPool; |
| this.blocker = new BlockingWriteCallback(); |
| this.buffer = bufferPool.acquire(bufferSize, true); |
| BufferUtil.flipToFill(buffer); |
| this.frame = new TextFrame(); |
| this.utf = Utf8CharBuffer.wrap(buffer); |
| } |
| |
| @Override |
| public void write(char[] chars, int off, int len) throws IOException |
| { |
| try |
| { |
| send(chars, off, len); |
| } |
| catch (Throwable x) |
| { |
| // Notify without holding locks. |
| notifyFailure(x); |
| throw x; |
| } |
| } |
| |
| @Override |
| public void write(int c) throws IOException |
| { |
| try |
| { |
| send(new char[]{(char)c}, 0, 1); |
| } |
| catch (Throwable x) |
| { |
| // Notify without holding locks. |
| notifyFailure(x); |
| throw x; |
| } |
| } |
| |
| @Override |
| public void flush() throws IOException |
| { |
| try |
| { |
| flush(false); |
| } |
| catch (Throwable x) |
| { |
| // Notify without holding locks. |
| notifyFailure(x); |
| throw x; |
| } |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| try |
| { |
| flush(true); |
| bufferPool.release(buffer); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("Stream closed, {} frames sent", frameCount); |
| // Notify without holding locks. |
| notifySuccess(); |
| } |
| catch (Throwable x) |
| { |
| // Notify without holding locks. |
| notifyFailure(x); |
| throw x; |
| } |
| } |
| |
| private void flush(boolean fin) throws IOException |
| { |
| synchronized (this) |
| { |
| if (closed) |
| throw new IOException("Stream is closed"); |
| |
| closed = fin; |
| |
| ByteBuffer data = utf.getByteBuffer(); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(buffer)); |
| frame.setPayload(data); |
| frame.setFin(fin); |
| |
| try (WriteBlocker b = blocker.acquireWriteBlocker()) |
| { |
| outgoing.outgoingFrame(frame, b, BatchMode.OFF); |
| b.block(); |
| } |
| |
| ++frameCount; |
| // Any flush after the first will be a CONTINUATION frame. |
| frame.setIsContinuation(); |
| |
| utf.clear(); |
| } |
| } |
| |
| private void send(char[] chars, int offset, int length) throws IOException |
| { |
| synchronized (this) |
| { |
| if (closed) |
| throw new IOException("Stream is closed"); |
| |
| while (length > 0) |
| { |
| // There may be no space available, we want |
| // to handle correctly when space == 0. |
| int space = utf.remaining(); |
| int size = Math.min(space, length); |
| utf.append(chars, offset, size); |
| offset += size; |
| length -= size; |
| if (length > 0) |
| { |
| // If we could not write everything, it means |
| // that the buffer was full, so flush it. |
| flush(false); |
| } |
| } |
| } |
| } |
| |
| public void setCallback(WriteCallback callback) |
| { |
| synchronized (this) |
| { |
| this.callback = callback; |
| } |
| } |
| |
| private void notifySuccess() |
| { |
| WriteCallback callback; |
| synchronized (this) |
| { |
| callback = this.callback; |
| } |
| if (callback != null) |
| { |
| callback.writeSuccess(); |
| } |
| } |
| |
| private void notifyFailure(Throwable failure) |
| { |
| WriteCallback callback; |
| synchronized (this) |
| { |
| callback = this.callback; |
| } |
| if (callback != null) |
| { |
| callback.writeFailed(failure); |
| } |
| } |
| } |