blob: 3b4a18be44f848e56968ea619303b4f93c7a09bc [file] [log] [blame]
* Copyright (c) 2007, 2018 Oracle and/or its affiliates. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
package org.jvnet.hk2.config;
import java.util.*;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.beans.PropertyChangeEvent;
import java.lang.reflect.Proxy;
import java.util.concurrent.*;
import org.jvnet.hk2.annotations.Optional;
import org.jvnet.hk2.annotations.Service;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import org.glassfish.hk2.api.PostConstruct;
import org.glassfish.hk2.api.PreDestroy;
* Transactions is a singleton service that receives transaction notifications and dispatch these
* notifications asynchronously to listeners.
* @author Jerome Dochez
public final class Transactions implements PostConstruct, PreDestroy {
// each transaction listener has a notification pump.
private final List<Provider<ListenerNotifier<TransactionListener, ?, Void>>> listeners =
new ArrayList<Provider<ListenerNotifier<TransactionListener, ?, Void>>>();
private final Map<Class, Set<ConfigListener>> typeListeners = new HashMap<Class, Set<ConfigListener>>();
@Inject @Named("transactions-executor") @Optional
private ExecutorService executor;
// all configuration listeners are notified though one notifier.
private final Provider<ConfigListenerNotifier> configListenerNotifier = new Provider<ConfigListenerNotifier>() {
private final ConfigListenerNotifier configListenerNotifier = new ConfigListenerNotifier();
private final CountDownLatch initialized = new CountDownLatch(1);
public ConfigListenerNotifier get() {
//synchronized(initialized) {
if (initialized.getCount()>0) {
return configListenerNotifier;
public void postConstruct() {
if (executor==null) {
executor = Executors.newCachedThreadPool();
public void preDestroy() {
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> listener : listeners) {
* Abstract notification pump, it adds jobs to the queue and process them in the order
* jobs were added.
* Jobs are just a wrapper for events of type <U> and a notification mechanism for
* completion notification
* @param <T> type of listener interface
* @param <U> type of events the listener methods are expecting
* @param <V> return type of the listener interface methods.
private abstract class Notifier<T, U, V> {
private final BlockingQueue<FutureTask> pendingJobs = new ArrayBlockingQueue<FutureTask>(50);
private CountDownLatch latch = new CountDownLatch(1);
* Creates the task that will notify the listeners of a particular job.
* @param job contains the specifics of the notification like the events that need to be notified.
* @return a task that can be run and return an optional value.
protected abstract FutureTask<V> prepare(final Job<T, U, V> job);
* Adds a job to the notification pump. This job will be processed as soon as all other pending
* jobs have completed.
* @param job new notification job.
* @return a future on the return value.
public Future<V> add(final Job<T, U, V> job) {
// NOTE that this is put() which blocks, *not* add() which will not block and will
// throw an IllegalStateException if the queue is full.
if (latch.getCount()==0) {
throw new RuntimeException("TransactionListener is inactive, yet jobs are published to it");
try {
} catch (InterruptedException e ) {
throw new RuntimeException(e);
return null;
protected void start() {
executor.submit(new Runnable() {
public void run() {
while (latch.getCount()>0) {
try {
final FutureTask job = pendingJobs.take();
// when listeners start a transaction themselves, several jobs try to get published
// simultaneously so we cannot block the pump while delivering the messages.
executor.submit(new Runnable() {
public void run() {;
catch (InterruptedException e) {
// do anything here?
void stop() {
// last event to force the close
pendingJobs.add(prepare(new Job<T, U, V>(null, null) {
public V process(T target) {
return null;
* Default listener notification pump. One thread per listener, jobs processed in
* the order it was received.
* @param <T> type of listener interface
* @param <U> type of events the listener methods are expecting
* @param <V> return type of the listener interface methods.
private class ListenerNotifier<T,U,V> extends Notifier<T,U,V> {
final T listener;
public ListenerNotifier(T listener) {
this.listener = listener;
protected FutureTask<V> prepare(final Job<T, U, V> job) {
return new FutureTask<V>(new Callable<V>() {
public V call() throws Exception {
try {
if ( job.mEvents.size() != 0 ) {
return job.process(listener);
} finally {
return null;
* Configuration listener notification pump. All Listeners are notified within their own thread, only on thread
* takes care of the job pump.
private class ConfigListenerNotifier extends Notifier<ConfigListener,PropertyChangeEvent,UnprocessedChangeEvents> {
protected FutureTask<UnprocessedChangeEvents>
prepare(final Job<ConfigListener, PropertyChangeEvent, UnprocessedChangeEvents> job) {
// first, calculate the recipients.
final Set<ConfigListener> configListeners = new HashSet<ConfigListener>();
if (job.mEvents != null) {
for (PropertyChangeEvent event : job.mEvents) {
final Dom dom = (Dom) ((ConfigView) Proxy.getInvocationHandler(event.getSource())).getMasterView();
// we also notify the parent.
if (dom.parent()!=null) {
// and now, notify all listeners for the changed types.
Set<ConfigListener> listeners = typeListeners.get(dom.getProxyType());
if (listeners!=null) {
// we need to check if elements are removed to ensure
// the typed listeners are notified.
if (event.getNewValue()==null) {
Object oldValue = event.getOldValue();
if (oldValue instanceof ConfigBeanProxy) {
Dom domOldValue = Dom.unwrap((ConfigBeanProxy) oldValue);
Set<ConfigListener> typedListeners = typeListeners.get(domOldValue.<ConfigBeanProxy>getProxyType());
if (typedListeners!=null) {
return new FutureTask<UnprocessedChangeEvents>(new Callable<UnprocessedChangeEvents>() {
public UnprocessedChangeEvents call() throws Exception {
try {
// temporary structure to store our future notifications with pointer to the
// originator config listener
Map<Future<UnprocessedChangeEvents>, ConfigListener> futures = new HashMap<Future<UnprocessedChangeEvents>, ConfigListener>();
for (final ConfigListener listener : configListeners) {
// each listener is notified in it's own thread.
futures.put(executor.submit(new Callable<UnprocessedChangeEvents>() {
public UnprocessedChangeEvents call() throws Exception {
UnprocessedChangeEvents e = job.process(listener);
return e;
}), listener);
List<UnprocessedChangeEvents> unprocessed = new ArrayList<UnprocessedChangeEvents>(futures.size());
for (Map.Entry<Future<UnprocessedChangeEvents>, ConfigListener> futureEntry : futures.entrySet()) {
Future<UnprocessedChangeEvents> future = futureEntry.getKey();
try {
UnprocessedChangeEvents result = future.get(200, TimeUnit.SECONDS);
if (result!=null && result.getUnprocessed()!=null && result.getUnprocessed().size()>0) {
for (UnprocessedChangeEvent event : result.getUnprocessed()) {
Logger.getAnonymousLogger().log(Level.WARNING, "Unprocessed event : " + event);
} catch (InterruptedException e) {
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener notification got interrupted", e);
} catch (ExecutionException e) {
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener notification got interrupted", e);
} catch (TimeoutException e) {
ConfigListener cl = futureEntry.getValue();
Logger.getAnonymousLogger().log(Level.SEVERE, "Config Listener " + cl.getClass() + " notification took too long", e);
// all notification have been successful, I just need to notify the unprocessed events.
// note these events are always synchronous so far.
if (!unprocessed.isEmpty()) {
Job unprocessedJob = new UnprocessedEventsJob(unprocessed, null);
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> listener : Transactions.this.listeners) {
} finally {
// in theory I should aggregate my unprocessed events but nobody cares.
return null;
A job contains an optional CountdownLatch so that a caller can learn when the
transaction has "cleared" by blocking until that time.
private abstract static class Job<T,U,V> {
private final CountDownLatch mLatch;
protected final List<U> mEvents;
public Job(List<U> events, final CountDownLatch latch ) {
mLatch = latch;
mEvents = events;
public void waitForLatch() throws InterruptedException {
if ( mLatch != null ) {
public void releaseLatch() {
if ( mLatch != null ) {
public abstract V process(T target);
private static class TransactionListenerJob extends Job<TransactionListener, PropertyChangeEvent, Void> {
public TransactionListenerJob(List<PropertyChangeEvent> events, CountDownLatch latch) {
super(events, latch);
public Void process(TransactionListener listener) {
try {
} catch(Exception e) {
return null;
private static class UnprocessedEventsJob extends Job<TransactionListener, UnprocessedChangeEvents, Void> {
public UnprocessedEventsJob(List<UnprocessedChangeEvents> events, CountDownLatch latch) {
super(events, latch);
public Void process(TransactionListener listener) {
try {
} catch(Exception e) {
return null;
private class ConfigListenerJob extends Job<ConfigListener, PropertyChangeEvent, UnprocessedChangeEvents> {
final PropertyChangeEvent[] eventsArray;
public ConfigListenerJob(List<PropertyChangeEvent> events, CountDownLatch latch) {
super(events, latch);
eventsArray = mEvents.toArray(new PropertyChangeEvent[mEvents.size()]);
public UnprocessedChangeEvents process(ConfigListener target) {
return target.changed(eventsArray);
* adds a listener for a particular config type
* @param listenerType the config type
* @param listener the config listener
public synchronized void addListenerForType(Class listenerType, ConfigListener listener) {
Set<ConfigListener> listeners = typeListeners.get(listenerType);
if (listeners==null) {
listeners = new HashSet<ConfigListener>();
typeListeners.put(listenerType, listeners);
* removes a listener for a particular config type
* @param listenerType the config type
* @param listener the config listener
* @return true if the listener was removed successfully, false otherwise.
public synchronized boolean removeListenerForType(Class listenerType, ConfigListener listener) {
Set<ConfigListener> listeners = typeListeners.get(listenerType);
if (listeners==null) {
return false;
return listeners.remove(listener);
* add a new listener to all transaction events.
* @param listener to be added.
public void addTransactionsListener(final TransactionListener listener) {
synchronized(listeners) {
listeners.add(new Provider<ListenerNotifier<TransactionListener, ?, Void>>() {
final ListenerNotifier<TransactionListener, PropertyChangeEvent, Void> tsListener = new ListenerNotifier<TransactionListener, PropertyChangeEvent, Void>(listener);
final CountDownLatch initialized = new CountDownLatch(1);
public ListenerNotifier<TransactionListener, PropertyChangeEvent, Void> get() {
//synchronized(initialized) {
if (initialized.getCount()>0) {
return tsListener;
* Removes an existing listener for transaction events
* @param listener the registered listener
* @return true if the listener unregistration was successful
public boolean removeTransactionsListener(TransactionListener listener) {
synchronized(listeners) {
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier info = holder.get();
if (info.listener==listener) {
return listeners.remove(holder);
return false;
public List<TransactionListener> currentListeners() {
synchronized(listeners) {
List<TransactionListener> l = new ArrayList<TransactionListener>();
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier<TransactionListener, ?, Void> info = holder.get();
return l;
* Synchronous notification of a new transactional configuration change operation.
* @param events list of changes
void addTransaction( final List<PropertyChangeEvent> events) {
addTransaction(events, true);
* Notification of a new transaction completion
* @param events accumulated list of changes
* @param waitTillCleared synchronous semantics; wait until all change events are sent
void addTransaction(
final List<PropertyChangeEvent> events,
final boolean waitTillCleared ) {
final List<ListenerNotifier<TransactionListener, ?, Void>> listInfos = new ArrayList<ListenerNotifier<TransactionListener, ?, Void>>();
for (Provider<ListenerNotifier<TransactionListener, ?, Void>> holder : listeners) {
ListenerNotifier<TransactionListener, ?, Void> info = holder.get();
// create a CountDownLatch to implement waiting for events to actually be sent
final Job<TransactionListener, ?, Void> job = new TransactionListenerJob( events,
waitTillCleared ? new CountDownLatch(listInfos.size()) : null);
final ConfigListenerJob configJob = new ConfigListenerJob(events,
waitTillCleared? new CountDownLatch(1):null);
// NOTE that this is put() which blocks, *not* add() which will not block and will
// throw an IllegalStateException if the queue is full.
try {
for (ListenerNotifier listener : listInfos) {
} catch (InterruptedException e ) {
throw new RuntimeException(e);
public void waitForDrain() {
// insert a dummy Job and block until is has been processed. This guarantees
// that all prior jobs have finished
addTransaction( new ArrayList<PropertyChangeEvent>(), true );
// at this point all prior transactions are guaranteed to have cleared