Stop filling monitoring queues when processor fails (#4697)
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
diff --git a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
index 84abfd6..dd397bf 100644
--- a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
+++ b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,14 +20,14 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import javax.ws.rs.ProcessingException;
-
import javax.annotation.Priority;
import javax.inject.Inject;
+import javax.ws.rs.ProcessingException;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.server.internal.LocalizationMessages;
@@ -68,6 +68,8 @@
private final Queue<Integer> responseStatuses = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private final Queue<RequestEvent> exceptionMapperEvents = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
private volatile MonitoringStatisticsProcessor monitoringStatisticsProcessor;
+ // By default new events can arrive before MonitoringStatisticsProcessor is running.
+ private final AtomicBoolean processorFailed = new AtomicBoolean(false);
/**
* Time statistics.
@@ -185,6 +187,7 @@
case RELOAD_FINISHED:
case INITIALIZATION_FINISHED:
this.monitoringStatisticsProcessor = new MonitoringStatisticsProcessor(injectionManager, this);
+ processorFailed.set(false);
this.monitoringStatisticsProcessor.startMonitoringWorker();
break;
case DESTROY_FINISHED:
@@ -238,13 +241,13 @@
methodStats = new MethodStats(method, methodTimeStart, now - methodTimeStart);
break;
case EXCEPTION_MAPPING_FINISHED:
- if (!exceptionMapperEvents.offer(event)) {
+ if (!offer(exceptionMapperEvents, event)) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_MAPPER());
}
break;
case FINISHED:
if (event.isResponseWritten()) {
- if (!responseStatuses.offer(event.getContainerResponse().getStatus())) {
+ if (!offer(responseStatuses, event.getContainerResponse().getStatus())) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_RESPONSE());
}
}
@@ -264,8 +267,7 @@
}
sb.setLength(sb.length() - 1);
}
-
- if (!requestQueuedItems.offer(new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
+ if (!offer(requestQueuedItems, new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
methodStats, sb.toString()))) {
LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_REQUEST());
}
@@ -274,6 +276,21 @@
}
}
+ private <T> boolean offer(Queue<T> queue, T event) {
+ if (!processorFailed.get()) {
+ return queue.offer(event);
+ }
+ // Don't need to warn that the event was not queued because an Exception was thrown by MonitoringStatisticsProcessor
+ return true;
+ }
+
+ /**
+ * Invoked by {@link MonitoringStatisticsProcessor} when there is one exception consuming from queues.
+ */
+ void processorFailed() {
+ processorFailed.set(true);
+ }
+
/**
* Get the exception mapper event queue.
*
diff --git a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
index 8bc4ec8..580c1db 100644
--- a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
+++ b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -35,6 +35,7 @@
import org.glassfish.jersey.server.ExtendedResourceContext;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.LocalizationMessages;
+import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener.RequestStats;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.model.ResourceModel;
import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
@@ -94,6 +95,7 @@
processResponseCodeEvents();
processExceptionMapperEvents();
} catch (final Throwable t) {
+ monitoringEventListener.processorFailed();
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
// rethrowing exception stops further task execution
throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
@@ -120,11 +122,9 @@
private void processExceptionMapperEvents() {
final Queue<RequestEvent> eventQueue = monitoringEventListener.getExceptionMapperEvents();
final FloodingLogger floodingLogger = new FloodingLogger(eventQueue);
-
- while (!eventQueue.isEmpty()) {
+ RequestEvent event = null;
+ while ((event = eventQueue.poll()) != null) {
floodingLogger.conditionallyLogFlooding();
-
- final RequestEvent event = eventQueue.remove();
final ExceptionMapperStatisticsImpl.Builder mapperStats = statisticsBuilder.getExceptionMapperStatisticsBuilder();
if (event.getExceptionMapper() != null) {
@@ -138,12 +138,9 @@
private void processRequestItems() {
final Queue<MonitoringEventListener.RequestStats> requestQueuedItems = monitoringEventListener.getRequestQueuedItems();
final FloodingLogger floodingLogger = new FloodingLogger(requestQueuedItems);
-
- while (!requestQueuedItems.isEmpty()) {
+ RequestStats event = null;
+ while ((event = requestQueuedItems.poll()) != null) {
floodingLogger.conditionallyLogFlooding();
-
- final MonitoringEventListener.RequestStats event = requestQueuedItems.remove();
-
final MonitoringEventListener.TimeStats requestStats = event.getRequestStats();
statisticsBuilder.addRequestExecution(requestStats.getStartTime(), requestStats.getDuration());
@@ -160,11 +157,9 @@
private void processResponseCodeEvents() {
final Queue<Integer> responseEvents = monitoringEventListener.getResponseStatuses();
final FloodingLogger floodingLogger = new FloodingLogger(responseEvents);
-
- while (!responseEvents.isEmpty()) {
+ Integer code = null;
+ while ((code = responseEvents.poll()) != null) {
floodingLogger.conditionallyLogFlooding();
-
- final Integer code = responseEvents.remove();
statisticsBuilder.addResponseCode(code);
}
diff --git a/tests/integration/jersey-4697/pom.xml b/tests/integration/jersey-4697/pom.xml
new file mode 100644
index 0000000..27ee6ca
--- /dev/null
+++ b/tests/integration/jersey-4697/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v. 2.0, which is available at
+ http://www.eclipse.org/legal/epl-2.0.
+
+ This Source Code may also be made available under the following Secondary
+ Licenses when the conditions for such availability set forth in the
+ Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ version 2 with the GNU Classpath Exception, which is available at
+ https://www.gnu.org/software/classpath/license.html.
+
+ SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>project</artifactId>
+ <groupId>org.glassfish.jersey.tests.integration</groupId>
+ <version>2.34-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>jersey-4697</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+ <artifactId>jersey-test-framework-provider-bundle</artifactId>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java b/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java
new file mode 100644
index 0000000..578d964
--- /dev/null
+++ b/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.integration.jersey4697;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.glassfish.jersey.internal.inject.InjectionManager;
+import org.glassfish.jersey.internal.inject.Providers;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.ServerProperties;
+import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener;
+import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
+import org.glassfish.jersey.server.monitoring.ExceptionMapperMXBean;
+import org.glassfish.jersey.server.monitoring.RequestEvent;
+import org.glassfish.jersey.server.monitoring.RequestEventListener;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MonitoringEventListenerTest extends JerseyTest {
+
+ private static final long TIMEOUT = 500;
+ private static final String MBEAN_EXCEPTION =
+ "org.glassfish.jersey:type=MonitoringEventListenerTest,subType=Global,exceptions=ExceptionMapper";
+
+ @Path("/example")
+ public static class ExampleResource {
+ @Inject
+ private InjectionManager injectionManager;
+ @GET
+ @Path("/error")
+ public Response error() {
+ throw new RuntimeException("Any exception to be counted in ExceptionMapper");
+ }
+ @GET
+ @Path("/poison")
+ public Response poison() {
+ MonitoringEventListener monitoringEventListener = listener();
+ RequestEvent requestEvent = mock(RequestEvent.class);
+ when(requestEvent.getType()).thenReturn(RequestEvent.Type.START);
+ RequestEventListener eventListener = monitoringEventListener.onRequest(requestEvent);
+ RequestEvent poisonEvent = mock(RequestEvent.class);
+ when(poisonEvent.getType()).thenReturn(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
+ when(poisonEvent.getExceptionMapper())
+ .thenThrow(new IllegalStateException("This causes the scheduler to stop working"));
+ eventListener.onEvent(poisonEvent);
+ return Response.ok().build();
+ }
+ @GET
+ @Path("/queueSize")
+ public Response queueSize() throws Exception {
+ MonitoringEventListener monitoringEventListener = listener();
+ Method method = MonitoringEventListener.class.getDeclaredMethod("getExceptionMapperEvents");
+ method.setAccessible(true);
+ Collection<?> queue = (Collection<?>) method.invoke(monitoringEventListener);
+ return Response.ok(queue.size()).build();
+ }
+ private MonitoringEventListener listener() {
+ Iterable<ApplicationEventListener> listeners =
+ Providers.getAllProviders(injectionManager, ApplicationEventListener.class);
+ for (ApplicationEventListener listener : listeners) {
+ if (listener instanceof MonitoringEventListener) {
+ return (MonitoringEventListener) listener;
+ }
+ }
+ throw new IllegalStateException("MonitoringEventListener was not found");
+ }
+ }
+
+ @Provider
+ public static class RuntimeExceptionMapper implements ExceptionMapper<RuntimeException> {
+ @Override
+ public Response toResponse(RuntimeException e) {
+ return Response.status(500).entity("RuntimeExceptionMapper: " + e.getMessage()).build();
+ }
+ }
+
+ @Override
+ protected Application configure() {
+ ResourceConfig resourceConfig = new ResourceConfig(ExampleResource.class);
+ // Need to map the exception to be counted by ExceptionMapper
+ resourceConfig.register(RuntimeExceptionMapper.class);
+ resourceConfig.property(ServerProperties.MONITORING_ENABLED, true);
+ resourceConfig.property(ServerProperties.MONITORING_STATISTICS_ENABLED, true);
+ resourceConfig.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
+ resourceConfig.property(ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, 1);
+ resourceConfig.setApplicationName("MonitoringEventListenerTest");
+ return resourceConfig;
+ }
+
+ @Test
+ public void exceptionInScheduler() throws Exception {
+ final Long ERRORS_BEFORE_FAIL = 10L;
+ // Send some requests to process some statistics.
+ request(ERRORS_BEFORE_FAIL);
+ // Give some time to the scheduler to collect data.
+ Thread.sleep(TIMEOUT);
+ // All events were consumed by scheduler
+ queueIsEmpty();
+ // Make the scheduler to fail. No more statistics are collected.
+ makeFailure();
+ // Sending again requests
+ request(20);
+ Thread.sleep(TIMEOUT);
+ // No new events should be accepted because scheduler is not working.
+ queueIsEmpty();
+ Long monitoredErrors = mappedErrorsFromJMX(MBEAN_EXCEPTION);
+ assertEquals(ERRORS_BEFORE_FAIL, monitoredErrors);
+ }
+
+ private void makeFailure() {
+ Response response = target("/example/poison").request().get();
+ assertEquals(200, response.getStatus());
+ }
+
+ private void queueIsEmpty() {
+ Response response = target("/example/queueSize").request().get();
+ assertEquals(200, response.getStatus());
+ assertEquals(Integer.valueOf(0), response.readEntity(Integer.class));
+ }
+
+ private Long mappedErrorsFromJMX(String name) throws Exception {
+ Long monitoredErrors = null;
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName objectName = new ObjectName(name);
+ ExceptionMapperMXBean bean = JMX.newMBeanProxy(mbs, objectName, ExceptionMapperMXBean.class);
+ Map<?, ?> counter = bean.getExceptionMapperCount();
+ CompositeDataSupport value = (CompositeDataSupport) counter.entrySet().iterator().next().getValue();
+ for (Object obj : value.values()) {
+ if (obj instanceof Long) {
+ // Messy way to get the errors, but generic types doesn't match and there is no nice way
+ monitoredErrors = (Long) obj;
+ break;
+ }
+ }
+ return monitoredErrors;
+ }
+
+ private void request(long requests) {
+ for (long i = 0; i < requests; i++) {
+ Response response = target("/example/error").request().get();
+ assertEquals(500, response.getStatus());
+ }
+ }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 0b5c2b3..6398e81 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -88,6 +88,7 @@
<module>jersey-4321</module>
<module>jersey-4507</module>
<module>jersey-4542</module>
+ <module>jersey-4697</module>
<module>jersey-4722</module>
<module>jetty-response-close</module>
<module>microprofile</module>