Stop filling monitoring queues when processor fails (#4697)

Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
diff --git a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
index 84abfd6..dd397bf 100644
--- a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
+++ b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringEventListener.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -20,14 +20,14 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
-import javax.ws.rs.ProcessingException;
-
 import javax.annotation.Priority;
 import javax.inject.Inject;
+import javax.ws.rs.ProcessingException;
 
 import org.glassfish.jersey.internal.inject.InjectionManager;
 import org.glassfish.jersey.server.internal.LocalizationMessages;
@@ -68,6 +68,8 @@
     private final Queue<Integer> responseStatuses = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
     private final Queue<RequestEvent> exceptionMapperEvents = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
     private volatile MonitoringStatisticsProcessor monitoringStatisticsProcessor;
+    // By default new events can arrive before MonitoringStatisticsProcessor is running.
+    private final AtomicBoolean processorFailed = new AtomicBoolean(false);
 
     /**
      * Time statistics.
@@ -185,6 +187,7 @@
             case RELOAD_FINISHED:
             case INITIALIZATION_FINISHED:
                 this.monitoringStatisticsProcessor = new MonitoringStatisticsProcessor(injectionManager, this);
+                processorFailed.set(false);
                 this.monitoringStatisticsProcessor.startMonitoringWorker();
                 break;
             case DESTROY_FINISHED:
@@ -238,13 +241,13 @@
                     methodStats = new MethodStats(method, methodTimeStart, now - methodTimeStart);
                     break;
                 case EXCEPTION_MAPPING_FINISHED:
-                    if (!exceptionMapperEvents.offer(event)) {
+                    if (!offer(exceptionMapperEvents, event)) {
                         LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_MAPPER());
                     }
                     break;
                 case FINISHED:
                     if (event.isResponseWritten()) {
-                        if (!responseStatuses.offer(event.getContainerResponse().getStatus())) {
+                        if (!offer(responseStatuses, event.getContainerResponse().getStatus())) {
                             LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_RESPONSE());
                         }
                     }
@@ -264,8 +267,7 @@
                         }
                         sb.setLength(sb.length() - 1);
                     }
-
-                    if (!requestQueuedItems.offer(new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
+                    if (!offer(requestQueuedItems, new RequestStats(new TimeStats(requestTimeStart, now - requestTimeStart),
                             methodStats, sb.toString()))) {
                         LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_REQUEST());
                     }
@@ -274,6 +276,21 @@
         }
     }
 
+    private <T> boolean offer(Queue<T> queue, T event) {
+        if (!processorFailed.get()) {
+            return queue.offer(event);
+        }
+        // Don't need to warn that the event was not queued because an Exception was thrown by MonitoringStatisticsProcessor
+        return true;
+    }
+
+    /**
+     * Invoked by {@link MonitoringStatisticsProcessor} when there is one exception consuming from queues.
+     */
+    void processorFailed() {
+        processorFailed.set(true);
+    }
+
     /**
      * Get the exception mapper event queue.
      *
diff --git a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
index 8bc4ec8..580c1db 100644
--- a/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
+++ b/core-server/src/main/java/org/glassfish/jersey/server/internal/monitoring/MonitoringStatisticsProcessor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2013, 2021 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -35,6 +35,7 @@
 import org.glassfish.jersey.server.ExtendedResourceContext;
 import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.server.internal.LocalizationMessages;
+import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener.RequestStats;
 import org.glassfish.jersey.server.model.ResourceMethod;
 import org.glassfish.jersey.server.model.ResourceModel;
 import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
@@ -94,6 +95,7 @@
                     processResponseCodeEvents();
                     processExceptionMapperEvents();
                 } catch (final Throwable t) {
+                    monitoringEventListener.processorFailed();
                     LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
                     // rethrowing exception stops further task execution
                     throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), t);
@@ -120,11 +122,9 @@
     private void processExceptionMapperEvents() {
         final Queue<RequestEvent> eventQueue = monitoringEventListener.getExceptionMapperEvents();
         final FloodingLogger floodingLogger = new FloodingLogger(eventQueue);
-
-        while (!eventQueue.isEmpty()) {
+        RequestEvent event = null;
+        while ((event = eventQueue.poll()) != null) {
             floodingLogger.conditionallyLogFlooding();
-
-            final RequestEvent event = eventQueue.remove();
             final ExceptionMapperStatisticsImpl.Builder mapperStats = statisticsBuilder.getExceptionMapperStatisticsBuilder();
 
             if (event.getExceptionMapper() != null) {
@@ -138,12 +138,9 @@
     private void processRequestItems() {
         final Queue<MonitoringEventListener.RequestStats> requestQueuedItems = monitoringEventListener.getRequestQueuedItems();
         final FloodingLogger floodingLogger = new FloodingLogger(requestQueuedItems);
-
-        while (!requestQueuedItems.isEmpty()) {
+        RequestStats event = null;
+        while ((event = requestQueuedItems.poll()) != null) {
             floodingLogger.conditionallyLogFlooding();
-
-            final MonitoringEventListener.RequestStats event = requestQueuedItems.remove();
-
             final MonitoringEventListener.TimeStats requestStats = event.getRequestStats();
             statisticsBuilder.addRequestExecution(requestStats.getStartTime(), requestStats.getDuration());
 
@@ -160,11 +157,9 @@
     private void processResponseCodeEvents() {
         final Queue<Integer> responseEvents = monitoringEventListener.getResponseStatuses();
         final FloodingLogger floodingLogger = new FloodingLogger(responseEvents);
-
-        while (!responseEvents.isEmpty()) {
+        Integer code = null;
+        while ((code = responseEvents.poll()) != null) {
             floodingLogger.conditionallyLogFlooding();
-
-            final Integer code = responseEvents.remove();
             statisticsBuilder.addResponseCode(code);
         }
 
diff --git a/tests/integration/jersey-4697/pom.xml b/tests/integration/jersey-4697/pom.xml
new file mode 100644
index 0000000..27ee6ca
--- /dev/null
+++ b/tests/integration/jersey-4697/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Public License v. 2.0, which is available at
+    http://www.eclipse.org/legal/epl-2.0.
+
+    This Source Code may also be made available under the following Secondary
+    Licenses when the conditions for such availability set forth in the
+    Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+    version 2 with the GNU Classpath Exception, which is available at
+    https://www.gnu.org/software/classpath/license.html.
+
+    SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>project</artifactId>
+        <groupId>org.glassfish.jersey.tests.integration</groupId>
+        <version>2.34-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>jersey-4697</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-bundle</artifactId>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java b/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java
new file mode 100644
index 0000000..578d964
--- /dev/null
+++ b/tests/integration/jersey-4697/src/test/java/org/glassfish/jersey/tests/integration/jersey4697/MonitoringEventListenerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.integration.jersey4697;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.glassfish.jersey.internal.inject.InjectionManager;
+import org.glassfish.jersey.internal.inject.Providers;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.ServerProperties;
+import org.glassfish.jersey.server.internal.monitoring.MonitoringEventListener;
+import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
+import org.glassfish.jersey.server.monitoring.ExceptionMapperMXBean;
+import org.glassfish.jersey.server.monitoring.RequestEvent;
+import org.glassfish.jersey.server.monitoring.RequestEventListener;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MonitoringEventListenerTest extends JerseyTest {
+
+    private static final long TIMEOUT = 500;
+    private static final String MBEAN_EXCEPTION =
+            "org.glassfish.jersey:type=MonitoringEventListenerTest,subType=Global,exceptions=ExceptionMapper";
+
+    @Path("/example")
+    public static class ExampleResource {
+        @Inject
+        private InjectionManager injectionManager;
+        @GET
+        @Path("/error")
+        public Response error() {
+            throw new RuntimeException("Any exception to be counted in ExceptionMapper");
+        }
+        @GET
+        @Path("/poison")
+        public Response poison() {
+            MonitoringEventListener monitoringEventListener = listener();
+            RequestEvent requestEvent = mock(RequestEvent.class);
+            when(requestEvent.getType()).thenReturn(RequestEvent.Type.START);
+            RequestEventListener eventListener = monitoringEventListener.onRequest(requestEvent);
+            RequestEvent poisonEvent = mock(RequestEvent.class);
+            when(poisonEvent.getType()).thenReturn(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
+            when(poisonEvent.getExceptionMapper())
+                .thenThrow(new IllegalStateException("This causes the scheduler to stop working"));
+            eventListener.onEvent(poisonEvent);
+            return Response.ok().build();
+        }
+        @GET
+        @Path("/queueSize")
+        public Response queueSize() throws Exception {
+            MonitoringEventListener monitoringEventListener = listener();
+            Method method = MonitoringEventListener.class.getDeclaredMethod("getExceptionMapperEvents");
+            method.setAccessible(true);
+            Collection<?> queue = (Collection<?>) method.invoke(monitoringEventListener);
+            return Response.ok(queue.size()).build();
+        }
+        private MonitoringEventListener listener() {
+            Iterable<ApplicationEventListener> listeners =
+                    Providers.getAllProviders(injectionManager, ApplicationEventListener.class);
+            for (ApplicationEventListener listener : listeners) {
+                if (listener instanceof MonitoringEventListener) {
+                    return (MonitoringEventListener) listener;
+                }
+            }
+            throw new IllegalStateException("MonitoringEventListener was not found");
+        }
+    }
+
+    @Provider
+    public static class RuntimeExceptionMapper implements ExceptionMapper<RuntimeException> {
+        @Override
+        public Response toResponse(RuntimeException e) {
+            return Response.status(500).entity("RuntimeExceptionMapper: " + e.getMessage()).build();
+        }
+    }
+
+    @Override
+    protected Application configure() {
+        ResourceConfig resourceConfig = new ResourceConfig(ExampleResource.class);
+        // Need to map the exception to be counted by ExceptionMapper
+        resourceConfig.register(RuntimeExceptionMapper.class);
+        resourceConfig.property(ServerProperties.MONITORING_ENABLED, true);
+        resourceConfig.property(ServerProperties.MONITORING_STATISTICS_ENABLED, true);
+        resourceConfig.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
+        resourceConfig.property(ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, 1);
+        resourceConfig.setApplicationName("MonitoringEventListenerTest");
+        return resourceConfig;
+    }
+
+    @Test
+    public void exceptionInScheduler() throws Exception {
+        final Long ERRORS_BEFORE_FAIL = 10L;
+        // Send some requests to process some statistics.
+        request(ERRORS_BEFORE_FAIL);
+        // Give some time to the scheduler to collect data.
+        Thread.sleep(TIMEOUT);
+        // All events were consumed by scheduler
+        queueIsEmpty();
+        // Make the scheduler to fail. No more statistics are collected.
+        makeFailure();
+        // Sending again requests
+        request(20);
+        Thread.sleep(TIMEOUT);
+        // No new events should be accepted because scheduler is not working.
+        queueIsEmpty();
+        Long monitoredErrors = mappedErrorsFromJMX(MBEAN_EXCEPTION);
+        assertEquals(ERRORS_BEFORE_FAIL, monitoredErrors);
+    }
+
+    private void makeFailure() {
+        Response response = target("/example/poison").request().get();
+        assertEquals(200, response.getStatus());
+    }
+
+    private void queueIsEmpty() {
+        Response response = target("/example/queueSize").request().get();
+        assertEquals(200, response.getStatus());
+        assertEquals(Integer.valueOf(0), response.readEntity(Integer.class));
+    }
+
+    private Long mappedErrorsFromJMX(String name) throws Exception {
+        Long monitoredErrors = null;
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        ObjectName objectName = new ObjectName(name);
+        ExceptionMapperMXBean bean = JMX.newMBeanProxy(mbs, objectName, ExceptionMapperMXBean.class);
+        Map<?, ?> counter = bean.getExceptionMapperCount();
+        CompositeDataSupport value = (CompositeDataSupport) counter.entrySet().iterator().next().getValue();
+        for (Object obj : value.values()) {
+            if (obj instanceof Long) {
+                // Messy way to get the errors, but generic types doesn't match and there is no nice way
+                monitoredErrors = (Long) obj;
+                break;
+            }
+        }
+        return monitoredErrors;
+    }
+
+    private void request(long requests) {
+        for (long i = 0; i < requests; i++) {
+            Response response = target("/example/error").request().get();
+            assertEquals(500, response.getStatus());
+        }
+    }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 0b5c2b3..6398e81 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -88,6 +88,7 @@
         <module>jersey-4321</module>
         <module>jersey-4507</module>
         <module>jersey-4542</module>
+        <module>jersey-4697</module>
         <module>jersey-4722</module>
         <module>jetty-response-close</module>
         <module>microprofile</module>