blob: 12576de16812a66d958b6baa5b09ff47b390f63e [file] [log] [blame]
/*
* Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.media.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.sse.InboundSseEvent;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException;
/**
 * Inbound event.
 * <p>
 * Represents a single received SSE event: its optional name, id and comment, an optional
 * reconnect delay and the raw event data. The data can be de-serialized on demand through the
 * {@code readData(...)} methods. Instances are created via the nested {@link Builder}.
 * </p>
 *
 * @author Pavel Bucek
 * @author Marek Potociar
 */
public class InboundEvent implements InboundSseEvent {

    private static final GenericType<String> STRING_AS_GENERIC_TYPE = new GenericType<>(String.class);

    private final String name;
    private final String id;
    private final String comment;
    private final byte[] data;
    private final long reconnectDelay;

    private final MessageBodyWorkers messageBodyWorkers;
    private final Annotation[] annotations;
    private final MediaType mediaType;
    private final MultivaluedMap<String, String> headers;

    /**
     * Inbound event builder. This implementation is not thread-safe.
     */
    static class Builder {

        private String name;
        private String id;
        private long reconnectDelay = SseFeature.RECONNECT_NOT_SET;
        private final ByteArrayOutputStream dataStream;

        private final MessageBodyWorkers workers;
        private final Annotation[] annotations;
        private final MediaType mediaType;
        private final MultivaluedMap<String, String> headers;
        private final StringBuilder commentBuilder;

        /**
         * Create new inbound event builder.
         *
         * @param workers     {@link MessageBodyWorkers entity providers} used for
         *                    {@link javax.ws.rs.ext.MessageBodyReader} lookup.
         * @param annotations annotations attached to the Java type to be read. Used for
         *                    {@link javax.ws.rs.ext.MessageBodyReader} lookup and passed to the reader.
         * @param mediaType   media type of the SSE event data. Passed to the
         *                    {@link javax.ws.rs.ext.MessageBodyReader} when the caller of
         *                    {@code readData(...)} does not supply a media type of its own.
         * @param headers     response headers. Passed to the {@link javax.ws.rs.ext.MessageBodyReader}
         *                    when the event data is read.
         */
        public Builder(MessageBodyWorkers workers,
                       Annotation[] annotations,
                       MediaType mediaType,
                       MultivaluedMap<String, String> headers) {
            this.workers = workers;
            this.annotations = annotations;
            this.mediaType = mediaType;
            this.headers = headers;

            this.commentBuilder = new StringBuilder();
            this.dataStream = new ByteArrayOutputStream();
        }

        /**
         * Set inbound event name.
         * <p>
         * Value of the received SSE {@code "event"} field.
         * </p>
         *
         * @param name {@code "event"} field value.
         * @return updated builder instance.
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Set inbound event identifier.
         * <p>
         * Value of the received SSE {@code "id"} field.
         * </p>
         *
         * @param id {@code "id"} field value.
         * @return updated builder instance.
         */
        public Builder id(String id) {
            this.id = id;
            return this;
        }

        /**
         * Add a comment line to the event.
         * <p>
         * The comment line will be added to the received SSE event comment as a new line in the comment field.
         * If the comment line parameter is {@code null}, the call will be ignored.
         * </p>
         *
         * @param commentLine comment line to be added to the event comment.
         * @return updated builder instance.
         * @since 2.21
         */
        public Builder commentLine(final CharSequence commentLine) {
            if (commentLine != null) {
                // Every line is terminated with '\n'; build() drops the last terminator.
                commentBuilder.append(commentLine).append('\n');
            }
            return this;
        }

        /**
         * Set reconnection delay (in milliseconds) that indicates how long the event receiver should wait
         * before attempting to reconnect in case a connection to SSE event source is lost.
         * <p>
         * Value of the received SSE {@code "retry"} field.
         * </p>
         *
         * @param milliseconds reconnection delay in milliseconds. Negative values un-set the reconnection delay.
         * @return updated builder instance.
         * @since 2.3
         */
        public Builder reconnectDelay(long milliseconds) {
            if (milliseconds < 0) {
                milliseconds = SseFeature.RECONNECT_NOT_SET;
            }
            this.reconnectDelay = milliseconds;
            return this;
        }

        /**
         * Add more inbound event data.
         * <p>
         * A {@code null} or empty array is ignored.
         * </p>
         *
         * @param data byte array containing data stored in the incoming event.
         * @return updated builder instance.
         */
        public Builder write(byte[] data) {
            if (data == null || data.length == 0) {
                return this;
            }
            // The (byte[], int, int) overload of ByteArrayOutputStream declares no checked
            // exception, so there is no impossible IOException to swallow here.
            this.dataStream.write(data, 0, data.length);
            return this;
        }

        /**
         * Build a new inbound event instance using the supplied data.
         * <p>
         * The event comment is {@code null} when no comment line has been added; otherwise it is the
         * added lines joined by {@code '\n'} without a trailing line break.
         * </p>
         *
         * @return new inbound event instance.
         */
        public InboundEvent build() {
            final String comment = commentBuilder.length() > 0
                    ? commentBuilder.substring(0, commentBuilder.length() - 1)
                    : null;

            return new InboundEvent(
                    name,
                    id,
                    comment,
                    reconnectDelay,
                    dataStream.toByteArray(),
                    workers,
                    annotations,
                    mediaType,
                    headers);
        }
    }

    /**
     * Create a new inbound event. A single trailing line break is removed from {@code data}
     * before it is stored.
     */
    private InboundEvent(final String name,
                         final String id,
                         final String comment,
                         final long reconnectDelay,
                         final byte[] data,
                         final MessageBodyWorkers messageBodyWorkers,
                         final Annotation[] annotations,
                         final MediaType mediaType,
                         final MultivaluedMap<String, String> headers) {
        this.name = name;
        this.id = id;
        this.comment = comment;
        this.reconnectDelay = reconnectDelay;
        this.data = stripLastLineBreak(data);
        this.messageBodyWorkers = messageBodyWorkers;
        this.annotations = annotations;
        this.mediaType = mediaType;
        this.headers = headers;
    }

    /**
     * Get event name.
     * <p>
     * Contains value of SSE {@code "event"} field. This field is optional. Method may return {@code null}, if the event
     * name is not specified.
     * </p>
     *
     * @return event name, or {@code null} if not set.
     */
    public String getName() {
        return name;
    }

    /**
     * Get event identifier.
     * <p>
     * Contains value of SSE {@code "id"} field. This field is optional. Method may return {@code null}, if the event
     * identifier is not specified.
     * </p>
     *
     * @return event id, or {@code null} if not set.
     * @since 2.3
     */
    public String getId() {
        return id;
    }

    /**
     * Get a comment string that accompanies the event.
     * <p>
     * Contains value of the comment associated with SSE event. This field is optional. Method may return {@code null},
     * if the event comment is not specified.
     * </p>
     *
     * @return comment associated with the event, or {@code null} if not set.
     * @since 2.21
     */
    public String getComment() {
        return comment;
    }

    /**
     * Get new connection retry time in milliseconds the event receiver should wait before attempting to
     * reconnect after a connection to the SSE event source is lost.
     * <p>
     * Contains value of SSE {@code "retry"} field. This field is optional. Method returns {@link SseFeature#RECONNECT_NOT_SET}
     * if no value has been set.
     * </p>
     *
     * @return reconnection delay in milliseconds or {@link SseFeature#RECONNECT_NOT_SET} if no value has been set.
     * @since 2.3
     */
    public long getReconnectDelay() {
        return reconnectDelay;
    }

    /**
     * Check if the connection retry time has been set in the event.
     *
     * @return {@code true} if new reconnection delay has been set in the event, {@code false} otherwise.
     * @since 2.3
     */
    public boolean isReconnectDelaySet() {
        return reconnectDelay > SseFeature.RECONNECT_NOT_SET;
    }

    /**
     * Check if the event is empty (i.e. does not contain any data).
     *
     * @return {@code true} if current instance does not contain any data, {@code false} otherwise.
     */
    public boolean isEmpty() {
        return data.length == 0;
    }

    /**
     * Get the original event data as a {@link String}.
     *
     * @return event data de-serialized into a string.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public String readData() {
        return readData(STRING_AS_GENERIC_TYPE, null);
    }

    /**
     * Read event data as a given Java type.
     *
     * @param type Java type to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public <T> T readData(Class<T> type) {
        return readData(new GenericType<T>(type), null);
    }

    /**
     * Read event data as a given generic type.
     *
     * @param type generic type to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    @SuppressWarnings("unused")
    public <T> T readData(GenericType<T> type) {
        return readData(type, null);
    }

    /**
     * Read event data as a given Java type.
     *
     * @param messageType Java type to be used for event data de-serialization.
     * @param mediaType   {@link MediaType media type} to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    @SuppressWarnings("unused")
    public <T> T readData(Class<T> messageType, MediaType mediaType) {
        return readData(new GenericType<T>(messageType), mediaType);
    }

    /**
     * Read event data as a given generic type.
     * <p>
     * When {@code mediaType} is {@code null}, the media type the event was built with is the one
     * handed to the reader.
     * </p>
     *
     * @param type      generic type to be used for event data de-serialization.
     * @param mediaType {@link MediaType media type} to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws MessageBodyProviderNotFoundException when no reader is found for the requested type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public <T> T readData(GenericType<T> type, MediaType mediaType) {
        final MediaType effectiveMediaType = mediaType == null ? this.mediaType : mediaType;
        // NOTE(review): the reader lookup uses the caller-supplied media type (possibly null),
        // not effectiveMediaType, which is only passed to the reader itself. Callers may depend
        // on this lookup behaviour - confirm it is intentional before changing it.
        final MessageBodyReader reader =
                messageBodyWorkers.getMessageBodyReader(type.getRawType(), type.getType(), annotations, mediaType);
        if (reader == null) {
            throw new MessageBodyProviderNotFoundException(LocalizationMessages.EVENT_DATA_READER_NOT_FOUND());
        }
        return readAndCast(type, effectiveMediaType, reader);
    }

    /**
     * Run the supplied reader over the stored event data and cast the result to the requested type.
     * An {@link IOException} raised by the reader is wrapped in a {@link ProcessingException}.
     */
    @SuppressWarnings("unchecked")
    private <T> T readAndCast(GenericType<T> type, MediaType effectiveMediaType, MessageBodyReader reader) {
        try {
            return (T) reader.readFrom(
                    type.getRawType(),
                    type.getType(),
                    annotations,
                    effectiveMediaType,
                    headers,
                    new ByteArrayInputStream(data));
        } catch (IOException ex) {
            throw new ProcessingException(ex);
        }
    }

    /**
     * Get the raw event data bytes.
     *
     * @return raw event data bytes. The returned byte array may be empty if the event does not
     * contain any data. A non-empty result is a copy of the stored data.
     */
    @SuppressWarnings("unused")
    public byte[] getRawData() {
        if (isEmpty()) {
            // An empty array has no content to protect, so no copy is made.
            return data;
        }
        return Arrays.copyOf(data, data.length);
    }

    /**
     * Describe the event for diagnostic purposes. The event data is rendered as a string; if it
     * cannot be read, a placeholder is printed instead so that this method does not propagate
     * the failure.
     */
    @Override
    public String toString() {
        String s;
        try {
            s = readData();
        } catch (RuntimeException e) {
            // Readers may fail with runtime exceptions other than ProcessingException;
            // a diagnostic toString() should not throw because of them.
            s = "<Error reading data into a string>";
        }

        return "InboundEvent{"
                + "name='" + name + '\''
                + ", id='" + id + '\''
                + ", comment=" + (comment == null ? "[no comments]" : '\'' + comment + '\'')
                + ", data=" + s
                + '}';
    }

    /**
     * Strip last line break from data. (Last line-break should not be considered as part of received data).
     *
     * @param data data
     * @return the same array if it does not end with {@code '\n'}, otherwise a copy without the final byte.
     */
    private static byte[] stripLastLineBreak(final byte[] data) {
        if (data.length > 0 && data[data.length - 1] == '\n') {
            return Arrays.copyOf(data, data.length - 1);
        }
        return data;
    }
}