Do not create a connector multiple times for each rx() call
Priority is to use @ClientAsyncExecutor annotated ExecutorServiceProvider

Signed-off-by: jansupol <jan.supol@oracle.com>
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
index 0c9453f..f6a48d7 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -19,6 +19,7 @@
 import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
@@ -61,10 +62,14 @@
 import org.glassfish.jersey.client.internal.ClientResponseProcessingException;
 import org.glassfish.jersey.client.internal.LocalizationMessages;
 import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.internal.inject.Bindings;
+import org.glassfish.jersey.internal.inject.DisposableSupplier;
 import org.glassfish.jersey.internal.inject.Providers;
+import org.glassfish.jersey.internal.inject.ServiceHolder;
 import org.glassfish.jersey.internal.util.Producer;
 import org.glassfish.jersey.internal.util.PropertiesHelper;
 import org.glassfish.jersey.internal.util.ReflectionHelper;
+import org.glassfish.jersey.process.internal.ExecutorProviders;
 import org.glassfish.jersey.process.internal.RequestScope;
 import org.glassfish.jersey.spi.ExecutorServiceProvider;
 
@@ -474,7 +479,7 @@
                 if (configured == null) {
                     final ExecutorService provided = executorService();
                     if (provided != null) {
-                        request().getClientConfig().executorService(provided);
+                        ((ClientConfig) request().getConfiguration()).executorService(provided);
                     }
                 }
                 return (T) new JerseyCompletionStageRxInvoker(this);
@@ -498,9 +503,36 @@
                 return result;
             }
 
-            return this.requestContext.getInjectionManager()
-                    .getInstance(ExecutorServiceProvider.class)
-                    .getExecutorService();
+            final List<ServiceHolder<ExecutorServiceProvider>> serviceHolders =
+                    this.requestContext.getInjectionManager().getAllServiceHolders(ExecutorServiceProvider.class);
+
+            BestServiceHolder best = serviceHolders.stream()
+                    .map(BestServiceHolder::new).sorted((a, b) -> a.isBetterThen(b) ? -1 : 1).findFirst().get();
+
+            return best.provider.getExecutorService();
+        }
+
+        /*
+         * Priority goes to: 1) user async
+         *                   2) user nonasync
+         *                   3) default async
+         */
+        private static final class BestServiceHolder {
+            private final ExecutorServiceProvider provider;
+            private final int value;
+
+            private BestServiceHolder(ServiceHolder<ExecutorServiceProvider> holder) {
+                provider = holder.getInstance();
+                boolean isDefault = DefaultClientAsyncExecutorProvider.class.equals(holder.getImplementationClass())
+                        || ClientExecutorProvidersConfigurator.ClientExecutorServiceProvider.class
+                                .equals(holder.getImplementationClass());
+                boolean isAsync = holder.getImplementationClass().getAnnotation(ClientAsyncExecutor.class) != null;
+                value = 10 * (isDefault ? 0 : 1) + (isAsync ? 1 : 0);
+            }
+
+            public boolean isBetterThen(BestServiceHolder other) {
+                return this.value > other.value;
+            }
         }
 
         /**
diff --git a/core-client/src/test/java/org/glassfish/jersey/client/ClientRxTest.java b/core-client/src/test/java/org/glassfish/jersey/client/ClientRxTest.java
index 15b83ac..f7792f1 100644
--- a/core-client/src/test/java/org/glassfish/jersey/client/ClientRxTest.java
+++ b/core-client/src/test/java/org/glassfish/jersey/client/ClientRxTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,9 +16,16 @@
 
 package org.glassfish.jersey.client;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.ws.rs.client.Client;
@@ -29,10 +36,12 @@
 import javax.ws.rs.client.RxInvokerProvider;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Configuration;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.Provider;
 
+import org.glassfish.jersey.client.spi.Connector;
 import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
 
 import org.glassfish.jersey.spi.ExecutorServiceProvider;
@@ -54,8 +63,9 @@
  */
 public class ClientRxTest {
 
-    private static final ExecutorService EXECUTOR_SERVICE =
-            Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build());
+    private static final ExecutorService EXECUTOR_SERVICE = new ClientRxExecutorServiceWrapper(
+            Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build())
+    );
 
     private final Client CLIENT;
     private final Client CLIENT_WITH_EXECUTOR;
@@ -160,6 +170,35 @@
         request.rx(TestRxInvoker.class).get();
     }
 
+    @Test
+    public void testConnectorIsReusedWhenRx() throws ExecutionException, InterruptedException {
+        final AtomicInteger atomicInteger = new AtomicInteger(0);
+        HttpUrlConnectorProvider provider = new HttpUrlConnectorProvider() {
+            @Override
+            public Connector getConnector(Client client, Configuration config) {
+                atomicInteger.incrementAndGet();
+                return super.getConnector(client, config);
+            }
+        };
+
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.connectorProvider(provider);
+
+        ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
+        Client client = ClientBuilder.newClient(clientConfig).register(abortFilter);
+
+        AtomicReference<String> threadName = new AtomicReference<>();
+        for (int cnt = 0; cnt != 5; cnt++) {
+            try (Response r = target(client)
+                    .request().rx().get().toCompletableFuture().get()) {
+
+                assertEquals(200, r.getStatus());
+                assertEquals(1, atomicInteger.get());
+            }
+        }
+
+    }
+
     private WebTarget target(Client client) {
         // Uri is not relevant, the call won't be ever executed.
         return client.target("http://localhost:9999");
@@ -207,4 +246,141 @@
             //@After
         }
     }
+
+    // -----------------------------------------------------------------------------------------------------
+
+    @Test
+    public void testRxInvokerWithPriorityExecutorServiceProvider() {
+        AtomicReference<String> threadName = new AtomicReference<>();
+        String s = target(CLIENT)
+                .register(PriorityTestRxInvokerProvider.class)
+                .register(TestExecutorServiceProvider.class)
+                .register(PriorityTestExecutorServiceProvider.class)
+                .request().rx(PriorityTestRxInvoker.class).get();
+
+        assertTrue("Provided RxInvoker was not used.", s.startsWith("PriorityTestRxInvoker"));
+        assertTrue("@ClientAsyncExecutor Executor Service was not passed to RxInvoker", s.contains("TRUE"));
+    }
+
+    @ClientAsyncExecutor
+    private static class PriorityTestExecutorServiceProvider extends TestExecutorServiceProvider {
+        @Override
+        public ExecutorService getExecutorService() {
+            return new ClientRxExecutorServiceWrapper(EXECUTOR_SERVICE) {
+                //new class
+            };
+        }
+    }
+
+    @Provider
+    public static class PriorityTestRxInvokerProvider implements RxInvokerProvider<PriorityTestRxInvoker> {
+        @Override
+        public PriorityTestRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+            return new PriorityTestRxInvoker(syncInvoker, executorService);
+        }
+
+        @Override
+        public boolean isProviderFor(Class<?> clazz) {
+            return PriorityTestRxInvoker.class.equals(clazz);
+        }
+    }
+
+    private static class PriorityTestRxInvoker extends AbstractRxInvoker<String> {
+
+        private PriorityTestRxInvoker(SyncInvoker syncInvoker, ExecutorService executor) {
+            super(syncInvoker, executor);
+        }
+
+        @Override
+        public <R> String method(String name, Entity<?> entity, Class<R> responseType) {
+            return "PriorityTestRxInvoker " + (getExecutorService() != null
+                    && !ClientRxExecutorServiceWrapper.class.equals(getExecutorService().getClass())
+                    && ClientRxExecutorServiceWrapper.class.isInstance(getExecutorService()) ? "TRUE" : "FALSE");
+        }
+
+        @Override
+        public <R> String method(String name, Entity<?> entity, GenericType<R> responseType) {
+            return method(null, null, (Class<?>) null);
+        }
+    }
+
+    // -----------------------------------------------------------------------------------------------------
+
+    /**
+     * Wrap the executor service to distinguish the executor service obtained from the Injection Manager by class name
+     */
+    private static class ClientRxExecutorServiceWrapper implements ExecutorService {
+        private final ExecutorService executorService;
+
+        private ClientRxExecutorServiceWrapper(ExecutorService executorService) {
+            this.executorService = executorService;
+        }
+
+        @Override
+        public void shutdown() {
+            executorService.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return executorService.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return executorService.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return executorService.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return executorService.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return executorService.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return executorService.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return executorService.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return executorService.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException {
+            return invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+            return invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            return invokeAny(tasks, timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            executorService.execute(command);
+        }
+    }
 }