blob: 39117a7a3da292386cfd765011bcd34648222bfc [file] [log] [blame]
/*
* Copyright (c) 2002, 2018 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package connector;
import java.lang.reflect.Method;
import java.util.Iterator;
import jakarta.resource.spi.ActivationSpec;
import jakarta.resource.spi.BootstrapContext;
import jakarta.resource.spi.XATerminator;
import jakarta.resource.spi.endpoint.MessageEndpoint;
import jakarta.resource.spi.endpoint.MessageEndpointFactory;
import jakarta.resource.spi.UnavailableException;
import jakarta.resource.spi.work.Work;
import jakarta.resource.spi.work.WorkManager;
import jakarta.resource.spi.work.ExecutionContext;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
/**
*
* @author Qingqing Ouyang
*/
public class WorkDispatcher implements Work {
protected boolean stop = false;
protected String id;
protected MessageEndpointFactory factory;
protected BootstrapContext ctx;
protected ActivationSpec spec;
protected WorkManager wm;
protected XATerminator xa;
public WorkDispatcher(
String id,
BootstrapContext ctx,
MessageEndpointFactory factory,
ActivationSpec spec) {
this.id = id;
this.ctx = ctx;
this.factory = factory;
this.spec = spec;
this.wm = ctx.getWorkManager();
this.xa = ctx.getXATerminator();
}
public void run() {
debug("ENTER...");
try {
synchronized (Controls.readyLock) {
debug("WAIT...");
Controls.readyLock.wait();
if (stop) {
return;
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
debug("Running...");
//try 3 times to create endpoint (in case of failure)
for (int i = 0; i < 1; i++) {
try {
Method onMessage = getOnMessageMethod();
System.out.println("isDeliveryTransacted = " +
factory.isDeliveryTransacted(onMessage));
if (!factory.isDeliveryTransacted(onMessage)) {
//MessageEndpoint ep = factory.createEndpoint(null);
//DeliveryWork d = new DeliveryWork("NO_TX", ep);
//wm.doWork(d, 0, null, null);
} else {
//MessageEndpoint ep = factory.createEndpoint(null);
MessageEndpoint ep = factory.createEndpoint(new FakeXAResource());
int numOfMessages = 5;
//importing transaction
//write/commit
ExecutionContext ec = startTx();
debug("Start TX - " + ec.getXid());
DeliveryWork w =
new DeliveryWork(ep, numOfMessages, "WRITE");
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE WRITE TO DB");
Controls.expectedResults = numOfMessages;
notifyAndWait();
//delete/rollback
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, numOfMessages, "DELETE");
wm.doWork(w, 0, ec, null);
xa.rollback(ec.getXid());
debug("DONE ROLLBACK FROM DB");
Controls.expectedResults = numOfMessages;
notifyAndWait();
//delete/commit
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, numOfMessages, "DELETE");
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE DELETE FROM DB");
Controls.expectedResults = 0;
notifyAndWait();
//write/commit
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, numOfMessages, "WRITE");
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE WRITE TO DB");
Controls.expectedResults = numOfMessages;
notifyAndWait();
//delete/commit
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, numOfMessages, "DELETE");
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE DELETE FROM DB");
Controls.expectedResults = 0;
notifyAndWait();
//write multiple times using doWork/commit
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, 1, "WRITE", true);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE WRITE TO DB");
Controls.expectedResults = 3;
notifyAndWait();
//write multiple times using doWork/rollback
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, 1, "WRITE", true);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
xa.rollback(ec.getXid());
debug("DONE WRITE TO DB");
Controls.expectedResults = 3;
notifyAndWait();
ec = startTx();
debug("Start TX - " + ec.getXid());
//write multiple times using doWork/rollback
w = new DeliveryWork(ep, 2, "WRITE", true);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
wm.doWork(w, 0, ec, null);
if (XAResource.XA_OK == xa.prepare(ec.getXid())) {
xa.commit(ec.getXid(), false);
debug("XA PREPARE/COMMIT. DONE WRITE TO DB ");
Controls.expectedResults = 9;
notifyAndWait();
} else {
xa.rollback(ec.getXid());
debug("XA PREPARE UNSUCCESSFUL. DONE ROLLBACK");
Controls.expectedResults = 3;
notifyAndWait();
}
//delete all.
ec = startTx();
debug("Start TX - " + ec.getXid());
w = new DeliveryWork(ep, 1, "DELETE_ALL");
wm.doWork(w, 0, ec, null);
xa.commit(ec.getXid(), true);
debug("DONE DELETE ALL FROM DB");
Controls.expectedResults = 0;
notifyAndWait();
done();
}
break;
} catch (UnavailableException ex) {
//ex.printStackTrace();
System.out.println("WorkDispatcher["+id+"] Endpoint Unavailable");
try {
Thread.currentThread().sleep(3*1000); //3 seconds
} catch (Exception e) {
e.printStackTrace();
}
} catch (XAException ex) {
ex.printStackTrace();
System.out.println("ERROR CODE = " + ex.errorCode);
done();
break;
} catch (Exception ex) {
System.out.println("Exception while executing work");
ex.printStackTrace();
done();
break;
}finally{
done();
}
}
debug("LEAVE...");
}
public void release() {}
public void stop() {
this.stop = true;
}
public String toString() {
return id;
}
public Method getOnMessageMethod() {
Method onMessageMethod = null;
try {
Class msgListenerClass = connector.MyMessageListener.class;
Class[] paramTypes = { java.lang.String.class };
onMessageMethod =
msgListenerClass.getMethod("onMessage", paramTypes);
} catch (NoSuchMethodException ex) {
ex.printStackTrace();
}
return onMessageMethod;
}
protected ExecutionContext startTx() {
ExecutionContext ec = new ExecutionContext();
try {
Xid xid = new XID();
ec.setXid(xid);
ec.setTransactionTimeout(5*1000); //5 seconds
} catch (Exception ex) {
ex.printStackTrace();
}
return ec;
}
protected void notifyAndWait() {
try {
//Sleep for 5 seconds
//Thread.currentThread().sleep(5*1000);
synchronized(Controls.readyLock) {
//Notify the client to check the results
Controls.readyLock.notifyAll();
//Wait until results are verified by the client
Controls.readyLock.wait();
if (stop) {
return;
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
protected void done() {
try {
synchronized(Controls.readyLock) {
Controls.done = true;
//Notify the client to check the results
Controls.readyLock.notifyAll();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
protected void debug(String mesg) {
System.out.println("Dispatcher[" + id + "] --> " + mesg);
}
}