Fix intermittent premature ClientRuntime finalization (#4508)

* Fix intermittent premature ClientRuntime finalization

Signed-off-by: jansupol <jan.supol@oracle.com>
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java
index da8052b..d444788 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2018 Payara Foundation and/or its affiliates.
  *
  * This program and the accompanying materials are made available under the
@@ -414,12 +414,17 @@
 
             BootstrapBag bootstrapBag = new ClientBootstrapBag();
             bootstrapBag.setManagedObjectsFinalizer(new ManagedObjectsFinalizer(injectionManager));
-            List<BootstrapConfigurator> bootstrapConfigurators = Arrays.asList(new RequestScope.RequestScopeConfigurator(),
+
+            final ClientMessageBodyFactory.MessageBodyWorkersConfigurator messageBodyWorkersConfigurator =
+                    new ClientMessageBodyFactory.MessageBodyWorkersConfigurator();
+
+            List<BootstrapConfigurator> bootstrapConfigurators = Arrays.asList(
+                    new RequestScope.RequestScopeConfigurator(),
                     new ParamConverterConfigurator(),
                     new ParameterUpdaterConfigurator(),
                     new RuntimeConfigConfigurator(runtimeCfgState),
                     new ContextResolverFactory.ContextResolversConfigurator(),
-                    new MessageBodyFactory.MessageBodyWorkersConfigurator(),
+                    messageBodyWorkersConfigurator,
                     new ExceptionMapperFactory.ExceptionMappersConfigurator(),
                     new JaxrsProviders.ProvidersConfigurator(),
                     new AutoDiscoverableConfigurator(RuntimeType.CLIENT));
@@ -455,6 +460,8 @@
             final ClientRuntime crt = new ClientRuntime(configuration, connector, injectionManager, bootstrapBag);
 
             client.registerShutdownHook(crt);
+            messageBodyWorkersConfigurator.setClientRuntime(crt);
+
             return crt;
         }
 
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java
new file mode 100644
index 0000000..e05cb5f
--- /dev/null
+++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.client;
+
+import org.glassfish.jersey.internal.BootstrapBag;
+import org.glassfish.jersey.internal.BootstrapConfigurator;
+import org.glassfish.jersey.internal.inject.Bindings;
+import org.glassfish.jersey.internal.inject.InjectionManager;
+import org.glassfish.jersey.internal.inject.InstanceBinding;
+import org.glassfish.jersey.internal.util.collection.LazyValue;
+import org.glassfish.jersey.internal.util.collection.Value;
+import org.glassfish.jersey.internal.util.collection.Values;
+import org.glassfish.jersey.message.MessageBodyWorkers;
+import org.glassfish.jersey.message.internal.MessageBodyFactory;
+
+import javax.ws.rs.core.Configuration;
+
+class ClientMessageBodyFactory extends MessageBodyFactory {
+
+    /**
+     * Keep reference to {@link ClientRuntime} so that {@code finalize} on it is not called
+     * before the {@link MessageBodyFactory} is used.
+     * <p>
+     * Some entity types {@code @Inject} {@code MessageBodyFactory} for their {@code read} methods,
+     * but if the finalizer is invoked before that, the HK2 injection manager gets closed.
+     * </p>
+     */
+    private final LazyValue<ClientRuntime> clientRuntime;
+
+    /**
+     * Create a new message body factory.
+     *
+     * @param configuration configuration. Optional - can be null.
+     * @param clientRuntimeValue - a reference to ClientRuntime.
+     */
+    private ClientMessageBodyFactory(Configuration configuration, Value<ClientRuntime> clientRuntimeValue) {
+        super(configuration);
+        clientRuntime = Values.lazy(clientRuntimeValue);
+    }
+
+    /**
+     * Configurator which initializes and register {@link MessageBodyWorkers} instance into {@link InjectionManager} and
+     * {@link BootstrapBag}.
+     */
+    static class MessageBodyWorkersConfigurator implements BootstrapConfigurator {
+
+        private ClientMessageBodyFactory messageBodyFactory;
+        private ClientRuntime clientRuntime;
+
+        @Override
+        public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
+            messageBodyFactory = new ClientMessageBodyFactory(bootstrapBag.getConfiguration(), () -> clientRuntime);
+            InstanceBinding<ClientMessageBodyFactory> binding =
+                    Bindings.service(messageBodyFactory)
+                            .to(MessageBodyWorkers.class);
+            injectionManager.register(binding);
+        }
+
+        @Override
+        public void postInit(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
+            messageBodyFactory.initialize(injectionManager);
+            bootstrapBag.setMessageBodyWorkers(messageBodyFactory);
+        }
+
+        void setClientRuntime(ClientRuntime clientRuntime) {
+            this.clientRuntime = clientRuntime;
+        }
+    }
+
+    ClientRuntime getClientRuntime() {
+        return clientRuntime.get();
+    }
+}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java b/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java
index 4d6fe52..fd5ebfa 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -22,6 +22,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.ws.rs.ProcessingException;
 import javax.ws.rs.core.EntityTag;
@@ -51,6 +52,7 @@
     private final ClientResponse context;
     private final RequestScope scope;
     private final RequestContext requestContext;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     /**
      * Create new scoped client response.
@@ -139,11 +141,13 @@
 
     @Override
     public void close() throws ProcessingException {
-        try {
-            context.close();
-        } finally {
-            if (requestContext != null) {
-                requestContext.release();
+        if (isClosed.compareAndSet(false, true)) {
+            try {
+                context.close();
+            } finally {
+                if (requestContext != null) {
+                    requestContext.release();
+                }
             }
         }
     }
diff --git a/tests/integration/jersey-4507/pom.xml b/tests/integration/jersey-4507/pom.xml
new file mode 100644
index 0000000..4d5352c
--- /dev/null
+++ b/tests/integration/jersey-4507/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+
+    This program and the accompanying materials are made available under the
+    terms of the Eclipse Public License v. 2.0, which is available at
+    http://www.eclipse.org/legal/epl-2.0.
+
+    This Source Code may also be made available under the following Secondary
+    Licenses when the conditions for such availability set forth in the
+    Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+    version 2 with the GNU Classpath Exception, which is available at
+    https://www.gnu.org/software/classpath/license.html.
+
+    SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>project</artifactId>
+        <groupId>org.glassfish.jersey.tests.integration</groupId>
+        <version>2.32.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>jersey-4507</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-bundle</artifactId>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework</groupId>
+            <artifactId>jersey-test-framework-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.examples</groupId>
+            <artifactId>server-sent-events-jersey</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java b/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java
new file mode 100644
index 0000000..b02eda5
--- /dev/null
+++ b/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.integration.jersey4507;
+
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientLifecycleListener;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.examples.sse.jersey.App;
+import org.glassfish.jersey.examples.sse.jersey.DomainResource;
+import org.glassfish.jersey.examples.sse.jersey.ServerSentEventsResource;
+import org.glassfish.jersey.media.sse.EventInput;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class SSETest extends JerseyTest {
+    private static final int MAX_CLIENTS = 10;
+    private static final int COUNT = 30;
+    private static final AtomicInteger atomicInteger = new AtomicInteger(0);
+    private static final CountDownLatch closeLatch = new CountDownLatch(COUNT);
+
+    @Override
+    protected Application configure() {
+        // enable(TestProperties.LOG_TRAFFIC);
+        return new ResourceConfig(ServerSentEventsResource.class, DomainResource.class, SseFeature.class);
+    }
+
+    @Override
+    protected void configureClient(ClientConfig config) {
+        config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, MAX_CLIENTS + 2);
+        config.register(new ClientRuntimeCloseVerifier());
+    }
+
+    /**
+     * Test consuming multiple SSE events sequentially using event input.
+     *
+     * @throws Exception in case of a failure during the test execution.
+     */
+    public void testInboundEventReader() throws Exception {
+        final int MAX_MESSAGES = 5;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final Future<List<String>> futureMessages =
+                    executor.submit(new Callable<List<String>>() {
+
+                        @Override
+                        public List<String> call() throws Exception {
+                            final EventInput eventInput = target(App.ROOT_PATH).register(SseFeature.class)
+                                    .request().get(EventInput.class);
+
+                            startLatch.countDown();
+
+                            final List<String> messages = new ArrayList<String>(MAX_MESSAGES);
+                            try {
+                                for (int i = 0; i < MAX_MESSAGES; i++) {
+                                    InboundEvent event = eventInput.read();
+                                    messages.add(event.readData());
+                                }
+                            } finally {
+                                if (eventInput != null) {
+                                    eventInput.close();
+                                }
+                            }
+
+                            return messages;
+                        }
+                    });
+
+            Assert.assertTrue("Waiting for receiver thread to start has timed out.",
+                    startLatch.await(15000, TimeUnit.SECONDS));
+
+            for (int i = 0; i < MAX_MESSAGES; i++) {
+                target(App.ROOT_PATH).request().post(Entity.text("message " + i));
+            }
+
+            int i = 0;
+            for (String message : futureMessages.get(50, TimeUnit.SECONDS)) {
+                Assert.assertThat("Unexpected SSE event data value.", message, equalTo("message " + i++));
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testInboundEventReaderMultiple() throws Exception {
+        for (int i = 0; i != COUNT; i++) {
+            testInboundEventReader();
+        }
+
+        System.gc();
+        closeLatch.await(15_000, TimeUnit.MILLISECONDS);
+        // One ClientConfig is on the Client
+        // + COUNT of them is created by .register(SseFeature.class)
+        Assert.assertEquals(COUNT + 1, atomicInteger.get());
+        Assert.assertEquals(0, closeLatch.getCount());
+    }
+
+
+
+    public static class ClientRuntimeCloseVerifier implements ClientLifecycleListener {
+
+        @Override
+        public void onInit() {
+            atomicInteger.incrementAndGet();
+        }
+
+        @Override
+        public void onClose() {
+            closeLatch.countDown();
+        }
+    }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 85cb602..5ac0218 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -85,6 +85,7 @@
         <module>jersey-4003</module>
         <module>jersey-4099</module>
         <module>jersey-4321</module>
+        <module>jersey-4507</module>
         <module>jetty-response-close</module>
         <module>microprofile</module>
         <module>portability-jersey-1</module>