| /* |
| * Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0, which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * This Source Code may also be made available under the following Secondary |
| * Licenses when the conditions for such availability set forth in the |
| * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, |
| * version 2 with the GNU Classpath Exception, which is available at |
| * https://www.gnu.org/software/classpath/license.html. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 |
| */ |
| |
| package org.glassfish.jersey.media.sse; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.lang.annotation.Annotation; |
| import java.util.Arrays; |
| |
| import javax.ws.rs.ProcessingException; |
| import javax.ws.rs.core.GenericType; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.MultivaluedMap; |
| import javax.ws.rs.ext.MessageBodyReader; |
| import javax.ws.rs.sse.InboundSseEvent; |
| |
| import org.glassfish.jersey.message.MessageBodyWorkers; |
| import org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException; |
| |
| /** |
| * Inbound event. |
| * |
| * @author Pavel Bucek |
| * @author Marek Potociar |
| */ |
| public class InboundEvent implements InboundSseEvent { |
| |
| private static final GenericType<String> STRING_AS_GENERIC_TYPE = new GenericType<>(String.class); |
| |
| private final String name; |
| private final String id; |
| private final String comment; |
| private final byte[] data; |
| private final long reconnectDelay; |
| |
| private final MessageBodyWorkers messageBodyWorkers; |
| private final Annotation[] annotations; |
| private final MediaType mediaType; |
| private final MultivaluedMap<String, String> headers; |
| |
| /** |
| * Inbound event builder. This implementation is not thread-safe. |
| */ |
| static class Builder { |
| |
| private String name; |
| private String id; |
| private long reconnectDelay = SseFeature.RECONNECT_NOT_SET; |
| private final ByteArrayOutputStream dataStream; |
| |
| private final MessageBodyWorkers workers; |
| private final Annotation[] annotations; |
| private final MediaType mediaType; |
| private final MultivaluedMap<String, String> headers; |
| private final StringBuilder commentBuilder; |
| |
| /** |
| * Create new inbound event builder. |
| * |
| * @param workers configured client-side {@link MessageBodyWorkers entity providers} used for |
| * {@link javax.ws.rs.ext.MessageBodyReader} lookup. |
| * @param annotations annotations attached to the Java type to be read. Used for |
| * {@link javax.ws.rs.ext.MessageBodyReader} lookup. |
| * @param mediaType media type of the SSE event data. |
| * Used for {@link javax.ws.rs.ext.MessageBodyReader} lookup. |
| * @param headers response headers. Used for {@link javax.ws.rs.ext.MessageBodyWriter} lookup. |
| */ |
| public Builder(MessageBodyWorkers workers, |
| Annotation[] annotations, |
| MediaType mediaType, |
| MultivaluedMap<String, String> headers) { |
| this.workers = workers; |
| this.annotations = annotations; |
| this.mediaType = mediaType; |
| this.headers = headers; |
| |
| this.commentBuilder = new StringBuilder(); |
| this.dataStream = new ByteArrayOutputStream(); |
| } |
| |
| /** |
| * Set inbound event name. |
| * <p/> |
| * Value of the received SSE {@code "event"} field. |
| * |
| * @param name {@code "event"} field value. |
| * @return updated builder instance. |
| */ |
| public Builder name(String name) { |
| this.name = name; |
| return this; |
| } |
| |
| /** |
| * Set inbound event identifier. |
| * <p/> |
| * Value of the received SSE {@code "id"} field. |
| * |
| * @param id {@code "id"} field value. |
| * @return updated builder instance. |
| */ |
| public Builder id(String id) { |
| this.id = id; |
| return this; |
| } |
| |
| /** |
| * Add a comment line to the event. |
| * <p> |
| * The comment line will be added to the received SSE event comment as a new line in the comment field. |
| * If the comment line parameter is {@code null}, the call will be ignored. |
| * </p> |
| * |
| * @param commentLine comment line to be added to the event comment. |
| * @return updated builder instance. |
| * @since 2.21 |
| */ |
| public Builder commentLine(final CharSequence commentLine) { |
| if (commentLine != null) { |
| commentBuilder.append(commentLine).append('\n'); |
| } |
| |
| return this; |
| } |
| |
| /** |
| * Set reconnection delay (in milliseconds) that indicates how long the event receiver should wait |
| * before attempting to reconnect in case a connection to SSE event source is lost. |
| * <p> |
| * Value of the received SSE {@code "retry"} field. |
| * </p> |
| * |
| * @param milliseconds reconnection delay in milliseconds. Negative values un-set the reconnection delay. |
| * @return updated builder instance. |
| * @since 2.3 |
| */ |
| public Builder reconnectDelay(long milliseconds) { |
| if (milliseconds < 0) { |
| milliseconds = SseFeature.RECONNECT_NOT_SET; |
| } |
| this.reconnectDelay = milliseconds; |
| return this; |
| } |
| |
| /** |
| * Add more inbound event data. |
| * |
| * @param data byte array containing data stored in the incoming event. |
| * @return updated builder instance. |
| */ |
| public Builder write(byte[] data) { |
| if (data == null || data.length == 0) { |
| return this; |
| } |
| |
| try { |
| this.dataStream.write(data); |
| } catch (IOException ex) { |
| // ignore - this is not possible with ByteArrayOutputStream |
| } |
| return this; |
| } |
| |
| /** |
| * Build a new inbound event instance using the supplied data. |
| * |
| * @return new inbound event instance. |
| */ |
| public InboundEvent build() { |
| return new InboundEvent( |
| name, |
| id, |
| commentBuilder.length() > 0 ? commentBuilder.substring(0, commentBuilder.length() - 1) : null, |
| reconnectDelay, |
| dataStream.toByteArray(), |
| workers, |
| annotations, |
| mediaType, |
| headers); |
| } |
| } |
| |
| private InboundEvent(final String name, |
| final String id, |
| final String comment, |
| final long reconnectDelay, |
| final byte[] data, |
| final MessageBodyWorkers messageBodyWorkers, |
| final Annotation[] annotations, |
| final MediaType mediaType, |
| final MultivaluedMap<String, String> headers) { |
| this.name = name; |
| this.id = id; |
| this.comment = comment; |
| this.reconnectDelay = reconnectDelay; |
| this.data = stripLastLineBreak(data); |
| this.messageBodyWorkers = messageBodyWorkers; |
| this.annotations = annotations; |
| this.mediaType = mediaType; |
| this.headers = headers; |
| } |
| |
| /** |
| * Get event name. |
| * <p> |
| * Contains value of SSE {@code "event"} field. This field is optional. Method may return {@code null}, if the event |
| * name is not specified. |
| * </p> |
| * |
| * @return event name, or {@code null} if not set. |
| */ |
| public String getName() { |
| return name; |
| } |
| |
| /** |
| * Get event identifier. |
| * <p> |
| * Contains value of SSE {@code "id"} field. This field is optional. Method may return {@code null}, if the event |
| * identifier is not specified. |
| * </p> |
| * |
| * @return event id. |
| * @since 2.3 |
| */ |
| public String getId() { |
| return id; |
| } |
| |
| /** |
| * Get a comment string that accompanies the event. |
| * <p> |
| * Contains value of the comment associated with SSE event. This field is optional. Method may return {@code null}, |
| * if the event comment is not specified. |
| * </p> |
| * |
| * @return comment associated with the event. |
| * @since 2.21 |
| */ |
| public String getComment() { |
| return comment; |
| } |
| |
| /** |
| * Get new connection retry time in milliseconds the event receiver should wait before attempting to |
| * reconnect after a connection to the SSE event source is lost. |
| * <p> |
| * Contains value of SSE {@code "retry"} field. This field is optional. Method returns {@link SseFeature#RECONNECT_NOT_SET} |
| * if no value has been set. |
| * </p> |
| * |
| * @return reconnection delay in milliseconds or {@link SseFeature#RECONNECT_NOT_SET} if no value has been set. |
| * @since 2.3 |
| */ |
| public long getReconnectDelay() { |
| return reconnectDelay; |
| } |
| |
| /** |
| * Check if the connection retry time has been set in the event. |
| * |
| * @return {@code true} if new reconnection delay has been set in the event, {@code false} otherwise. |
| * @since 2.3 |
| */ |
| public boolean isReconnectDelaySet() { |
| return reconnectDelay > SseFeature.RECONNECT_NOT_SET; |
| } |
| |
| /** |
| * Check if the event is empty (i.e. does not contain any data). |
| * |
| * @return {@code true} if current instance does not contain any data, {@code false} otherwise. |
| */ |
| public boolean isEmpty() { |
| return data.length == 0; |
| } |
| |
| /** |
| * Get the original event data string {@link String}. |
| * |
| * @return event data de-serialized into a string. |
| * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause. |
| * @since 2.3 |
| */ |
| public String readData() { |
| return readData(STRING_AS_GENERIC_TYPE, null); |
| } |
| |
| /** |
| * Read event data as a given Java type. |
| * |
| * @param type Java type to be used for event data de-serialization. |
| * @return event data de-serialized as an instance of a given type. |
| * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause. |
| * @since 2.3 |
| */ |
| public <T> T readData(Class<T> type) { |
| return readData(new GenericType<T>(type), null); |
| } |
| |
| /** |
| * Read event data as a given generic type. |
| * |
| * @param type generic type to be used for event data de-serialization. |
| * @return event data de-serialized as an instance of a given type. |
| * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause. |
| * @since 2.3 |
| */ |
| @SuppressWarnings("unused") |
| public <T> T readData(GenericType<T> type) { |
| return readData(type, null); |
| } |
| |
| /** |
| * Read event data as a given Java type. |
| * |
| * @param messageType Java type to be used for event data de-serialization. |
| * @param mediaType {@link MediaType media type} to be used for event data de-serialization. |
| * @return event data de-serialized as an instance of a given type. |
| * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause. |
| * @since 2.3 |
| */ |
| @SuppressWarnings("unused") |
| public <T> T readData(Class<T> messageType, MediaType mediaType) { |
| return readData(new GenericType<T>(messageType), mediaType); |
| } |
| |
| /** |
| * Read event data as a given generic type. |
| * |
| * @param type generic type to be used for event data de-serialization. |
| * @param mediaType {@link MediaType media type} to be used for event data de-serialization. |
| * @return event data de-serialized as an instance of a given type. |
| * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause. |
| * @since 2.3 |
| */ |
| public <T> T readData(GenericType<T> type, MediaType mediaType) { |
| final MediaType effectiveMediaType = mediaType == null ? this.mediaType : mediaType; |
| final MessageBodyReader reader = |
| messageBodyWorkers.getMessageBodyReader(type.getRawType(), type.getType(), annotations, mediaType); |
| if (reader == null) { |
| throw new MessageBodyProviderNotFoundException(LocalizationMessages.EVENT_DATA_READER_NOT_FOUND()); |
| } |
| return readAndCast(type, effectiveMediaType, reader); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <T> T readAndCast(GenericType<T> type, MediaType effectiveMediaType, MessageBodyReader reader) { |
| try { |
| return (T) reader.readFrom( |
| type.getRawType(), |
| type.getType(), |
| annotations, |
| effectiveMediaType, |
| headers, |
| new ByteArrayInputStream(data)); |
| } catch (IOException ex) { |
| throw new ProcessingException(ex); |
| } |
| } |
| |
| /** |
| * Get the raw event data bytes. |
| * |
| * @return raw event data bytes. The returned byte array may be empty if the event does not |
| * contain any data. |
| */ |
| @SuppressWarnings("unused") |
| public byte[] getRawData() { |
| if (isEmpty()) { |
| return data; |
| } |
| |
| return Arrays.copyOf(data, data.length); |
| } |
| |
| @Override |
| public String toString() { |
| String s; |
| |
| try { |
| s = readData(); |
| } catch (ProcessingException e) { |
| s = "<Error reading data into a string>"; |
| } |
| |
| return "InboundEvent{" |
| + "name='" + name + '\'' |
| + ", id='" + id + '\'' |
| + ", comment=" + (comment == null ? "[no comments]" : '\'' + comment + '\'') |
| + ", data=" + s |
| + '}'; |
| } |
| |
| /** |
| * String last line break from data. (Last line-break should not be considered as part of received data). |
| * |
| * @param data data |
| * @return updated byte array. |
| */ |
| private static byte[] stripLastLineBreak(final byte[] data) { |
| |
| if (data.length > 0 && data[data.length - 1] == '\n') { |
| return Arrays.copyOf(data, data.length - 1); |
| } |
| |
| return data; |
| } |
| } |