blob: 3b4a18be44f848e56968ea619303b4f93c7a09bc [file] [log] [blame]
/*
* Copyright (c) 2007, 2018 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.jvnet.hk2.config;
import java.util.*;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.beans.PropertyChangeEvent;
import java.lang.reflect.Proxy;
import java.util.concurrent.*;
import org.jvnet.hk2.annotations.Optional;
import org.jvnet.hk2.annotations.Service;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import org.glassfish.hk2.api.PostConstruct;
import org.glassfish.hk2.api.PreDestroy;
/**
* Transactions is a singleton service that receives transaction notifications and dispatch these
* notifications asynchronously to listeners.
*
* @author Jerome Dochez
*/
@Service
public final class Transactions implements PostConstruct, PreDestroy {
// each transaction listener has a notification pump.
private final List<Provider<ListenerNotifier<TransactionListener, ?, Void>>> listeners =
new ArrayList<Provider<ListenerNotifier<TransactionListener, ?, Void>>>();
private final Map<Class, Set<ConfigListener>> typeListeners = new HashMap<Class, Set<ConfigListener>>();
@Inject @Named("transactions-executor") @Optional
private ExecutorService executor;
// all configuration listeners are notified though one notifier.
private final Provider<ConfigListenerNotifier> configListenerNotifier = new Provider<ConfigListenerNotifier>() {
private final ConfigListenerNotifier configListenerNotifier = new ConfigListenerNotifier();
private final CountDownLatch initialized = new CountDownLatch(1);
public ConfigListenerNotifier get() {
//synchronized(initialized) {
if (initialized.getCount()>0) {
configListenerNotifier.start();
initialized.countDown();
}
return configListenerNotifier;
//}
}
};
public void postConstruct() {
if (executor==null) {
executor = Executors.newCachedThreadPool();
}
}
public void preDestroy() {
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> listener : listeners) {
listener.get().stop();
}
configListenerNotifier.get().stop();
executor.shutdown();
}
/**
* Abstract notification pump, it adds jobs to the queue and process them in the order
* jobs were added.
*
* Jobs are just a wrapper for events of type <U> and a notification mechanism for
* completion notification
*
* @param <T> type of listener interface
* @param <U> type of events the listener methods are expecting
* @param <V> return type of the listener interface methods.
*/
private abstract class Notifier<T, U, V> {
private final BlockingQueue<FutureTask> pendingJobs = new ArrayBlockingQueue<FutureTask>(50);
private CountDownLatch latch = new CountDownLatch(1);
/**
* Creates the task that will notify the listeners of a particular job.
* @param job contains the specifics of the notification like the events that need to be notified.
* @return a task that can be run and return an optional value.
*/
protected abstract FutureTask<V> prepare(final Job<T, U, V> job);
/**
* Adds a job to the notification pump. This job will be processed as soon as all other pending
* jobs have completed.
*
* @param job new notification job.
* @return a future on the return value.
*/
public Future<V> add(final Job<T, U, V> job) {
// NOTE that this is put() which blocks, *not* add() which will not block and will
// throw an IllegalStateException if the queue is full.
if (latch.getCount()==0) {
throw new RuntimeException("TransactionListener is inactive, yet jobs are published to it");
}
try {
pendingJobs.put(prepare(job));
} catch (InterruptedException e ) {
throw new RuntimeException(e);
}
return null;
}
protected void start() {
executor.submit(new Runnable() {
public void run() {
while (latch.getCount()>0) {
try {
final FutureTask job = pendingJobs.take();
// when listeners start a transaction themselves, several jobs try to get published
// simultaneously so we cannot block the pump while delivering the messages.
executor.submit(new Runnable() {
public void run() {
job.run();
}
});
}
catch (InterruptedException e) {
// do anything here?
}
}
}
});
}
void stop() {
latch.countDown();
// last event to force the close
pendingJobs.add(prepare(new Job<T, U, V>(null, null) {
public V process(T target) {
return null;
}
}));
}
}
/**
* Default listener notification pump. One thread per listener, jobs processed in
* the order it was received.
*
* @param <T> type of listener interface
* @param <U> type of events the listener methods are expecting
* @param <V> return type of the listener interface methods.
*/
private class ListenerNotifier<T,U,V> extends Notifier<T,U,V> {
final T listener;
public ListenerNotifier(T listener) {
this.listener = listener;
}
protected FutureTask<V> prepare(final Job<T, U, V> job) {
return new FutureTask<V>(new Callable<V>() {
public V call() throws Exception {
try {
if ( job.mEvents.size() != 0 ) {
return job.process(listener);
}
} finally {
job.releaseLatch();
}
return null;
}
});
}
}
/**
* Configuration listener notification pump. All Listeners are notified within their own thread, only on thread
* takes care of the job pump.
*
*/
private class ConfigListenerNotifier extends Notifier<ConfigListener,PropertyChangeEvent,UnprocessedChangeEvents> {
protected FutureTask<UnprocessedChangeEvents>
prepare(final Job<ConfigListener, PropertyChangeEvent, UnprocessedChangeEvents> job) {
// first, calculate the recipients.
final Set<ConfigListener> configListeners = new HashSet<ConfigListener>();
if (job.mEvents != null) {
for (PropertyChangeEvent event : job.mEvents) {
final Dom dom = (Dom) ((ConfigView) Proxy.getInvocationHandler(event.getSource())).getMasterView();
configListeners.addAll(dom.getListeners());
// we also notify the parent.
if (dom.parent()!=null) {
configListeners.addAll(dom.parent().getListeners());
}
// and now, notify all listeners for the changed types.
Set<ConfigListener> listeners = typeListeners.get(dom.getProxyType());
if (listeners!=null) {
configListeners.addAll(listeners);
}
// we need to check if elements are removed to ensure
// the typed listeners are notified.
if (event.getNewValue()==null) {
Object oldValue = event.getOldValue();
if (oldValue instanceof ConfigBeanProxy) {
Dom domOldValue = Dom.unwrap((ConfigBeanProxy) oldValue);
Set<ConfigListener> typedListeners = typeListeners.get(domOldValue.<ConfigBeanProxy>getProxyType());
if (typedListeners!=null) {
configListeners.addAll(typedListeners);
}
}
}
}
}
return new FutureTask<UnprocessedChangeEvents>(new Callable<UnprocessedChangeEvents>() {
public UnprocessedChangeEvents call() throws Exception {
try {
// temporary structure to store our future notifications with pointer to the
// originator config listener
Map<Future<UnprocessedChangeEvents>, ConfigListener> futures = new HashMap<Future<UnprocessedChangeEvents>, ConfigListener>();
for (final ConfigListener listener : configListeners) {
// each listener is notified in it's own thread.
futures.put(executor.submit(new Callable<UnprocessedChangeEvents>() {
public UnprocessedChangeEvents call() throws Exception {
UnprocessedChangeEvents e = job.process(listener);
return e;
}
}), listener);
}
List<UnprocessedChangeEvents> unprocessed = new ArrayList<UnprocessedChangeEvents>(futures.size());
for (Map.Entry<Future<UnprocessedChangeEvents>, ConfigListener> futureEntry : futures.entrySet()) {
Future<UnprocessedChangeEvents> future = futureEntry.getKey();
try {
UnprocessedChangeEvents result = future.get(200, TimeUnit.SECONDS);
if (result!=null && result.getUnprocessed()!=null && result.getUnprocessed().size()>0) {
for (UnprocessedChangeEvent event : result.getUnprocessed()) {
Logger.getAnonymousLogger().log(Level.WARNING, "Unprocessed event : " + event);
}
unprocessed.add(result);
}
} catch (InterruptedException e) {
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener notification got interrupted", e);
} catch (ExecutionException e) {
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener notification got interrupted", e);
} catch (TimeoutException e) {
ConfigListener cl = futureEntry.getValue();
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener " + cl.getClass() + " notification took too long", e);
}
}
// all notification have been successful, I just need to notify the unprocessed events.
// note these events are always synchronous so far.
if (!unprocessed.isEmpty()) {
Job unprocessedJob = new UnprocessedEventsJob(unprocessed, null);
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> listener : Transactions.this.listeners) {
listener.get().add(unprocessedJob);
}
}
} finally {
job.releaseLatch();
}
// in theory I should aggregate my unprocessed events but nobody cares.
return null;
}
});
}
}
/**
A job contains an optional CountdownLatch so that a caller can learn when the
transaction has "cleared" by blocking until that time.
*/
private abstract static class Job<T,U,V> {
private final CountDownLatch mLatch;
protected final List<U> mEvents;
public Job(List<U> events, final CountDownLatch latch ) {
mLatch = latch;
mEvents = events;
}
public void waitForLatch() throws InterruptedException {
if ( mLatch != null ) {
mLatch.await();
}
}
public void releaseLatch() {
if ( mLatch != null ) {
mLatch.countDown();
}
}
public abstract V process(T target);
}
private static class TransactionListenerJob extends Job<TransactionListener, PropertyChangeEvent, Void> {
public TransactionListenerJob(List<PropertyChangeEvent> events, CountDownLatch latch) {
super(events, latch);
}
@Override
public Void process(TransactionListener listener) {
try {
listener.transactionCommited(mEvents);
} catch(Exception e) {
e.printStackTrace();
}
return null;
}
}
private static class UnprocessedEventsJob extends Job<TransactionListener, UnprocessedChangeEvents, Void> {
public UnprocessedEventsJob(List<UnprocessedChangeEvents> events, CountDownLatch latch) {
super(events, latch);
}
@Override
public Void process(TransactionListener listener) {
try {
listener.unprocessedTransactedEvents(mEvents);
} catch(Exception e) {
e.printStackTrace();
}
return null;
}
}
private class ConfigListenerJob extends Job<ConfigListener, PropertyChangeEvent, UnprocessedChangeEvents> {
final PropertyChangeEvent[] eventsArray;
public ConfigListenerJob(List<PropertyChangeEvent> events, CountDownLatch latch) {
super(events, latch);
eventsArray = mEvents.toArray(new PropertyChangeEvent[mEvents.size()]);
}
public UnprocessedChangeEvents process(ConfigListener target) {
return target.changed(eventsArray);
}
}
/**
* adds a listener for a particular config type
* @param listenerType the config type
* @param listener the config listener
*/
public synchronized void addListenerForType(Class listenerType, ConfigListener listener) {
Set<ConfigListener> listeners = typeListeners.get(listenerType);
if (listeners==null) {
listeners = new HashSet<ConfigListener>();
typeListeners.put(listenerType, listeners);
}
listeners.add(listener);
}
/**
* removes a listener for a particular config type
*
* @param listenerType the config type
* @param listener the config listener
* @return true if the listener was removed successfully, false otherwise.
*/
public synchronized boolean removeListenerForType(Class listenerType, ConfigListener listener) {
Set<ConfigListener> listeners = typeListeners.get(listenerType);
if (listeners==null) {
return false;
}
return listeners.remove(listener);
}
/**
* add a new listener to all transaction events.
*
* @param listener to be added.
*/
public void addTransactionsListener(final TransactionListener listener) {
synchronized(listeners) {
listeners.add(new Provider<ListenerNotifier<TransactionListener, ?, Void>>() {
final ListenerNotifier<TransactionListener, PropertyChangeEvent, Void> tsListener = new ListenerNotifier<TransactionListener, PropertyChangeEvent, Void>(listener);
final CountDownLatch initialized = new CountDownLatch(1);
public ListenerNotifier<TransactionListener, PropertyChangeEvent, Void> get() {
//synchronized(initialized) {
if (initialized.getCount()>0) {
tsListener.start();
initialized.countDown();
}
//}
return tsListener;
}
});
}
}
/**
* Removes an existing listener for transaction events
* @param listener the registered listener
* @return true if the listener unregistration was successful
*/
public boolean removeTransactionsListener(TransactionListener listener) {
synchronized(listeners) {
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier info = holder.get();
if (info.listener==listener) {
info.stop();
return listeners.remove(holder);
}
}
}
return false;
}
public List<TransactionListener> currentListeners() {
synchronized(listeners) {
List<TransactionListener> l = new ArrayList<TransactionListener>();
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier<TransactionListener, ?, Void> info = holder.get();
l.add(info.listener);
}
return l;
}
}
/**
* Synchronous notification of a new transactional configuration change operation.
*
* @param events list of changes
*/
void addTransaction( final List<PropertyChangeEvent> events) {
addTransaction(events, true);
}
/**
* Notification of a new transaction completion
*
* @param events accumulated list of changes
* @param waitTillCleared synchronous semantics; wait until all change events are sent
*/
@SuppressWarnings("cast")
void addTransaction(
final List<PropertyChangeEvent> events,
final boolean waitTillCleared ) {
final List<ListenerNotifier<TransactionListener, ?, Void>> listInfos = new ArrayList<ListenerNotifier<TransactionListener, ?, Void>>();
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier<TransactionListener, ?, Void> info = holder.get();
listInfos.add(info);
}
// create a CountDownLatch to implement waiting for events to actually be sent
final Job<TransactionListener, ?, Void> job = new TransactionListenerJob( events,
waitTillCleared ? new CountDownLatch(listInfos.size()) : null);
final ConfigListenerJob configJob = new ConfigListenerJob(events,
waitTillCleared? new CountDownLatch(1):null);
// NOTE that this is put() which blocks, *not* add() which will not block and will
// throw an IllegalStateException if the queue is full.
try {
for (ListenerNotifier listener : listInfos) {
listener.add(job);
}
configListenerNotifier.get().add(configJob);
job.waitForLatch();
configJob.waitForLatch();
} catch (InterruptedException e ) {
throw new RuntimeException(e);
}
}
public void waitForDrain() {
// insert a dummy Job and block until is has been processed. This guarantees
// that all prior jobs have finished
addTransaction( new ArrayList<PropertyChangeEvent>(), true );
// at this point all prior transactions are guaranteed to have cleared
}
}