/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.client;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.hamcrest.core.AllOf;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Sanity test for {@link Invocation.Builder#rx()} methods.
*
* @author Pavel Bucek
*/
public class ClientRxTest {
// Executor service shared by all tests: a cached thread pool whose threads are named "rxTest-<n>",
// wrapped in ClientRxExecutorServiceWrapper so that its class differs from the raw pool's class.
private static final ExecutorService EXECUTOR_SERVICE = new ClientRxExecutorServiceWrapper(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build())
);
// Client created with default settings (assigned in the constructor, closed in afterTest()).
private final Client CLIENT;
// Client created with EXECUTOR_SERVICE as its executor service (assigned in the constructor, closed in afterTest()).
private final Client CLIENT_WITH_EXECUTOR;
public ClientRxTest() {
CLIENT = ClientBuilder.newClient();
CLIENT_WITH_EXECUTOR = ClientBuilder.newBuilder().executorService(EXECUTOR_SERVICE).build();
}
// JUnit rule through which individual tests declare the exception type and message they expect.
@Rule
public ExpectedException thrown = ExpectedException.none();
/**
 * Closes both clients after every test, the default one first.
 */
@After
public void afterTest() {
    for (final Client client : new Client[] {CLIENT, CLIENT_WITH_EXECUTOR}) {
        client.close();
    }
}
/**
 * Shuts down the shared executor service once all tests of this class have run.
 */
@AfterClass
public static void afterClass() {
EXECUTOR_SERVICE.shutdownNow();
}
/**
 * Checks that a provider registered explicitly under the {@link RxInvokerProvider} contract
 * is the one used by {@code rx(Class)}.
 */
@Test
public void testRxInvoker() {
    // explicit register is not necessary, but it can be used.
    CLIENT.register(TestRxInvokerProvider.class, RxInvokerProvider.class);

    final Invocation.Builder request = target(CLIENT).request();
    final String result = request.rx(TestRxInvoker.class).get();

    assertTrue("Provided RxInvoker was not used.", result.startsWith("rxTestInvoker"));
}
/**
 * Checks that a provider registered on the target without naming its contract is used, and that
 * the invoker created for a client configured with an executor service receives a non-null executor.
 */
@Test
public void testRxInvokerWithExecutor() {
    // implicit register (not saying that the contract is RxInvokerProvider).
    final WebTarget webTarget = target(CLIENT_WITH_EXECUTOR).register(TestRxInvokerProvider.class);
    final String result = webTarget.request().rx(TestRxInvoker.class).get();

    assertTrue("Provided RxInvoker was not used.", result.startsWith("rxTestInvoker"));
    assertTrue("Executor Service was not passed to RxInvoker", result.contains("rxTest-"));
}
/**
 * Checks that the default reactive invoker runs the request filters on a thread of the executor
 * service configured on the client.
 *
 * @throws ExecutionException   if the asynchronous invocation fails
 * @throws InterruptedException if waiting for the response is interrupted
 */
@Test
public void testDefaultRxInvokerWithExecutor() throws ExecutionException, InterruptedException {
    final AtomicReference<String> filterThread = new AtomicReference<>();
    // Priority 100: records the name of the thread the filter chain runs on.
    final ClientRequestFilter recordThread = ctx -> filterThread.set(Thread.currentThread().getName());
    // Priority 200: aborts with 200 OK so no real connection is attempted.
    final ClientRequestFilter abortWithOk = ctx -> ctx.abortWith(Response.ok().build());

    final WebTarget webTarget = target(CLIENT_WITH_EXECUTOR)
            .register(recordThread, 100)
            .register(abortWithOk, 200);
    final Future<Response> pending = webTarget.request().rx().get().toCompletableFuture();

    try (Response response = pending.get()) {
        assertEquals(200, response.getStatus());
        assertTrue("Executor Service was not passed to RxInvoker", filterThread.get().contains("rxTest-"));
    }
}
/**
 * Checks that a custom reactive invoker receives a non-null executor service when an
 * {@link ExecutorServiceProvider} is registered on the target.
 */
@Test
public void testRxInvokerWithExecutorServiceProvider() {
    // The unused AtomicReference local from the original was dropped; nothing in this test records a thread name.
    String s = target(CLIENT)
            .register(TestRxInvokerProvider.class, 200)
            .register(TestExecutorServiceProvider.class)
            .request().rx(TestRxInvoker.class).get();
    assertTrue("Provided RxInvoker was not used.", s.startsWith("rxTestInvoker"));
    assertTrue("Executor Service was not passed to RxInvoker", s.contains("rxTest-"));
}
/**
 * Checks that the default reactive invoker runs the request filters on a thread of the executor
 * service supplied by a registered {@link ExecutorServiceProvider}.
 *
 * @throws ExecutionException   if the asynchronous invocation fails
 * @throws InterruptedException if waiting for the response is interrupted
 */
@Test
public void testDefaultRxInvokerWithExecutorServiceProvider() throws ExecutionException, InterruptedException {
    final AtomicReference<String> filterThread = new AtomicReference<>();
    // Priority 100: records the name of the thread the filter chain runs on.
    final ClientRequestFilter recordThread = ctx -> filterThread.set(Thread.currentThread().getName());
    // Priority 200: aborts with 200 OK so no real connection is attempted.
    final ClientRequestFilter abortWithOk = ctx -> ctx.abortWith(Response.ok().build());

    final WebTarget webTarget = target(CLIENT)
            .register(recordThread, 100)
            .register(abortWithOk, 200)
            .register(TestExecutorServiceProvider.class);
    final Future<Response> pending = webTarget.request().rx().get().toCompletableFuture();

    try (Response response = pending.get()) {
        assertEquals(200, response.getStatus());
        assertTrue("Executor Service was not passed to RxInvoker", filterThread.get().contains("rxTest-"));
    }
}
/**
 * Expects {@code rx(null)} to fail with an {@link IllegalArgumentException} whose message
 * mentions both "null" and "clazz".
 */
@Test
public void testRxInvokerInvalid() {
    final Invocation.Builder builder = target(CLIENT).request();

    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(AllOf.allOf(
            StringContains.containsString("null"),
            StringContains.containsString("clazz")));

    builder.rx(null).get();
}
/**
 * Expects {@code rx(TestRxInvoker.class)} to fail with an {@link IllegalStateException} when no
 * provider for that invoker has been registered; the message must name the invoker, say it is
 * not registered and mention {@code RxInvokerProvider}.
 */
@Test
public void testRxInvokerNotRegistered() {
    final Invocation.Builder builder = target(CLIENT).request();

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage(AllOf.allOf(
            StringContains.containsString("TestRxInvoker"),
            StringContains.containsString("not registered"),
            StringContains.containsString("RxInvokerProvider")));

    builder.rx(TestRxInvoker.class).get();
}
/**
 * Checks that repeated reactive invocations on one client ask the connector provider for a
 * connector only once: after each of five invocations the provider's call counter must be 1.
 *
 * @throws ExecutionException   if an asynchronous invocation fails
 * @throws InterruptedException if waiting for a response is interrupted
 */
@Test
public void testConnectorIsReusedWhenRx() throws ExecutionException, InterruptedException {
    final AtomicInteger connectorRequests = new AtomicInteger(0);
    // Counts how often the client asks the provider for a connector.
    HttpUrlConnectorProvider provider = new HttpUrlConnectorProvider() {
        @Override
        public Connector getConnector(Client client, Configuration config) {
            connectorRequests.incrementAndGet();
            return super.getConnector(client, config);
        }
    };
    ClientConfig clientConfig = new ClientConfig();
    clientConfig.connectorProvider(provider);
    // Abort with 200 OK so no real connection is attempted.
    ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
    Client client = ClientBuilder.newClient(clientConfig).register(abortFilter);
    try {
        for (int cnt = 0; cnt < 5; cnt++) {
            try (Response r = target(client)
                    .request().rx().get().toCompletableFuture().get()) {
                assertEquals(200, r.getStatus());
                assertEquals(1, connectorRequests.get());
            }
        }
    } finally {
        // This client is local to the test, so afterTest() does not close it.
        client.close();
    }
}
/**
 * Builds the web target used by the tests on the given client.
 *
 * @param client client to create the target on
 * @return target pointing at a fixed localhost URI
 */
private WebTarget target(Client client) {
    // Uri is not relevant, the call won't ever be executed.
    final String irrelevantUri = "http://localhost:9999";
    return client.target(irrelevantUri);
}
/**
 * {@link RxInvokerProvider} that serves {@link TestRxInvoker} and nothing else.
 */
@Provider
public static class TestRxInvokerProvider implements RxInvokerProvider<TestRxInvoker> {

    /** Accepts exactly the {@link TestRxInvoker} class. */
    @Override
    public boolean isProviderFor(Class<?> clazz) {
        return clazz == TestRxInvoker.class;
    }

    /** Creates a new invoker around the given sync invoker and executor service. */
    @Override
    public TestRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
        return new TestRxInvoker(syncInvoker, executorService);
    }
}
/**
 * Reactive invoker that performs no request: every invocation returns a marker string that starts
 * with "rxTestInvoker" and additionally contains " rxTest-" when an executor service is present.
 */
private static class TestRxInvoker extends AbstractRxInvoker<String> {

    private TestRxInvoker(SyncInvoker syncInvoker, ExecutorService executor) {
        super(syncInvoker, executor);
    }

    @Override
    public <R> String method(String name, Entity<?> entity, Class<R> responseType) {
        return marker();
    }

    @Override
    public <R> String method(String name, Entity<?> entity, GenericType<R> responseType) {
        return marker();
    }

    /** Builds the marker string shared by both {@code method} overloads. */
    private String marker() {
        final StringBuilder marker = new StringBuilder("rxTestInvoker");
        if (getExecutorService() != null) {
            marker.append(" rxTest-");
        }
        return marker.toString();
    }
}
/**
 * {@link ExecutorServiceProvider} that hands out the shared {@code EXECUTOR_SERVICE}.
 */
private static class TestExecutorServiceProvider implements ExecutorServiceProvider {
@Override
public ExecutorService getExecutorService() {
return EXECUTOR_SERVICE;
}
@Override
public void dispose(ExecutorService executorService) {
// Intentionally empty: the shared executor is shut down in afterClass(), not on disposal.
}
}
// -----------------------------------------------------------------------------------------------------
/**
 * Registers both a plain and a {@code @ClientAsyncExecutor}-annotated executor service provider and
 * checks that the executor handed to the custom reactive invoker comes from the annotated one.
 */
@Test
public void testRxInvokerWithPriorityExecutorServiceProvider() {
    // The unused AtomicReference local from the original was dropped; nothing in this test records a thread name.
    String s = target(CLIENT)
            .register(PriorityTestRxInvokerProvider.class)
            .register(TestExecutorServiceProvider.class)
            .register(PriorityTestExecutorServiceProvider.class)
            .request().rx(PriorityTestRxInvoker.class).get();
    assertTrue("Provided RxInvoker was not used.", s.startsWith("PriorityTestRxInvoker"));
    assertTrue("@ClientAsyncExecutor Executor Service was not passed to RxInvoker", s.contains("TRUE"));
}
/**
 * Executor service provider annotated with {@code @ClientAsyncExecutor}. It returns the shared
 * executor wrapped in an anonymous subclass of {@code ClientRxExecutorServiceWrapper}.
 */
@ClientAsyncExecutor
private static class PriorityTestExecutorServiceProvider extends TestExecutorServiceProvider {
@Override
public ExecutorService getExecutorService() {
return new ClientRxExecutorServiceWrapper(EXECUTOR_SERVICE) {
// Empty body: the anonymous subclass exists only so that its runtime class differs from
// ClientRxExecutorServiceWrapper itself, which is what PriorityTestRxInvoker checks for.
};
}
}
/**
 * {@link RxInvokerProvider} that serves {@link PriorityTestRxInvoker} and nothing else.
 */
@Provider
public static class PriorityTestRxInvokerProvider implements RxInvokerProvider<PriorityTestRxInvoker> {

    /** Accepts exactly the {@link PriorityTestRxInvoker} class. */
    @Override
    public boolean isProviderFor(Class<?> clazz) {
        return clazz == PriorityTestRxInvoker.class;
    }

    /** Creates a new invoker around the given sync invoker and executor service. */
    @Override
    public PriorityTestRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
        return new PriorityTestRxInvoker(syncInvoker, executorService);
    }
}
/**
 * Reactive invoker that performs no request. Every invocation returns "PriorityTestRxInvoker "
 * followed by "TRUE" when the executor service is an instance of a subclass of
 * {@code ClientRxExecutorServiceWrapper} (not the wrapper class itself), and "FALSE" otherwise.
 */
private static class PriorityTestRxInvoker extends AbstractRxInvoker<String> {

    private PriorityTestRxInvoker(SyncInvoker syncInvoker, ExecutorService executor) {
        super(syncInvoker, executor);
    }

    @Override
    public <R> String method(String name, Entity<?> entity, Class<R> responseType) {
        final ExecutorService executor = getExecutorService();
        // instanceof is false for null, so no separate null check is needed.
        final boolean wrapperSubclass = executor instanceof ClientRxExecutorServiceWrapper
                && !ClientRxExecutorServiceWrapper.class.equals(executor.getClass());
        return "PriorityTestRxInvoker " + (wrapperSubclass ? "TRUE" : "FALSE");
    }

    @Override
    public <R> String method(String name, Entity<?> entity, GenericType<R> responseType) {
        // The arguments are ignored by the Class-based overload, so nulls are passed on.
        return method(null, null, (Class<?>) null);
    }
}
// -----------------------------------------------------------------------------------------------------
/**
 * Wraps an executor service so that the executor service obtained from the Injection Manager can be
 * distinguished by its class name.
 * <p>
 * Every {@link ExecutorService} method is forwarded unchanged to the wrapped instance.
 */
private static class ClientRxExecutorServiceWrapper implements ExecutorService {

    /** The executor service all calls are forwarded to. */
    private final ExecutorService executorService;

    private ClientRxExecutorServiceWrapper(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void shutdown() {
        executorService.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return executorService.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return executorService.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return executorService.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return executorService.submit(task, result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return executorService.submit(task);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return executorService.invokeAll(tasks);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // Must go to the delegate: calling this.invokeAll here would recurse until the stack overflows.
        return executorService.invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        // Must go to the delegate: calling this.invokeAny here would recurse until the stack overflows.
        return executorService.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Must go to the delegate: calling this.invokeAny here would recurse until the stack overflows.
        return executorService.invokeAny(tasks, timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        executorService.execute(command);
    }
}
}