Enable to use AsyncInvoker in Rx client
Signed-off-by: Jan Supol <jan.supol@oracle.com>
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/AbstractNonSyncInvoker.java b/core-client/src/main/java/org/glassfish/jersey/client/AbstractNonSyncInvoker.java
new file mode 100644
index 0000000..4f9fdf5
--- /dev/null
+++ b/core-client/src/main/java/org/glassfish/jersey/client/AbstractNonSyncInvoker.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+
+/* package */ abstract class AbstractNonSyncInvoker<T> {
+
+ public T get() {
+ return method("GET");
+ }
+
+ public <R> T get(final Class<R> responseType) {
+ return method("GET", responseType);
+ }
+
+ public <R> T get(final GenericType<R> responseType) {
+ return method("GET", responseType);
+ }
+
+ public T put(final Entity<?> entity) {
+ return method("PUT", entity);
+ }
+
+ public <R> T put(final Entity<?> entity, final Class<R> clazz) {
+ return method("PUT", entity, clazz);
+ }
+
+ public <R> T put(final Entity<?> entity, final GenericType<R> type) {
+ return method("PUT", entity, type);
+ }
+
+ public T post(final Entity<?> entity) {
+ return method("POST", entity);
+ }
+
+ public <R> T post(final Entity<?> entity, final Class<R> clazz) {
+ return method("POST", entity, clazz);
+ }
+
+ public <R> T post(final Entity<?> entity, final GenericType<R> type) {
+ return method("POST", entity, type);
+ }
+
+ public T delete() {
+ return method("DELETE");
+ }
+
+ public <R> T delete(final Class<R> responseType) {
+ return method("DELETE", responseType);
+ }
+
+ public <R> T delete(final GenericType<R> responseType) {
+ return method("DELETE", responseType);
+ }
+
+ public T head() {
+ return method("HEAD");
+ }
+
+ public T options() {
+ return method("OPTIONS");
+ }
+
+ public <R> T options(final Class<R> responseType) {
+ return method("OPTIONS", responseType);
+ }
+
+ public <R> T options(final GenericType<R> responseType) {
+ return method("OPTIONS", responseType);
+ }
+
+ public T trace() {
+ return method("TRACE");
+ }
+
+ public <R> T trace(final Class<R> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ public <R> T trace(final GenericType<R> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ public abstract T method(final String name);
+
+ public abstract <R> T method(final String name, final Class<R> responseType);
+
+ public abstract <R> T method(final String name, final GenericType<R> responseType);
+
+ public abstract T method(final String name, final Entity<?> entity);
+
+ public abstract <R> T method(final String name, final Entity<?> entity, final Class<R> responseType);
+
+ public abstract <R> T method(final String name, final Entity<?> entity, final GenericType<R> responseType);
+}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/AbstractRxInvoker.java b/core-client/src/main/java/org/glassfish/jersey/client/AbstractRxInvoker.java
index deb4bd0..aa85a6b 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/AbstractRxInvoker.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/AbstractRxInvoker.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,6 +16,8 @@
package org.glassfish.jersey.client;
+import org.glassfish.jersey.client.internal.LocalizationMessages;
+
import java.util.concurrent.ExecutorService;
import javax.ws.rs.client.Entity;
@@ -25,7 +27,7 @@
import javax.ws.rs.core.Response;
/**
- * Default implementation of {@link javax.ws.rs.client.rx.RxInvoker reactive invoker}. Extensions of this class are
+ * Default implementation of {@link javax.ws.rs.client.RxInvoker reactive invoker}. Extensions of this class are
* supposed to implement {@link #method(String, Entity, Class)} and
* {@link #method(String, Entity, GenericType)} methods to which implementations of the rest
* of the methods from the contract delegate to.
@@ -35,16 +37,15 @@
* @author Michal Gajdos
* @since 2.26
*/
-public abstract class AbstractRxInvoker<T> implements RxInvoker<T> {
+public abstract class AbstractRxInvoker<T> extends AbstractNonSyncInvoker<T> implements RxInvoker<T> {
- private final SyncInvoker syncInvoker;
private final ExecutorService executorService;
+ private final SyncInvoker syncInvoker;
public AbstractRxInvoker(final SyncInvoker syncInvoker, final ExecutorService executor) {
if (syncInvoker == null) {
- throw new IllegalArgumentException("Invocation builder cannot be null.");
+ throw new IllegalArgumentException(LocalizationMessages.NULL_INVOCATION_BUILDER());
}
-
this.syncInvoker = syncInvoker;
this.executorService = executor;
}
@@ -68,101 +69,6 @@
}
@Override
- public T get() {
- return method("GET");
- }
-
- @Override
- public <R> T get(final Class<R> responseType) {
- return method("GET", responseType);
- }
-
- @Override
- public <R> T get(final GenericType<R> responseType) {
- return method("GET", responseType);
- }
-
- @Override
- public T put(final Entity<?> entity) {
- return method("PUT", entity);
- }
-
- @Override
- public <R> T put(final Entity<?> entity, final Class<R> clazz) {
- return method("PUT", entity, clazz);
- }
-
- @Override
- public <R> T put(final Entity<?> entity, final GenericType<R> type) {
- return method("PUT", entity, type);
- }
-
- @Override
- public T post(final Entity<?> entity) {
- return method("POST", entity);
- }
-
- @Override
- public <R> T post(final Entity<?> entity, final Class<R> clazz) {
- return method("POST", entity, clazz);
- }
-
- @Override
- public <R> T post(final Entity<?> entity, final GenericType<R> type) {
- return method("POST", entity, type);
- }
-
- @Override
- public T delete() {
- return method("DELETE");
- }
-
- @Override
- public <R> T delete(final Class<R> responseType) {
- return method("DELETE", responseType);
- }
-
- @Override
- public <R> T delete(final GenericType<R> responseType) {
- return method("DELETE", responseType);
- }
-
- @Override
- public T head() {
- return method("HEAD");
- }
-
- @Override
- public T options() {
- return method("OPTIONS");
- }
-
- @Override
- public <R> T options(final Class<R> responseType) {
- return method("OPTIONS", responseType);
- }
-
- @Override
- public <R> T options(final GenericType<R> responseType) {
- return method("OPTIONS", responseType);
- }
-
- @Override
- public T trace() {
- return method("TRACE");
- }
-
- @Override
- public <R> T trace(final Class<R> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <R> T trace(final GenericType<R> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
public T method(final String name) {
return method(name, Response.class);
}
@@ -181,4 +87,5 @@
public T method(final String name, final Entity<?> entity) {
return method(name, entity, Response.class);
}
+
}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/CompletableFutureAsyncInvoker.java b/core-client/src/main/java/org/glassfish/jersey/client/CompletableFutureAsyncInvoker.java
new file mode 100644
index 0000000..aa05c9d
--- /dev/null
+++ b/core-client/src/main/java/org/glassfish/jersey/client/CompletableFutureAsyncInvoker.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.client;
+
+import javax.ws.rs.client.AsyncInvoker;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.CompletableFuture;
+
+/*package*/ abstract class CompletableFutureAsyncInvoker
+ extends AbstractNonSyncInvoker<CompletableFuture> implements AsyncInvoker {
+ @Override
+ public <R> CompletableFuture<R> get(InvocationCallback<R> callback) {
+ return method("GET", callback);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> put(Entity<?> entity, InvocationCallback<R> callback) {
+ return method("PUT", entity, callback);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> post(Entity<?> entity, InvocationCallback<R> callback) {
+ return method("POST", entity, callback);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> delete(InvocationCallback<R> callback) {
+ return method("DELETE", callback);
+ }
+
+ @Override
+ public CompletableFuture<Response> head(InvocationCallback<Response> callback) {
+ return method("HEAD", callback);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> options(InvocationCallback<R> callback) {
+ return method("OPTIONS", callback);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> trace(InvocationCallback<R> callback) {
+ return method("TRACE", callback);
+ }
+
+ @Override
+ public abstract <R> CompletableFuture<R> method(String name, InvocationCallback<R> callback);
+
+ @Override
+ public abstract <R> CompletableFuture<R> method(String name, Entity<?> entity, InvocationCallback<R> callback);
+
+ @Override
+ public abstract <R> CompletableFuture method(String name, Entity<?> entity, Class<R> responseType);
+
+ @Override
+ public abstract <R> CompletableFuture method(String name, Entity<?> entity, GenericType<R> responseType);
+}
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyCompletionStageRxInvoker.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyCompletionStageRxInvoker.java
index 28531ed..4579b48 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyCompletionStageRxInvoker.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyCompletionStageRxInvoker.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,42 +16,20 @@
package org.glassfish.jersey.client;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutorService;
-
import javax.ws.rs.client.CompletionStageRxInvoker;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.core.GenericType;
/**
* Implementation of Reactive Invoker for {@code CompletionStage}.
*
+ * This class allows for using {@link javax.ws.rs.client.InvocationCallback} in
+ * {@link javax.ws.rs.client.Invocation.Builder#rx(Class) Invocation.Builder.rx(JerseyCompletionStageRxInvoker.class)}
+ * requests.
+ *
* @author Michal Gajdos
* @since 2.26
*/
-public class JerseyCompletionStageRxInvoker extends AbstractRxInvoker<CompletionStage> implements CompletionStageRxInvoker {
-
- JerseyCompletionStageRxInvoker(Invocation.Builder builder, ExecutorService executor) {
- super(builder, executor);
+public class JerseyCompletionStageRxInvoker extends JerseyInvocation.AsyncInvoker implements CompletionStageRxInvoker {
+ JerseyCompletionStageRxInvoker(JerseyInvocation.Builder builder) {
+ super(builder);
}
-
- @Override
- public <T> CompletionStage<T> method(final String name, final Entity<?> entity, final Class<T> responseType) {
- final ExecutorService executorService = getExecutorService();
-
- return executorService == null
- ? CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType))
- : CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType), executorService);
- }
-
- @Override
- public <T> CompletionStage<T> method(final String name, final Entity<?> entity, final GenericType<T> responseType) {
- final ExecutorService executorService = getExecutorService();
-
- return executorService == null
- ? CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType))
- : CompletableFuture.supplyAsync(() -> getSyncInvoker().method(name, entity, responseType), executorService);
- }
-}
+}
\ No newline at end of file
diff --git a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
index d887a44..474b600 100644
--- a/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
+++ b/core-client/src/main/java/org/glassfish/jersey/client/JerseyInvocation.java
@@ -451,11 +451,14 @@
@Override
public CompletionStageRxInvoker rx() {
- return new JerseyCompletionStageRxInvoker(this, executorService());
+ return new JerseyCompletionStageRxInvoker(this);
}
@Override
public <T extends RxInvoker> T rx(Class<T> clazz) {
+ if (clazz == JerseyCompletionStageRxInvoker.class) {
+ return (T) new JerseyCompletionStageRxInvoker(this);
+ }
return createRxInvoker(clazz, executorService());
}
@@ -520,207 +523,77 @@
}
}
- private static class AsyncInvoker implements javax.ws.rs.client.AsyncInvoker {
+ /* package */ static class AsyncInvoker extends CompletableFutureAsyncInvoker implements javax.ws.rs.client.AsyncInvoker {
private final JerseyInvocation.Builder builder;
- private AsyncInvoker(final JerseyInvocation.Builder request) {
+ /* package */ AsyncInvoker(final JerseyInvocation.Builder request) {
this.builder = request;
this.builder.requestContext.setAsynchronous(true);
}
@Override
- public Future<Response> get() {
- return method("GET");
- }
-
- @Override
- public <T> Future<T> get(final Class<T> responseType) {
- return method("GET", responseType);
- }
-
- @Override
- public <T> Future<T> get(final GenericType<T> responseType) {
- return method("GET", responseType);
- }
-
- @Override
- public <T> Future<T> get(final InvocationCallback<T> callback) {
- return method("GET", callback);
- }
-
- @Override
- public Future<Response> put(final Entity<?> entity) {
- return method("PUT", entity);
- }
-
- @Override
- public <T> Future<T> put(final Entity<?> entity, final Class<T> responseType) {
- return method("PUT", entity, responseType);
- }
-
- @Override
- public <T> Future<T> put(final Entity<?> entity, final GenericType<T> responseType) {
- return method("PUT", entity, responseType);
- }
-
- @Override
- public <T> Future<T> put(final Entity<?> entity, final InvocationCallback<T> callback) {
- return method("PUT", entity, callback);
- }
-
- @Override
- public Future<Response> post(final Entity<?> entity) {
- return method("POST", entity);
- }
-
- @Override
- public <T> Future<T> post(final Entity<?> entity, final Class<T> responseType) {
- return method("POST", entity, responseType);
- }
-
- @Override
- public <T> Future<T> post(final Entity<?> entity, final GenericType<T> responseType) {
- return method("POST", entity, responseType);
- }
-
- @Override
- public <T> Future<T> post(final Entity<?> entity, final InvocationCallback<T> callback) {
- return method("POST", entity, callback);
- }
-
- @Override
- public Future<Response> delete() {
- return method("DELETE");
- }
-
- @Override
- public <T> Future<T> delete(final Class<T> responseType) {
- return method("DELETE", responseType);
- }
-
- @Override
- public <T> Future<T> delete(final GenericType<T> responseType) {
- return method("DELETE", responseType);
- }
-
- @Override
- public <T> Future<T> delete(final InvocationCallback<T> callback) {
- return method("DELETE", callback);
- }
-
- @Override
- public Future<Response> head() {
- return method("HEAD");
- }
-
- @Override
- public Future<Response> head(final InvocationCallback<Response> callback) {
- return method("HEAD", callback);
- }
-
- @Override
- public Future<Response> options() {
- return method("OPTIONS");
- }
-
- @Override
- public <T> Future<T> options(final Class<T> responseType) {
- return method("OPTIONS", responseType);
- }
-
- @Override
- public <T> Future<T> options(final GenericType<T> responseType) {
- return method("OPTIONS", responseType);
- }
-
- @Override
- public <T> Future<T> options(final InvocationCallback<T> callback) {
- return method("OPTIONS", callback);
- }
-
- @Override
- public Future<Response> trace() {
- return method("TRACE");
- }
-
- @Override
- public <T> Future<T> trace(final Class<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <T> Future<T> trace(final GenericType<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <T> Future<T> trace(final InvocationCallback<T> callback) {
- return method("TRACE", callback);
- }
-
- @Override
- public Future<Response> method(final String name) {
+ public CompletableFuture<Response> method(final String name) {
builder.requestContext.setMethod(name);
- return new JerseyInvocation(builder).submit();
+ return (CompletableFuture<Response>) new JerseyInvocation(builder).submit();
}
@Override
- public <T> Future<T> method(final String name, final Class<T> responseType) {
+ public <T> CompletableFuture<T> method(final String name, final Class<T> responseType) {
if (responseType == null) {
throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
}
builder.requestContext.setMethod(name);
- return new JerseyInvocation(builder).submit(responseType);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(responseType);
}
@Override
- public <T> Future<T> method(final String name, final GenericType<T> responseType) {
+ public <T> CompletableFuture<T> method(final String name, final GenericType<T> responseType) {
if (responseType == null) {
throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
}
builder.requestContext.setMethod(name);
- return new JerseyInvocation(builder).submit(responseType);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(responseType);
}
@Override
- public <T> Future<T> method(final String name, final InvocationCallback<T> callback) {
+ public <T> CompletableFuture<T> method(final String name, final InvocationCallback<T> callback) {
builder.requestContext.setMethod(name);
- return new JerseyInvocation(builder).submit(callback);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(callback);
}
@Override
- public Future<Response> method(final String name, final Entity<?> entity) {
+ public CompletableFuture<Response> method(final String name, final Entity<?> entity) {
builder.requestContext.setMethod(name);
builder.storeEntity(entity);
- return new JerseyInvocation(builder).submit();
+ return (CompletableFuture<Response>) new JerseyInvocation(builder).submit();
}
@Override
- public <T> Future<T> method(final String name, final Entity<?> entity, final Class<T> responseType) {
+ public <T> CompletableFuture<T> method(final String name, final Entity<?> entity, final Class<T> responseType) {
if (responseType == null) {
throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
}
builder.requestContext.setMethod(name);
builder.storeEntity(entity);
- return new JerseyInvocation(builder).submit(responseType);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(responseType);
}
@Override
- public <T> Future<T> method(final String name, final Entity<?> entity, final GenericType<T> responseType) {
+ public <T> CompletableFuture<T> method(final String name, final Entity<?> entity, final GenericType<T> responseType) {
if (responseType == null) {
throw new IllegalArgumentException(LocalizationMessages.RESPONSE_TYPE_IS_NULL());
}
builder.requestContext.setMethod(name);
builder.storeEntity(entity);
- return new JerseyInvocation(builder).submit(responseType);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(responseType);
}
@Override
- public <T> Future<T> method(final String name, final Entity<?> entity, final InvocationCallback<T> callback) {
+ public <T> CompletableFuture<T> method(final String name, final Entity<?> entity, final InvocationCallback<T> callback) {
builder.requestContext.setMethod(name);
builder.storeEntity(entity);
- return new JerseyInvocation(builder).submit(callback);
+ return (CompletableFuture<T>) new JerseyInvocation(builder).submit(callback);
}
}
diff --git a/core-client/src/main/resources/org/glassfish/jersey/client/internal/localization.properties b/core-client/src/main/resources/org/glassfish/jersey/client/internal/localization.properties
index 1a70cb3..ef68af0 100644
--- a/core-client/src/main/resources/org/glassfish/jersey/client/internal/localization.properties
+++ b/core-client/src/main/resources/org/glassfish/jersey/client/internal/localization.properties
@@ -51,6 +51,7 @@
null.connector.provider=ConnectorProvider must not be set to null.
null.executor.service=ExecutorService must not be set to null.
null.input.parameter=Input method parameter {0} must not be null.
+null.invocation.builder=Invocation builder must not be null.
null.scheduled.executor.service=ScheduledExecutorService must not be set to null.
null.ssl.context=Custom client SSL context, if set, must not be null.
null.keystore=Custom key store, if set, must not be null.
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/ClientExecutorTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/ClientExecutorTest.java
index 81442ef..e1230cd 100644
--- a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/ClientExecutorTest.java
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/ClientExecutorTest.java
@@ -43,11 +43,11 @@
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertNotNull;
/**
* @author Pavel Bucek
@@ -70,52 +70,31 @@
return new ResourceConfig(ClientExecutorTestResource.class);
}
- private volatile String threadName = null;
+ private volatile StringBuilder threadName;
+ private volatile CountDownLatch latch;
- @Test
- public void testCustomExecutorRx() throws InterruptedException {
-
- ExecutorService clientExecutor =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("ClientExecutor-%d").build());
-
- Client client = ClientBuilder.newBuilder().executorService(clientExecutor).build();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- testRx(client, latch);
-
- latch.await(3, TimeUnit.SECONDS);
-
- assertNotNull(threadName);
- assertThat(threadName, containsString("ClientExecutor"));
+ @Before
+ public void setUpThreadNameHolder() {
+ threadName = new StringBuilder();
+ latch = new CountDownLatch(1);
}
@Test
public void testDefaultExecutorRx() throws InterruptedException {
-
Client client = ClientBuilder.newClient();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
testRx(client, latch);
latch.await(3, TimeUnit.SECONDS);
- assertNotNull(threadName);
- assertThat(threadName, containsString("jersey-client-async-executor"));
+ assertThat(threadName.toString(), containsString("jersey-client-async-executor"));
}
@Test
public void testDefaultExecutorAsync() throws InterruptedException {
-
Client client = ClientBuilder.newClient();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
testAsync(client, latch);
latch.await(3, TimeUnit.SECONDS);
- assertNotNull(threadName);
- assertThat(threadName, containsString("jersey-client-async-executor"));
+ assertThat(threadName.toString(), containsString("jersey-client-async-executor"));
}
@Test
@@ -123,37 +102,28 @@
Client client = ClientBuilder.newClient();
client.register(MyExecutorProvider.class);
- final CountDownLatch latch = new CountDownLatch(1);
-
testAsync(client, latch);
latch.await(3, TimeUnit.SECONDS);
- assertNotNull(threadName);
- assertThat(threadName, containsString("MyExecutorProvider"));
+ assertThat(threadName.toString(), containsString("MyExecutorProvider"));
+ }
+
+ @Test
+ public void testCustomExecutorRx() throws InterruptedException {
+ ExecutorService clientExecutor =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("ClientExecutor-%d").build());
+
+ Client client = ClientBuilder.newBuilder().executorService(clientExecutor).build();
+ testRx(client, latch);
+
+ latch.await(3, TimeUnit.SECONDS);
+ assertThat(threadName.toString(), containsString("ClientExecutor"));
}
private void testRx(Client client, CountDownLatch latch) {
client.target(UriBuilder.fromUri(getBaseUri()).path("ClientExecutorTest"))
- .register(new MessageBodyReader<ClientExecutorTest>() {
- @Override
- public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return true;
- }
-
- @Override
- public ClientExecutorTest readFrom(Class<ClientExecutorTest> type, Type genericType,
- Annotation[] annotations,
- MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
- InputStream entityStream) throws IOException, WebApplicationException {
-
- ClientExecutorTest.this.threadName = Thread.currentThread().getName();
- latch.countDown();
-
- return new ClientExecutorTest();
-
- }
- })
+ .register(new ClientExecutorTestReader(threadName, latch))
.request()
.rx()
.get(ClientExecutorTest.class);
@@ -161,25 +131,7 @@
private void testAsync(Client client, CountDownLatch latch) {
client.target(UriBuilder.fromUri(getBaseUri()).path("ClientExecutorTest"))
- .register(new MessageBodyReader<ClientExecutorTest>() {
- @Override
- public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
- return true;
- }
-
- @Override
- public ClientExecutorTest readFrom(Class<ClientExecutorTest> type, Type genericType,
- Annotation[] annotations,
- MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
- InputStream entityStream) throws IOException, WebApplicationException {
-
- ClientExecutorTest.this.threadName = Thread.currentThread().getName();
- latch.countDown();
-
- return new ClientExecutorTest();
-
- }
- })
+ .register(new ClientExecutorTestReader(threadName, latch))
.request()
.async()
.get(ClientExecutorTest.class);
@@ -202,4 +154,32 @@
executorService.shutdown();
}
}
+
+ public static class ClientExecutorTestReader implements MessageBodyReader<ClientExecutorTest> {
+ private final StringBuilder threadName;
+ private final CountDownLatch countDown;
+
+ public ClientExecutorTestReader(StringBuilder threadName, CountDownLatch countDown) {
+ this.threadName = threadName;
+ this.countDown = countDown;
+ }
+
+ @Override
+ public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
+ return true;
+ }
+
+ @Override
+ public ClientExecutorTest readFrom(Class<ClientExecutorTest> type, Type genericType,
+ Annotation[] annotations,
+ MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
+ InputStream entityStream) throws IOException, WebApplicationException {
+
+ threadName.append(Thread.currentThread().getName());
+ countDown.countDown();
+
+ return new ClientExecutorTest();
+
+ }
+ }
}
diff --git a/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/InvocationBuilderRxTest.java b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/InvocationBuilderRxTest.java
new file mode 100644
index 0000000..6c43026
--- /dev/null
+++ b/tests/e2e-client/src/test/java/org/glassfish/jersey/tests/e2e/client/InvocationBuilderRxTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.tests.e2e.client;
+
+import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker;
+import org.glassfish.jersey.client.JerseyInvocation;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class InvocationBuilderRxTest extends JerseyTest {
+
+ private static final int AWAIT_TIME = 2_000;
+ private static final String ECHO = "ECHO";
+ private CountDownLatch latch;
+
+ @Before
+ public void beforeEach() {
+ latch = new CountDownLatch(1);
+ }
+
+ @Override
+ protected Application configure() {
+ return new ResourceConfig().register(Resource.class);
+ }
+
+ @Path("/")
+ public static class Resource {
+ @Path("/double")
+ @POST
+ public String doublePost(String content) {
+ return content + content;
+ }
+
+ @Path("/single")
+ @GET
+ public String doublePost() {
+ return ECHO;
+ }
+ }
+
+ public static class TestCallback implements InvocationCallback<String> {
+ private final CountDownLatch latch;
+
+ public TestCallback(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void completed(String s) {
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+ }
+
+ @Test
+ public void testMethodCallback() throws InterruptedException, ExecutionException, TimeoutException {
+ CompletableFuture<String> future = target("/single").request().rx(JerseyCompletionStageRxInvoker.class)
+ .method("GET", new TestCallback(latch));
+ latch.await(AWAIT_TIME, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(ECHO, future.get());
+ Assert.assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testMethodEntityCallback() throws InterruptedException, ExecutionException, TimeoutException {
+ CompletableFuture<String> future = target("/double").request().rx(JerseyCompletionStageRxInvoker.class)
+ .method("POST", Entity.entity(ECHO, MediaType.TEXT_PLAIN), new TestCallback(latch));
+ latch.await(AWAIT_TIME, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals(ECHO + ECHO, future.get());
+ Assert.assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testMethodEntityResponseType() throws InterruptedException, ExecutionException, TimeoutException {
+ CompletionStage<Response> stage = target("/double").request().rx()
+ .method("POST", Entity.entity(ECHO, MediaType.TEXT_PLAIN), Response.class);
+
+ try (Response response = stage.toCompletableFuture().get()) {
+ Assert.assertEquals(ECHO + ECHO, response.readEntity(String.class));
+ }
+ }
+
+ @Test
+ public void testMethodEntityGenericType() throws InterruptedException, ExecutionException, TimeoutException {
+ CompletionStage<Response> stage = target("/double").request().rx()
+ .method("POST", Entity.entity(ECHO, MediaType.TEXT_PLAIN), new GenericType<Response>(){});
+
+ try (Response response = stage.toCompletableFuture().get()) {
+ Assert.assertEquals(ECHO + ECHO, response.readEntity(String.class));
+ }
+ }
+}